13 from multiprocessing import Process, Pipe, cpu_count
14 from multiprocessing.queues import Queue
15 from multiprocessing.managers import BaseManager
16 from framework import VppTestRunner, running_extended_tests, VppTestCase, \
17 get_testcase_doc_name, get_test_description, PASS, FAIL, ERROR, SKIP, \
19 from debug import spawn_gdb
20 from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
22 from discover_tests import discover_tests
23 from subprocess import check_output, CalledProcessError
24 from util import check_core_path
26 # timeout which controls how long the child has to finish after seeing
27 # a core dump in test temporary directory. If this is exceeded, parent assumes
28 # that child process is stuck (e.g. waiting for shm mutex, which will never
29 # get unlocked) and kill the child
31 min_req_shm = 536870912 # min 512MB shm required
32 # 128MB per extra process
33 shm_per_process = 134217728
36 class StreamQueue(Queue):
41 sys.__stdout__.flush()
42 sys.__stderr__.flush()
45 return self._writer.fileno()
48 class StreamQueueManager(BaseManager):
52 StreamQueueManager.register('StreamQueue', StreamQueue)
55 class TestResult(dict):
56 def __init__(self, testcase_suite):
57 super(TestResult, self).__init__()
63 self.testcase_suite = testcase_suite
64 self.testcases = [testcase for testcase in testcase_suite]
65 self.testcases_by_id = {}
67 def was_successful(self):
68 return len(self[PASS] + self[SKIP]) \
69 == self.testcase_suite.countTestCases()
71 def no_tests_run(self):
72 return 0 == len(self[TEST_RUN])
74 def process_result(self, test_id, result):
75 self[result].append(test_id)
76 for testcase in self.testcases:
77 if testcase.id() == test_id:
78 self.testcases_by_id[test_id] = testcase
79 self.testcases.remove(testcase)
82 def suite_from_failed(self):
84 for testcase in self.testcase_suite:
86 if tc_id not in self[PASS] and tc_id not in self[SKIP]:
88 if len(rerun_ids) > 0:
89 return suite_from_failed(self.testcase_suite, rerun_ids)
91 def get_testcase_names(self, test_id):
92 return self._get_testcase_class(test_id), \
93 self._get_test_description(test_id)
95 def _get_test_description(self, test_id):
96 if test_id in self.testcases_by_id:
97 return get_test_description(descriptions,
98 self.testcases_by_id[test_id])
102 def _get_testcase_class(self, test_id):
103 if test_id in self.testcases_by_id:
104 return get_testcase_doc_name(self.testcases_by_id[test_id])
109 def test_runner_wrapper(suite, keep_alive_pipe, stdouterr_queue,
110 finished_pipe, result_pipe, logger):
111 sys.stdout = stdouterr_queue
112 sys.stderr = stdouterr_queue
113 VppTestCase.logger = logger
114 result = VppTestRunner(keep_alive_pipe=keep_alive_pipe,
115 descriptions=descriptions,
117 result_pipe=result_pipe,
118 failfast=failfast).run(suite)
119 finished_pipe.send(result.wasSuccessful())
120 finished_pipe.close()
121 keep_alive_pipe.close()
124 class TestCaseWrapper(object):
125 def __init__(self, testcase_suite, manager):
126 self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(
128 self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
129 self.result_parent_end, self.result_child_end = Pipe(duplex=False)
130 self.testcase_suite = testcase_suite
131 self.stdouterr_queue = manager.StreamQueue()
132 self.logger = get_parallel_logger(self.stdouterr_queue)
133 self.child = Process(target=test_runner_wrapper,
134 args=(testcase_suite,
135 self.keep_alive_child_end,
136 self.stdouterr_queue,
137 self.finished_child_end,
138 self.result_child_end,
142 self.last_test_temp_dir = None
143 self.last_test_vpp_binary = None
144 self.last_test = None
147 self.last_heard = time.time()
148 self.core_detected_at = None
149 self.result = TestResult(testcase_suite)
151 def close_pipes(self):
152 self.keep_alive_child_end.close()
153 self.finished_child_end.close()
154 self.result_child_end.close()
155 self.keep_alive_parent_end.close()
156 self.finished_parent_end.close()
157 self.result_parent_end.close()
160 def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
163 while read_testcases.is_set() or len(unread_testcases) > 0:
164 if not read_testcase:
165 if len(finished_unread_testcases) > 0:
166 read_testcase = finished_unread_testcases.pop()
167 unread_testcases.remove(read_testcase)
168 elif len(unread_testcases) > 0:
169 read_testcase = unread_testcases.pop()
172 while data is not None:
173 sys.stdout.write(data)
174 data = read_testcase.stdouterr_queue.get()
176 read_testcase.stdouterr_queue.close()
177 finished_unread_testcases.discard(read_testcase)
181 def run_forked(testcase_suites):
182 wrapped_testcase_suites = set()
184 # suites are unhashable, need to use list
186 debug_core = os.getenv("DEBUG", "").lower() == "core"
187 unread_testcases = set()
188 finished_unread_testcases = set()
189 manager = StreamQueueManager()
191 for i in range(concurrent_tests):
192 if len(testcase_suites) > 0:
193 wrapped_testcase_suite = TestCaseWrapper(testcase_suites.pop(0),
195 wrapped_testcase_suites.add(wrapped_testcase_suite)
196 unread_testcases.add(wrapped_testcase_suite)
201 read_from_testcases = threading.Event()
202 read_from_testcases.set()
203 stdouterr_thread = threading.Thread(target=stdouterr_reader_wrapper,
204 args=(unread_testcases,
205 finished_unread_testcases,
206 read_from_testcases))
207 stdouterr_thread.start()
209 while len(wrapped_testcase_suites) > 0:
210 finished_testcase_suites = set()
211 for wrapped_testcase_suite in wrapped_testcase_suites:
212 while wrapped_testcase_suite.result_parent_end.poll():
213 wrapped_testcase_suite.result.process_result(
214 *wrapped_testcase_suite.result_parent_end.recv())
215 wrapped_testcase_suite.last_heard = time.time()
217 if wrapped_testcase_suite.finished_parent_end.poll():
218 wrapped_testcase_suite.finished_parent_end.recv()
219 results.append(wrapped_testcase_suite.result)
220 finished_testcase_suites.add(wrapped_testcase_suite)
223 while wrapped_testcase_suite.keep_alive_parent_end.poll():
224 wrapped_testcase_suite.last_test, \
225 wrapped_testcase_suite.last_test_vpp_binary, \
226 wrapped_testcase_suite.last_test_temp_dir, \
227 wrapped_testcase_suite.vpp_pid = \
228 wrapped_testcase_suite.keep_alive_parent_end.recv()
229 wrapped_testcase_suite.last_heard = time.time()
232 if wrapped_testcase_suite.last_heard + test_timeout < time.time() \
233 and not os.path.isfile(
235 wrapped_testcase_suite.last_test_temp_dir):
237 wrapped_testcase_suite.logger.critical(
238 "Timeout while waiting for child test "
239 "runner process (last test running was "
241 (wrapped_testcase_suite.last_test,
242 wrapped_testcase_suite.last_test_temp_dir))
243 elif not wrapped_testcase_suite.child.is_alive():
245 wrapped_testcase_suite.logger.critical(
246 "Child python process unexpectedly died "
247 "(last test running was `%s' in `%s')!" %
248 (wrapped_testcase_suite.last_test,
249 wrapped_testcase_suite.last_test_temp_dir))
250 elif wrapped_testcase_suite.last_test_temp_dir and \
251 wrapped_testcase_suite.last_test_vpp_binary:
252 core_path = "%s/core" % \
253 wrapped_testcase_suite.last_test_temp_dir
254 if os.path.isfile(core_path):
255 if wrapped_testcase_suite.core_detected_at is None:
256 wrapped_testcase_suite.core_detected_at = time.time()
257 elif wrapped_testcase_suite.core_detected_at + \
258 core_timeout < time.time():
259 if not os.path.isfile(
261 wrapped_testcase_suite.
263 wrapped_testcase_suite.logger.critical(
264 "Child python process unresponsive and core-"
265 "file exists in test temporary directory!")
269 failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
270 if wrapped_testcase_suite.last_test_temp_dir:
271 lttd = os.path.basename(
272 wrapped_testcase_suite.last_test_temp_dir)
275 link_path = '%s%s-FAILED' % (failed_dir, lttd)
276 wrapped_testcase_suite.logger.error(
277 "Creating a link to the failed test: %s -> %s" %
279 if not os.path.exists(link_path) \
280 and wrapped_testcase_suite.last_test_temp_dir:
281 os.symlink(wrapped_testcase_suite.last_test_temp_dir,
283 api_post_mortem_path = "/tmp/api_post_mortem.%d" % \
284 wrapped_testcase_suite.vpp_pid
285 if os.path.isfile(api_post_mortem_path):
286 wrapped_testcase_suite.logger.error(
287 "Copying api_post_mortem.%d to %s" %
288 (wrapped_testcase_suite.vpp_pid,
289 wrapped_testcase_suite.last_test_temp_dir))
290 shutil.copy2(api_post_mortem_path,
291 wrapped_testcase_suite.last_test_temp_dir)
292 if wrapped_testcase_suite.last_test_temp_dir and \
293 wrapped_testcase_suite.last_test_vpp_binary:
294 core_path = "%s/core" % \
295 wrapped_testcase_suite.last_test_temp_dir
296 if os.path.isfile(core_path):
297 wrapped_testcase_suite.logger.error(
298 "Core-file exists in test temporary directory: %s!"
300 check_core_path(wrapped_testcase_suite.logger,
302 wrapped_testcase_suite.logger.debug(
303 "Running `file %s':" % core_path)
305 info = check_output(["file", core_path])
306 wrapped_testcase_suite.logger.debug(info)
307 except CalledProcessError as e:
308 wrapped_testcase_suite.logger.error(
309 "Could not run `file' utility on core-file, "
310 "rc=%s" % e.returncode)
314 wrapped_testcase_suite.last_test_vpp_binary,
315 core_path, wrapped_testcase_suite.logger)
316 wrapped_testcase_suite.child.terminate()
318 # terminating the child process tends to leave orphan
320 os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
324 results.append(wrapped_testcase_suite.result)
325 finished_testcase_suites.add(wrapped_testcase_suite)
327 for finished_testcase in finished_testcase_suites:
328 finished_testcase.child.join()
329 finished_testcase.close_pipes()
330 wrapped_testcase_suites.remove(finished_testcase)
331 finished_unread_testcases.add(finished_testcase)
332 finished_testcase.stdouterr_queue.put(None)
333 if len(testcase_suites) > 0:
334 new_testcase = TestCaseWrapper(testcase_suites.pop(0), manager)
335 wrapped_testcase_suites.add(new_testcase)
336 unread_testcases.add(new_testcase)
338 read_from_testcases.clear()
339 stdouterr_thread.join(test_timeout)
344 class SplitToSuitesCallback:
345 def __init__(self, filter_callback):
347 self.suite_name = 'default'
348 self.filter_callback = filter_callback
349 self.filtered = unittest.TestSuite()
351 def __call__(self, file_name, cls, method):
352 test_method = cls(method)
353 if self.filter_callback(file_name, cls.__name__, method):
354 self.suite_name = file_name + cls.__name__
355 if self.suite_name not in self.suites:
356 self.suites[self.suite_name] = unittest.TestSuite()
357 self.suites[self.suite_name].addTest(test_method)
360 self.filtered.addTest(test_method)
366 def parse_test_option():
367 f = os.getenv(test_option, None)
368 filter_file_name = None
369 filter_class_name = None
370 filter_func_name = None
375 raise Exception("Unrecognized %s option: %s" %
378 if parts[2] not in ('*', ''):
379 filter_func_name = parts[2]
380 if parts[1] not in ('*', ''):
381 filter_class_name = parts[1]
382 if parts[0] not in ('*', ''):
383 if parts[0].startswith('test_'):
384 filter_file_name = parts[0]
386 filter_file_name = 'test_%s' % parts[0]
388 if f.startswith('test_'):
391 filter_file_name = 'test_%s' % f
393 filter_file_name = '%s.py' % filter_file_name
394 return filter_file_name, filter_class_name, filter_func_name
397 def filter_tests(tests, filter_cb):
398 result = unittest.suite.TestSuite()
400 if isinstance(t, unittest.suite.TestSuite):
401 # this is a bunch of tests, recursively filter...
402 x = filter_tests(t, filter_cb)
403 if x.countTestCases() > 0:
405 elif isinstance(t, unittest.TestCase):
406 # this is a single test
407 parts = t.id().split('.')
408 # t.id() for common cases like this:
409 # test_classifier.TestClassifier.test_acl_ip
410 # apply filtering only if it is so
412 if not filter_cb(parts[0], parts[1], parts[2]):
416 # unexpected object, don't touch it
421 class FilterByTestOption:
422 def __init__(self, filter_file_name, filter_class_name, filter_func_name):
423 self.filter_file_name = filter_file_name
424 self.filter_class_name = filter_class_name
425 self.filter_func_name = filter_func_name
427 def __call__(self, file_name, class_name, func_name):
428 if self.filter_file_name and file_name != self.filter_file_name:
430 if self.filter_class_name and class_name != self.filter_class_name:
432 if self.filter_func_name and func_name != self.filter_func_name:
437 class FilterByClassList:
438 def __init__(self, classes_with_filenames):
439 self.classes_with_filenames = classes_with_filenames
441 def __call__(self, file_name, class_name, func_name):
442 return '.'.join([file_name, class_name]) in self.classes_with_filenames
445 def suite_from_failed(suite, failed):
446 failed = {x.rsplit('.', 1)[0] for x in failed}
447 filter_cb = FilterByClassList(failed)
448 suite = filter_tests(suite, filter_cb)
452 class AllResults(dict):
454 super(AllResults, self).__init__()
455 self.all_testcases = 0
456 self.results_per_suite = []
463 self.testsuites_no_tests_run = []
465 def add_results(self, result):
466 self.results_per_suite.append(result)
467 result_types = [PASS, FAIL, ERROR, SKIP, TEST_RUN]
468 for result_type in result_types:
469 self[result_type] += len(result[result_type])
471 def add_result(self, result):
473 self.all_testcases += result.testcase_suite.countTestCases()
474 if not result.no_tests_run():
475 if not result.was_successful():
478 self.add_results(result)
480 self.testsuites_no_tests_run.append(result.testcase_suite)
484 if concurrent_tests == 1:
485 if not result.no_tests_run():
486 self.rerun.append(result.suite_from_failed())
488 self.rerun.append(result.testcase_suite)
490 self.rerun.append(result.testcase_suite)
494 def print_results(self):
496 print(double_line_delim)
497 print('TEST RESULTS:')
498 print(' Scheduled tests: {}'.format(self.all_testcases))
499 print(' Executed tests: {}'.format(self[TEST_RUN]))
500 print(' Passed tests: {}'.format(
501 colorize(str(self[PASS]), GREEN)))
503 print(' Skipped tests: {}'.format(
504 colorize(str(self[SKIP]), YELLOW)))
505 if self.not_executed > 0:
506 print(' Not Executed tests: {}'.format(
507 colorize(str(self.not_executed), RED)))
509 print(' Failures: {}'.format(
510 colorize(str(self[FAIL]), RED)))
512 print(' Errors: {}'.format(
513 colorize(str(self[ERROR]), RED)))
515 if self.all_failed > 0:
516 print('FAILED TESTS:')
517 for result in self.results_per_suite:
518 failed_testcase_ids = result[FAIL]
519 errored_testcase_ids = result[ERROR]
520 old_testcase_name = None
521 if len(failed_testcase_ids) or len(errored_testcase_ids):
522 for failed_test_id in failed_testcase_ids:
523 new_testcase_name, test_name = \
524 result.get_testcase_names(failed_test_id)
525 if new_testcase_name != old_testcase_name:
526 print(' Testcase name: {}'.format(
527 colorize(new_testcase_name, RED)))
528 old_testcase_name = new_testcase_name
529 print(' FAILED: {}'.format(
530 colorize(test_name, RED)))
531 for failed_test_id in errored_testcase_ids:
532 new_testcase_name, test_name = \
533 result.get_testcase_names(failed_test_id)
534 if new_testcase_name != old_testcase_name:
535 print(' Testcase name: {}'.format(
536 colorize(new_testcase_name, RED)))
537 old_testcase_name = new_testcase_name
538 print(' ERRORED: {}'.format(
539 colorize(test_name, RED)))
540 if len(self.testsuites_no_tests_run) > 0:
541 print('TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:')
543 for testsuite in self.testsuites_no_tests_run:
544 for testcase in testsuite:
545 tc_classes.add(get_testcase_doc_name(testcase))
546 for tc_class in tc_classes:
547 print(' {}'.format(colorize(tc_class, RED)))
549 print(double_line_delim)
553 def not_executed(self):
554 return self.all_testcases - self[TEST_RUN]
557 def all_failed(self):
558 return self[FAIL] + self[ERROR]
561 def parse_results(results):
563 Prints the number of scheduled, executed, not executed, passed, failed,
564 errored and skipped tests and details about failed and errored tests.
566 Also returns all suites where any test failed.
572 results_per_suite = AllResults()
575 for result in results:
576 result_code = results_per_suite.add_result(result)
579 elif result_code == -1:
582 results_per_suite.print_results()
590 return return_code, results_per_suite.rerun
593 def parse_digit_env(env_var, default):
594 value = os.getenv(env_var, default)
599 print('WARNING: unsupported value "%s" for env var "%s",'
600 'defaulting to %s' % (value, env_var, default))
605 if __name__ == '__main__':
607 verbose = parse_digit_env("V", 0)
609 test_timeout = parse_digit_env("TIMEOUT", 600) # default = 10 minutes
611 retries = parse_digit_env("RETRIES", 0)
613 debug = os.getenv("DEBUG", "n").lower() in ["gdb", "gdbserver"]
615 step = os.getenv("STEP", "n").lower() in ("y", "yes", "1")
618 os.getenv("FORCE_FOREGROUND", "n").lower() in ("y", "yes", "1")
620 run_interactive = debug or step or force_foreground
622 test_jobs = os.getenv("TEST_JOBS", "1").lower() # default = 1 process
623 if test_jobs == 'auto':
626 print('Interactive mode required, running on one core')
628 shm_free = psutil.disk_usage('/dev/shm').free
629 shm_max_processes = 1
630 if shm_free < min_req_shm:
631 raise Exception('Not enough free space in /dev/shm. Required '
632 'free space is at least %sM.'
633 % (min_req_shm >> 20))
635 extra_shm = shm_free - min_req_shm
636 shm_max_processes += extra_shm / shm_per_process
637 concurrent_tests = min(cpu_count(), shm_max_processes)
638 print('Found enough resources to run tests with %s cores'
640 elif test_jobs.isdigit():
641 concurrent_tests = int(test_jobs)
645 if run_interactive and concurrent_tests > 1:
646 raise NotImplementedError(
647 'Running tests interactively (DEBUG, STEP or FORCE_FOREGROUND is '
648 'set) in parallel (TEST_JOBS is more than 1) is not '
651 parser = argparse.ArgumentParser(description="VPP unit tests")
652 parser.add_argument("-f", "--failfast", action='store_true',
653 help="fast failure flag")
654 parser.add_argument("-d", "--dir", action='append', type=str,
655 help="directory containing test files "
656 "(may be specified multiple times)")
657 args = parser.parse_args()
658 failfast = args.failfast
661 print("Running tests using custom test runner") # debug message
662 filter_file, filter_class, filter_func = parse_test_option()
664 print("Active filters: file=%s, class=%s, function=%s" % (
665 filter_file, filter_class, filter_func))
667 filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)
669 cb = SplitToSuitesCallback(filter_cb)
671 print("Adding tests from directory tree %s" % d)
672 discover_tests(d, cb)
674 # suites are not hashable, need to use list
677 for testcase_suite in cb.suites.values():
678 tests_amount += testcase_suite.countTestCases()
679 suites.append(testcase_suite)
681 if concurrent_tests == 1:
682 new_suite = unittest.TestSuite()
684 new_suite.addTests(suite)
688 print("%s out of %s tests match specified filters" % (
689 tests_amount, tests_amount + cb.filtered.countTestCases()))
691 if not running_extended_tests():
692 print("Not running extended tests (some tests will be skipped)")
694 attempts = retries + 1
696 print("Perform %s attempts to pass the suite..." % attempts)
699 # don't fork if requiring interactive terminal
700 sys.exit(not VppTestRunner(
701 verbosity=verbose, failfast=failfast)
702 .run(suites[0]).wasSuccessful())
705 while len(suites) > 0 and attempts > 0:
706 tests_amount = sum([x.countTestCases() for x in suites])
707 results = run_forked(suites)
708 exit_code, suites = parse_results(results)
711 print('Test run was successful')
713 print('%s attempt(s) left.' % attempts)