14 from multiprocessing import Process, Pipe, cpu_count
15 from multiprocessing.queues import Queue
16 from multiprocessing.managers import BaseManager
17 from framework import VppTestRunner, running_extended_tests, VppTestCase, \
18 get_testcase_doc_name, get_test_description, PASS, FAIL, ERROR, SKIP, \
20 from debug import spawn_gdb
21 from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
22 colorize, single_line_delim
23 from discover_tests import discover_tests
24 from subprocess import check_output, CalledProcessError
25 from util import check_core_path, get_core_path, is_core_present
# Timeout which controls how long the child has to finish after a core dump
# is seen in the test temporary directory. If this is exceeded, the parent
# assumes that the child process is stuck (e.g. waiting for a shm mutex which
# will never get unlocked) and kills the child.
32 min_req_shm = 536870912 # min 512MB shm required
33 # 128MB per extra process
34 shm_per_process = 134217728
37 class StreamQueue(Queue):
42 sys.__stdout__.flush()
43 sys.__stderr__.flush()
46 return self._writer.fileno()
49 class StreamQueueManager(BaseManager):
53 StreamQueueManager.register('StreamQueue', StreamQueue)
56 class TestResult(dict):
57 def __init__(self, testcase_suite, testcases_by_id=None):
58 super(TestResult, self).__init__()
65 self.testcase_suite = testcase_suite
66 self.testcases = [testcase for testcase in testcase_suite]
67 self.testcases_by_id = testcases_by_id
def was_successful(self):
    """Tell whether the suite finished cleanly.

    True only when there are no FAIL and no ERROR entries and the number
    of PASS plus SKIP entries equals both the number of test cases in the
    suite and the number of TEST_RUN entries.
    """
    if len(self[FAIL]) != 0 or len(self[ERROR]) != 0:
        return False
    finished = len(self[PASS] + self[SKIP])
    scheduled = self.testcase_suite.countTestCases()
    if finished != scheduled:
        return False
    return scheduled == len(self[TEST_RUN])
def no_tests_run(self):
    """Return True when the TEST_RUN list of this result is empty."""
    executed = self[TEST_RUN]
    return len(executed) == 0
def process_result(self, test_id, result):
    """Append test_id to the list stored under the `result` key."""
    bucket = self[result]
    bucket.append(test_id)
80 def suite_from_failed(self):
82 for testcase in self.testcase_suite:
84 if tc_id not in self[PASS] and tc_id not in self[SKIP]:
86 if len(rerun_ids) > 0:
87 return suite_from_failed(self.testcase_suite, rerun_ids)
89 def get_testcase_names(self, test_id):
90 # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
91 setup_teardown_match = re.match(
92 r'((tearDownClass)|(setUpClass)) \((.+\..+)\)', test_id)
93 if setup_teardown_match:
94 test_name, _, _, testcase_name = setup_teardown_match.groups()
95 if len(testcase_name.split('.')) == 2:
96 for key in self.testcases_by_id.keys():
97 if key.startswith(testcase_name):
100 testcase_name = self._get_testcase_doc_name(testcase_name)
102 test_name = self._get_test_description(test_id)
103 testcase_name = self._get_testcase_doc_name(test_id)
105 return testcase_name, test_name
107 def _get_test_description(self, test_id):
108 if test_id in self.testcases_by_id:
109 desc = get_test_description(descriptions,
110 self.testcases_by_id[test_id])
115 def _get_testcase_doc_name(self, test_id):
116 if test_id in self.testcases_by_id:
117 doc_name = get_testcase_doc_name(self.testcases_by_id[test_id])
123 def test_runner_wrapper(suite, keep_alive_pipe, stdouterr_queue,
124 finished_pipe, result_pipe, logger):
125 sys.stdout = stdouterr_queue
126 sys.stderr = stdouterr_queue
127 VppTestCase.parallel_handler = logger.handlers[0]
128 result = VppTestRunner(keep_alive_pipe=keep_alive_pipe,
129 descriptions=descriptions,
131 result_pipe=result_pipe,
133 print_summary=False).run(suite)
134 finished_pipe.send(result.wasSuccessful())
135 finished_pipe.close()
136 keep_alive_pipe.close()
139 class TestCaseWrapper(object):
140 def __init__(self, testcase_suite, manager):
141 self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(
143 self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
144 self.result_parent_end, self.result_child_end = Pipe(duplex=False)
145 self.testcase_suite = testcase_suite
146 if sys.version[0] == '2':
147 self.stdouterr_queue = manager.StreamQueue()
149 from multiprocessing import get_context
150 self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
151 self.logger = get_parallel_logger(self.stdouterr_queue)
152 self.child = Process(target=test_runner_wrapper,
153 args=(testcase_suite,
154 self.keep_alive_child_end,
155 self.stdouterr_queue,
156 self.finished_child_end,
157 self.result_child_end,
161 self.last_test_temp_dir = None
162 self.last_test_vpp_binary = None
163 self._last_test = None
164 self.last_test_id = None
166 self.last_heard = time.time()
167 self.core_detected_at = None
168 self.testcases_by_id = {}
169 self.testclasess_with_core = {}
170 for testcase in self.testcase_suite:
171 self.testcases_by_id[testcase.id()] = testcase
172 self.result = TestResult(testcase_suite, self.testcases_by_id)
176 return self._last_test
179 def last_test(self, test_id):
180 self.last_test_id = test_id
181 if test_id in self.testcases_by_id:
182 testcase = self.testcases_by_id[test_id]
183 self._last_test = testcase.shortDescription()
184 if not self._last_test:
185 self._last_test = str(testcase)
187 self._last_test = test_id
189 def add_testclass_with_core(self):
190 if self.last_test_id in self.testcases_by_id:
191 test = self.testcases_by_id[self.last_test_id]
192 class_name = unittest.util.strclass(test.__class__)
193 test_name = "'{}' ({})".format(get_test_description(descriptions,
197 test_name = self.last_test_id
198 class_name = re.match(r'((tearDownClass)|(setUpClass)) '
199 r'\((.+\..+)\)', test_name).groups()[3]
200 if class_name not in self.testclasess_with_core:
201 self.testclasess_with_core[class_name] = (
203 self.last_test_vpp_binary,
204 self.last_test_temp_dir)
def close_pipes(self):
    """Close every pipe end held by this wrapper.

    The child ends (keep-alive, finished, result) are closed first,
    followed by the matching parent ends, in that order.
    """
    pipe_end_names = (
        'keep_alive_child_end',
        'finished_child_end',
        'result_child_end',
        'keep_alive_parent_end',
        'finished_parent_end',
        'result_parent_end',
    )
    for end_name in pipe_end_names:
        getattr(self, end_name).close()
def was_successful(self):
    """Delegate to the was_successful() of the result held in self.result."""
    suite_result = self.result
    return suite_result.was_successful()
218 def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
221 while read_testcases.is_set() or len(unread_testcases):
222 if len(finished_unread_testcases):
223 read_testcase = finished_unread_testcases.pop()
224 unread_testcases.remove(read_testcase)
225 elif len(unread_testcases):
226 read_testcase = unread_testcases.pop()
229 while data is not None:
230 sys.stdout.write(data)
231 data = read_testcase.stdouterr_queue.get()
233 read_testcase.stdouterr_queue.close()
234 finished_unread_testcases.discard(read_testcase)
238 def handle_failed_suite(logger, last_test_temp_dir, vpp_pid):
239 if last_test_temp_dir:
240 # Need to create link in case of a timeout or core dump without failure
241 lttd = os.path.basename(last_test_temp_dir)
242 failed_dir = os.getenv('FAILED_DIR')
243 link_path = '%s%s-FAILED' % (failed_dir, lttd)
244 if not os.path.exists(link_path):
245 os.symlink(last_test_temp_dir, link_path)
246 logger.error("Symlink to failed testcase directory: %s -> %s"
249 # Report core existence
250 core_path = get_core_path(last_test_temp_dir)
251 if os.path.exists(core_path):
253 "Core-file exists in test temporary directory: %s!" %
255 check_core_path(logger, core_path)
256 logger.debug("Running `file %s':" % core_path)
258 info = check_output(["file", core_path])
260 except CalledProcessError as e:
261 logger.error("Could not run `file' utility on core-file, "
262 "rc=%s" % e.returncode)
265 # Copy api post mortem
266 api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
267 if os.path.isfile(api_post_mortem_path):
268 logger.error("Copying api_post_mortem.%d to %s" %
269 (vpp_pid, last_test_temp_dir))
270 shutil.copy2(api_post_mortem_path, last_test_temp_dir)
def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
    """Report a core found in tempdir and pass it to spawn_gdb.

    Does nothing when is_core_present(tempdir) is false. Otherwise prints
    which test was running, then calls spawn_gdb with the VPP binary and
    the core path, framed by single-line delimiters.
    """
    if not is_core_present(tempdir):
        return
    notice = 'VPP core detected in %s. Last test running was %s' % (
        tempdir, core_crash_test)
    print(notice)
    print(single_line_delim)
    core_file = get_core_path(tempdir)
    spawn_gdb(vpp_binary, core_file)
    print(single_line_delim)
282 def handle_cores(failed_testcases):
284 for failed_testcase in failed_testcases:
285 tcs_with_core = failed_testcase.testclasess_with_core
286 if len(tcs_with_core) > 0:
287 for test, vpp_binary, tempdir in tcs_with_core.values():
288 check_and_handle_core(vpp_binary, tempdir, test)
291 def process_finished_testsuite(wrapped_testcase_suite,
292 finished_testcase_suites,
293 failed_wrapped_testcases,
295 results.append(wrapped_testcase_suite.result)
296 finished_testcase_suites.add(wrapped_testcase_suite)
298 if failfast and not wrapped_testcase_suite.was_successful():
301 if not wrapped_testcase_suite.was_successful():
302 failed_wrapped_testcases.add(wrapped_testcase_suite)
303 handle_failed_suite(wrapped_testcase_suite.logger,
304 wrapped_testcase_suite.last_test_temp_dir,
305 wrapped_testcase_suite.vpp_pid)
310 def run_forked(testcase_suites):
311 wrapped_testcase_suites = set()
313 # suites are unhashable, need to use list
315 unread_testcases = set()
316 finished_unread_testcases = set()
317 manager = StreamQueueManager()
319 for i in range(concurrent_tests):
320 if len(testcase_suites) > 0:
321 wrapped_testcase_suite = TestCaseWrapper(testcase_suites.pop(0),
323 wrapped_testcase_suites.add(wrapped_testcase_suite)
324 unread_testcases.add(wrapped_testcase_suite)
328 read_from_testcases = threading.Event()
329 read_from_testcases.set()
330 stdouterr_thread = threading.Thread(target=stdouterr_reader_wrapper,
331 args=(unread_testcases,
332 finished_unread_testcases,
333 read_from_testcases))
334 stdouterr_thread.start()
336 failed_wrapped_testcases = set()
340 while len(wrapped_testcase_suites) > 0:
341 finished_testcase_suites = set()
342 for wrapped_testcase_suite in wrapped_testcase_suites:
343 while wrapped_testcase_suite.result_parent_end.poll():
344 wrapped_testcase_suite.result.process_result(
345 *wrapped_testcase_suite.result_parent_end.recv())
346 wrapped_testcase_suite.last_heard = time.time()
348 while wrapped_testcase_suite.keep_alive_parent_end.poll():
349 wrapped_testcase_suite.last_test, \
350 wrapped_testcase_suite.last_test_vpp_binary, \
351 wrapped_testcase_suite.last_test_temp_dir, \
352 wrapped_testcase_suite.vpp_pid = \
353 wrapped_testcase_suite.keep_alive_parent_end.recv()
354 wrapped_testcase_suite.last_heard = time.time()
356 if wrapped_testcase_suite.finished_parent_end.poll():
357 wrapped_testcase_suite.finished_parent_end.recv()
358 wrapped_testcase_suite.last_heard = time.time()
359 stop_run = process_finished_testsuite(
360 wrapped_testcase_suite,
361 finished_testcase_suites,
362 failed_wrapped_testcases,
367 if wrapped_testcase_suite.last_heard + test_timeout < \
370 wrapped_testcase_suite.logger.critical(
371 "Child test runner process timed out "
372 "(last test running was `%s' in `%s')!" %
373 (wrapped_testcase_suite.last_test,
374 wrapped_testcase_suite.last_test_temp_dir))
375 elif not wrapped_testcase_suite.child.is_alive():
377 wrapped_testcase_suite.logger.critical(
378 "Child test runner process unexpectedly died "
379 "(last test running was `%s' in `%s')!" %
380 (wrapped_testcase_suite.last_test,
381 wrapped_testcase_suite.last_test_temp_dir))
382 elif wrapped_testcase_suite.last_test_temp_dir and \
383 wrapped_testcase_suite.last_test_vpp_binary:
385 wrapped_testcase_suite.last_test_temp_dir):
386 wrapped_testcase_suite.add_testclass_with_core()
387 if wrapped_testcase_suite.core_detected_at is None:
388 wrapped_testcase_suite.core_detected_at = \
390 elif wrapped_testcase_suite.core_detected_at + \
391 core_timeout < time.time():
392 wrapped_testcase_suite.logger.critical(
393 "Child test runner process unresponsive and "
394 "core-file exists in test temporary directory "
395 "(last test running was `%s' in `%s')!" %
396 (wrapped_testcase_suite.last_test,
397 wrapped_testcase_suite.last_test_temp_dir))
401 wrapped_testcase_suite.child.terminate()
403 # terminating the child process tends to leave orphan
405 if wrapped_testcase_suite.vpp_pid:
406 os.kill(wrapped_testcase_suite.vpp_pid,
411 wrapped_testcase_suite.result.crashed = True
412 wrapped_testcase_suite.result.process_result(
413 wrapped_testcase_suite.last_test_id, ERROR)
414 stop_run = process_finished_testsuite(
415 wrapped_testcase_suite,
416 finished_testcase_suites,
417 failed_wrapped_testcases,
420 for finished_testcase in finished_testcase_suites:
421 finished_testcase.child.join()
422 finished_testcase.close_pipes()
423 wrapped_testcase_suites.remove(finished_testcase)
424 finished_unread_testcases.add(finished_testcase)
425 finished_testcase.stdouterr_queue.put(None)
427 while len(testcase_suites) > 0:
428 results.append(TestResult(testcase_suites.pop(0)))
429 elif len(testcase_suites) > 0:
430 new_testcase = TestCaseWrapper(testcase_suites.pop(0),
432 wrapped_testcase_suites.add(new_testcase)
433 unread_testcases.add(new_testcase)
435 for wrapped_testcase_suite in wrapped_testcase_suites:
436 wrapped_testcase_suite.child.terminate()
437 wrapped_testcase_suite.stdouterr_queue.put(None)
440 read_from_testcases.clear()
441 stdouterr_thread.join(test_timeout)
444 handle_cores(failed_wrapped_testcases)
448 class SplitToSuitesCallback:
449 def __init__(self, filter_callback):
451 self.suite_name = 'default'
452 self.filter_callback = filter_callback
453 self.filtered = unittest.TestSuite()
455 def __call__(self, file_name, cls, method):
456 test_method = cls(method)
457 if self.filter_callback(file_name, cls.__name__, method):
458 self.suite_name = file_name + cls.__name__
459 if self.suite_name not in self.suites:
460 self.suites[self.suite_name] = unittest.TestSuite()
461 self.suites[self.suite_name].addTest(test_method)
464 self.filtered.addTest(test_method)
470 def parse_test_option():
471 f = os.getenv(test_option, None)
472 filter_file_name = None
473 filter_class_name = None
474 filter_func_name = None
479 raise Exception("Unrecognized %s option: %s" %
482 if parts[2] not in ('*', ''):
483 filter_func_name = parts[2]
484 if parts[1] not in ('*', ''):
485 filter_class_name = parts[1]
486 if parts[0] not in ('*', ''):
487 if parts[0].startswith('test_'):
488 filter_file_name = parts[0]
490 filter_file_name = 'test_%s' % parts[0]
492 if f.startswith('test_'):
495 filter_file_name = 'test_%s' % f
497 filter_file_name = '%s.py' % filter_file_name
498 return filter_file_name, filter_class_name, filter_func_name
501 def filter_tests(tests, filter_cb):
502 result = unittest.suite.TestSuite()
504 if isinstance(t, unittest.suite.TestSuite):
505 # this is a bunch of tests, recursively filter...
506 x = filter_tests(t, filter_cb)
507 if x.countTestCases() > 0:
509 elif isinstance(t, unittest.TestCase):
510 # this is a single test
511 parts = t.id().split('.')
512 # t.id() for common cases like this:
513 # test_classifier.TestClassifier.test_acl_ip
514 # apply filtering only if it is so
516 if not filter_cb(parts[0], parts[1], parts[2]):
520 # unexpected object, don't touch it
525 class FilterByTestOption:
def __init__(self, filter_file_name, filter_class_name, filter_func_name):
    """Store the file, class and function name filters on the instance."""
    (self.filter_file_name,
     self.filter_class_name,
     self.filter_func_name) = (filter_file_name,
                               filter_class_name,
                               filter_func_name)
531 def __call__(self, file_name, class_name, func_name):
532 if self.filter_file_name:
533 fn_match = fnmatch.fnmatch(file_name, self.filter_file_name)
536 if self.filter_class_name and class_name != self.filter_class_name:
538 if self.filter_func_name and func_name != self.filter_func_name:
class FilterByClassList:
    """Filter callback accepting tests by their "<file>.<class>" name."""

    def __init__(self, classes_with_filenames):
        # container of accepted "<file_name>.<class_name>" strings
        self.classes_with_filenames = classes_with_filenames

    def __call__(self, file_name, class_name, func_name):
        """Return True when "<file_name>.<class_name>" is accepted.

        func_name is part of the callback signature but is not consulted.
        """
        qualified_name = '.'.join((file_name, class_name))
        return qualified_name in self.classes_with_filenames
551 def suite_from_failed(suite, failed):
552 failed = {x.rsplit('.', 1)[0] for x in failed}
553 filter_cb = FilterByClassList(failed)
554 suite = filter_tests(suite, filter_cb)
558 class AllResults(dict):
560 super(AllResults, self).__init__()
561 self.all_testcases = 0
562 self.results_per_suite = []
569 self.testsuites_no_tests_run = []
def add_results(self, result):
    """Keep the suite result and add its per-category counts to the totals.

    For each of PASS, FAIL, ERROR, SKIP and TEST_RUN the length of the
    matching list in `result` is added to the counter stored on self.
    """
    self.results_per_suite.append(result)
    for category in (PASS, FAIL, ERROR, SKIP, TEST_RUN):
        self[category] += len(result[category])
577 def add_result(self, result):
579 self.all_testcases += result.testcase_suite.countTestCases()
580 self.add_results(result)
582 if result.no_tests_run():
583 self.testsuites_no_tests_run.append(result.testcase_suite)
588 elif not result.was_successful():
592 self.rerun.append(result.testcase_suite)
596 def print_results(self):
598 print(double_line_delim)
599 print('TEST RESULTS:')
600 print(' Scheduled tests: {}'.format(self.all_testcases))
601 print(' Executed tests: {}'.format(self[TEST_RUN]))
602 print(' Passed tests: {}'.format(
603 colorize(str(self[PASS]), GREEN)))
605 print(' Skipped tests: {}'.format(
606 colorize(str(self[SKIP]), YELLOW)))
607 if self.not_executed > 0:
608 print(' Not Executed tests: {}'.format(
609 colorize(str(self.not_executed), RED)))
611 print(' Failures: {}'.format(
612 colorize(str(self[FAIL]), RED)))
614 print(' Errors: {}'.format(
615 colorize(str(self[ERROR]), RED)))
617 if self.all_failed > 0:
618 print('FAILURES AND ERRORS IN TESTS:')
619 for result in self.results_per_suite:
620 failed_testcase_ids = result[FAIL]
621 errored_testcase_ids = result[ERROR]
622 old_testcase_name = None
623 if len(failed_testcase_ids) or len(errored_testcase_ids):
624 for failed_test_id in failed_testcase_ids:
625 new_testcase_name, test_name = \
626 result.get_testcase_names(failed_test_id)
627 if new_testcase_name != old_testcase_name:
628 print(' Testcase name: {}'.format(
629 colorize(new_testcase_name, RED)))
630 old_testcase_name = new_testcase_name
631 print(' FAILURE: {} [{}]'.format(
632 colorize(test_name, RED), failed_test_id))
633 for failed_test_id in errored_testcase_ids:
634 new_testcase_name, test_name = \
635 result.get_testcase_names(failed_test_id)
636 if new_testcase_name != old_testcase_name:
637 print(' Testcase name: {}'.format(
638 colorize(new_testcase_name, RED)))
639 old_testcase_name = new_testcase_name
640 print(' ERROR: {} [{}]'.format(
641 colorize(test_name, RED), failed_test_id))
642 if len(self.testsuites_no_tests_run) > 0:
643 print('TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:')
645 for testsuite in self.testsuites_no_tests_run:
646 for testcase in testsuite:
647 tc_classes.add(get_testcase_doc_name(testcase))
648 for tc_class in tc_classes:
649 print(' {}'.format(colorize(tc_class, RED)))
651 print(double_line_delim)
def not_executed(self):
    """Return the scheduled test count minus the TEST_RUN counter."""
    scheduled = self.all_testcases
    return scheduled - self[TEST_RUN]
def all_failed(self):
    """Return the sum of the FAIL and ERROR counters."""
    failures = self[FAIL]
    errors = self[ERROR]
    return failures + errors
663 def parse_results(results):
665 Prints the number of scheduled, executed, not executed, passed, failed,
666 errored and skipped tests and details about failed and errored tests.
668 Also returns all suites where any test failed.
674 results_per_suite = AllResults()
677 for result in results:
678 result_code = results_per_suite.add_result(result)
681 elif result_code == -1:
684 results_per_suite.print_results()
692 return return_code, results_per_suite.rerun
695 def parse_digit_env(env_var, default):
696 value = os.getenv(env_var, default)
# Trailing space after the comma: the two literals are concatenated, so
# without it the message reads `"VAR",defaulting to ...`.
print('WARNING: unsupported value "%s" for env var "%s", '
      'defaulting to %s' % (value, env_var, default))
707 if __name__ == '__main__':
709 verbose = parse_digit_env("V", 0)
711 test_timeout = parse_digit_env("TIMEOUT", 600) # default = 10 minutes
713 retries = parse_digit_env("RETRIES", 0)
715 debug = os.getenv("DEBUG", "n").lower() in ["gdb", "gdbserver"]
717 debug_core = os.getenv("DEBUG", "").lower() == "core"
719 step = os.getenv("STEP", "n").lower() in ("y", "yes", "1")
721 run_interactive = debug or step
723 test_jobs = os.getenv("TEST_JOBS", "1").lower() # default = 1 process
724 if test_jobs == 'auto':
727 print('Interactive mode required, running on one core')
729 shm_free = psutil.disk_usage('/dev/shm').free
730 shm_max_processes = 1
731 if shm_free < min_req_shm:
732 raise Exception('Not enough free space in /dev/shm. Required '
733 'free space is at least %sM.'
734 % (min_req_shm >> 20))
extra_shm = shm_free - min_req_shm
# Floor division keeps the process count an integer. True division yields
# a float on Python 3, which would flow into concurrent_tests and break
# range(concurrent_tests).
shm_max_processes += extra_shm // shm_per_process
738 concurrent_tests = min(cpu_count(), shm_max_processes)
739 print('Found enough resources to run tests with %s cores'
741 elif test_jobs.isdigit():
742 concurrent_tests = int(test_jobs)
746 if run_interactive and concurrent_tests > 1:
747 raise NotImplementedError(
748 'Running tests interactively (DEBUG is gdb or gdbserver or STEP '
749 'is set) in parallel (TEST_JOBS is more than 1) is not supported')
751 parser = argparse.ArgumentParser(description="VPP unit tests")
752 parser.add_argument("-f", "--failfast", action='store_true',
753 help="fast failure flag")
754 parser.add_argument("-d", "--dir", action='append', type=str,
755 help="directory containing test files "
756 "(may be specified multiple times)")
757 args = parser.parse_args()
758 failfast = args.failfast
761 print("Running tests using custom test runner") # debug message
762 filter_file, filter_class, filter_func = parse_test_option()
764 print("Active filters: file=%s, class=%s, function=%s" % (
765 filter_file, filter_class, filter_func))
767 filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)
769 ignore_path = os.getenv("VENV_PATH", None)
770 cb = SplitToSuitesCallback(filter_cb)
772 print("Adding tests from directory tree %s" % d)
773 discover_tests(d, cb, ignore_path)
775 # suites are not hashable, need to use list
778 for testcase_suite in cb.suites.values():
779 tests_amount += testcase_suite.countTestCases()
780 suites.append(testcase_suite)
782 print("%s out of %s tests match specified filters" % (
783 tests_amount, tests_amount + cb.filtered.countTestCases()))
785 if not running_extended_tests:
786 print("Not running extended tests (some tests will be skipped)")
788 attempts = retries + 1
790 print("Perform %s attempts to pass the suite..." % attempts)
792 if run_interactive and len(suites):
793 # don't fork if requiring interactive terminal
full_suite = unittest.TestSuite()
# Use an explicit loop rather than map(): on Python 3 map() is lazy, so
# an unconsumed map() would never call addTests and the suite would stay
# empty.
for suite in suites:
    full_suite.addTests(suite)
796 result = VppTestRunner(verbosity=verbose,
798 print_summary=True).run(full_suite)
799 was_successful = result.wasSuccessful()
800 if not was_successful:
801 for test_case_info in result.failed_test_cases_info:
802 handle_failed_suite(test_case_info.logger,
803 test_case_info.tempdir,
804 test_case_info.vpp_pid)
806 test_case_info in result.core_crash_test_cases_info:
807 check_and_handle_core(test_case_info.vpp_bin_path,
808 test_case_info.tempdir,
809 test_case_info.core_crash_test)
811 sys.exit(not was_successful)
814 while len(suites) > 0 and attempts > 0:
815 results = run_forked(suites)
816 exit_code, suites = parse_results(results)
819 print('Test run was successful')
821 print('%s attempt(s) left.' % attempts)