13 from multiprocessing import Process, Pipe, cpu_count
14 from multiprocessing.queues import Queue
15 from multiprocessing.managers import BaseManager
16 from framework import VppTestRunner, running_extended_tests, VppTestCase, \
17 get_testcase_doc_name, get_test_description
18 from debug import spawn_gdb
19 from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
21 from discover_tests import discover_tests
22 from subprocess import check_output, CalledProcessError
23 from util import check_core_path
# timeout which controls how long the child has to finish after seeing
# a core dump in test temporary directory. If this is exceeded, parent assumes
# that child process is stuck (e.g. waiting for shm mutex, which will never
# get unlocked) and kill the child
# NOTE(review): the timeout constant itself (core_timeout, referenced in
# run_forked below) is not visible in this listing -- presumably defined on
# an elided line.
# Minimum /dev/shm free space required to run at all; consulted by the
# TEST_JOBS=auto sizing logic in the __main__ block below.
min_req_shm = 536870912 # min 512MB shm required
# 128MB per extra process
# Each additional concurrent test process is budgeted this much /dev/shm.
shm_per_process = 134217728
# StreamQueue: multiprocessing Queue subclass used to ship child-process
# stdout/stderr text back to the parent for serialized printing.
# NOTE(review): the method 'def' headers are elided from this listing; the
# indented statements below are method-body fragments (apparently a write()
# that flushes the real stdout/stderr, and a fileno() exposing the pipe's
# write end so it can be select()ed on).
class StreamQueue(Queue):
        sys.__stdout__.flush()
        sys.__stderr__.flush()
        return self._writer.fileno()
# Manager subclass so StreamQueue instances live in the manager's server
# process and can be shared between the parent and the forked children.
class StreamQueueManager(BaseManager):
# Expose StreamQueue as manager.Queue() (used in TestCaseWrapper.__init__).
StreamQueueManager.register('Queue', StreamQueue)
# Entry point executed in the forked child process: redirects the child's
# stdout/stderr into the shared queue, installs the unittest signal handler,
# runs the suite via VppTestRunner and sends the result object back to the
# parent over result_pipe.
# NOTE(review): the tail of the parameter list (at least logger, failfast
# and descriptions, all referenced below) is on a line elided from this
# listing.
def test_runner_wrapper(suite, keep_alive_pipe, result_pipe, stdouterr_queue,
    # all child output goes through the queue so the parent can serialize it
    sys.stdout = stdouterr_queue
    sys.stderr = stdouterr_queue
    VppTestCase.logger = logger
    unittest.installHandler()
    result = VppTestRunner(keep_alive_pipe=keep_alive_pipe,
                           descriptions=descriptions,
                           failfast=failfast).run(suite)
    # hand the unittest result object back to the parent process
    result_pipe.send(result)
    keep_alive_pipe.close()
class TestCaseWrapper(object):
    """Parent-side bookkeeping for one forked test-suite runner process:
    owns the keep-alive and result pipes, the stdout/stderr queue, the
    child Process object and the liveness/core-dump tracking state used
    by run_forked().

    NOTE(review): several __init__ lines are elided from this listing
    (the Pipe() call continuation, the Process(...) argument tail, the
    child start() call and initializations such as last_test, vpp_pid
    and fail which other code reads).
    """
    def __init__(self, testcase_suite, manager):
        # keep-alive: child periodically reports the test in progress
        self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(
        # result: child sends the final unittest result object once done
        self.result_parent_end, self.result_child_end = Pipe(duplex=False)
        self.testcase_suite = testcase_suite
        # queue lives in the manager's server process; carries child output
        self.stdouterr_queue = manager.Queue()
        self.logger = get_parallel_logger(self.stdouterr_queue)
        self.child = Process(target=test_runner_wrapper,
                             args=(testcase_suite, self.keep_alive_child_end,
                                   self.result_child_end, self.stdouterr_queue,
        self.pid = self.child.pid
        self.last_test_temp_dir = None
        self.last_test_vpp_binary = None
        self.last_heard = time.time()
        # wall-clock time a core file was first seen (None = no core yet)
        self.core_detected_at = None
        self.failed_tests = []
        # set once the failure diagnostics in run_forked() have been done
        self.fail_addressed = False
    def close_pipes(self):
        # close both ends of both pipes once the child has been joined
        self.keep_alive_child_end.close()
        self.result_child_end.close()
        self.keep_alive_parent_end.close()
        self.result_parent_end.close()
# Thread body that drains the per-suite stdout/stderr queues and copies
# the text to the parent's stdout until the read_testcases event is
# cleared and no unread suites remain.
# NOTE(review): the parameter-list tail (read_testcases) and the
# initialization of read_testcase/data are on lines elided from this
# listing.
def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
    while read_testcases.is_set() or len(unread_testcases) > 0:
        if not read_testcase:
            # prefer suites that already finished so their output gets
            # flushed out completely first
            if len(finished_unread_testcases) > 0:
                read_testcase = finished_unread_testcases.pop()
                unread_testcases.remove(read_testcase)
            elif len(unread_testcases) > 0:
                read_testcase = unread_testcases.pop()
        # a None sentinel in the queue (put by run_forked) marks the end
        # of this suite's output
        while data is not None:
            sys.stdout.write(data)
            data = read_testcase.stdouterr_queue.get()
        read_testcase.stdouterr_queue.close()
        finished_unread_testcases.discard(read_testcase)
def run_forked(testcases):
    """Run the given list of test suites in parallel child processes (up
    to concurrent_tests at a time), monitor each child via its keep-alive
    and result pipes, handle timeouts, unexpected child deaths and VPP
    core dumps, and collect per-suite results.

    NOTE(review): numerous lines are elided from this listing (results
    list initialization, select() timeout argument, parts of the failure
    handling, the gdb spawn that presumably uses debug_core); the
    comments below describe only the visible code.
    """
    wrapped_testcase_suites = set()
    # suites are unhashable, need to use list
    debug_core = os.getenv("DEBUG", "").lower() == "core"
    unread_testcases = set()
    finished_unread_testcases = set()
    manager = StreamQueueManager()
    # start the first batch of children, one per available slot
    for i in range(concurrent_tests):
        if len(testcases) > 0:
            wrapped_testcase_suite = TestCaseWrapper(testcases.pop(0), manager)
            wrapped_testcase_suites.add(wrapped_testcase_suite)
            unread_testcases.add(wrapped_testcase_suite)
    # reader thread copies children's output to our stdout until cleared
    read_from_testcases = threading.Event()
    read_from_testcases.set()
    stdouterr_thread = threading.Thread(target=stdouterr_reader_wrapper,
                                        args=(unread_testcases,
                                              finished_unread_testcases,
                                              read_from_testcases))
    stdouterr_thread.start()
    while len(wrapped_testcase_suites) > 0:
        finished_testcase_suites = set()
        for wrapped_testcase_suite in wrapped_testcase_suites:
            # wait for activity on either the keep-alive or result pipe
            readable = select.select(
                [wrapped_testcase_suite.keep_alive_parent_end.fileno(),
                 wrapped_testcase_suite.result_parent_end.fileno()],
            if wrapped_testcase_suite.result_parent_end.fileno() in readable:
                (wrapped_testcase_suite.testcase_suite,
                 wrapped_testcase_suite.result_parent_end.recv()))
                finished_testcase_suites.add(wrapped_testcase_suite)
            # drain keep-alive messages: child reports the test in progress
            if wrapped_testcase_suite.keep_alive_parent_end.fileno() \
                while wrapped_testcase_suite.keep_alive_parent_end.poll():
                    wrapped_testcase_suite.last_test, \
                        wrapped_testcase_suite.last_test_vpp_binary, \
                        wrapped_testcase_suite.last_test_temp_dir, \
                        wrapped_testcase_suite.vpp_pid = \
                        wrapped_testcase_suite.keep_alive_parent_end.recv()
                    wrapped_testcase_suite.last_heard = time.time()
            if not wrapped_testcase_suite.fail:
                # child silent for longer than test_timeout -> assume stuck
                if wrapped_testcase_suite.last_heard + \
                        test_timeout < time.time() and \
                        wrapped_testcase_suite.last_test_temp_dir):
                    wrapped_testcase_suite.fail = True
                    wrapped_testcase_suite.logger.critical(
                        "Timeout while waiting for child test "
                        "runner process (last test running was "
                        (wrapped_testcase_suite.last_test,
                         wrapped_testcase_suite.last_test_temp_dir))
                elif not wrapped_testcase_suite.child.is_alive():
                    wrapped_testcase_suite.fail = True
                    wrapped_testcase_suite.logger.critical(
                        "Child python process unexpectedly died "
                        "(last test running was `%s' in `%s')!" %
                        (wrapped_testcase_suite.last_test,
                         wrapped_testcase_suite.last_test_temp_dir))
                # look for a VPP core file in the test's temp directory
                elif wrapped_testcase_suite.last_test_temp_dir and \
                        wrapped_testcase_suite.last_test_vpp_binary:
                    core_path = "%s/core" % \
                        wrapped_testcase_suite.last_test_temp_dir
                    if os.path.isfile(core_path):
                        if wrapped_testcase_suite.core_detected_at is None:
                            # first sighting: start the core_timeout clock
                            wrapped_testcase_suite.core_detected_at = \
                        elif wrapped_testcase_suite.core_detected_at + \
                                core_timeout < time.time():
                            if not os.path.isfile(
                                wrapped_testcase_suite.
                                wrapped_testcase_suite.logger.critical(
                                    "Child python process unresponsive and "
                                    "core-file exists in test temporary "
                                wrapped_testcase_suite.fail = True
            # first time we see this suite failed: collect diagnostics
            if wrapped_testcase_suite.fail and not \
                    wrapped_testcase_suite.fail_addressed:
                # symlink the failed test's temp dir into VPP_TEST_FAILED_DIR
                failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
                lttd = os.path.basename(
                    wrapped_testcase_suite.last_test_temp_dir)
                link_path = '%s%s-FAILED' % (failed_dir, lttd)
                wrapped_testcase_suite.logger.error(
                    "Creating a link to the failed test: %s -> %s" %
                if not os.path.exists(link_path):
                    os.symlink(wrapped_testcase_suite.last_test_temp_dir,
                # preserve the VPP API post-mortem trace, if one exists
                api_post_mortem_path = "/tmp/api_post_mortem.%d" % \
                    wrapped_testcase_suite.vpp_pid
                if os.path.isfile(api_post_mortem_path):
                    wrapped_testcase_suite.logger.error(
                        "Copying api_post_mortem.%d to %s" %
                        (wrapped_testcase_suite.vpp_pid,
                         wrapped_testcase_suite.last_test_temp_dir))
                    shutil.copy2(api_post_mortem_path,
                                 wrapped_testcase_suite.last_test_temp_dir)
                if wrapped_testcase_suite.last_test_temp_dir and \
                        wrapped_testcase_suite.last_test_vpp_binary:
                    core_path = "%s/core" % \
                        wrapped_testcase_suite.last_test_temp_dir
                    if os.path.isfile(core_path):
                        wrapped_testcase_suite.logger.error(
                            "Core-file exists in test temporary directory: %s!"
                        check_core_path(wrapped_testcase_suite.logger,
                        wrapped_testcase_suite.logger.debug(
                            "Running `file %s':" % core_path)
                            info = check_output(["file", core_path])
                            wrapped_testcase_suite.logger.debug(info)
                        except CalledProcessError as e:
                            wrapped_testcase_suite.logger.error(
                                "Could not run `file' utility on core-file, "
                                "rc=%s" % e.returncode)
                                wrapped_testcase_suite.last_test_vpp_binary,
                                core_path, wrapped_testcase_suite.logger)
                # interrupt the child test-runner process
                os.kill(wrapped_testcase_suite.child.pid, signal.SIGINT)
                # terminating the child process tends to leave orphan
                os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
                wrapped_testcase_suite.fail_addressed = True
        # reap finished children and backfill free slots with new suites
        for finished_testcase in finished_testcase_suites:
            finished_testcase.child.join()
            finished_testcase.close_pipes()
            wrapped_testcase_suites.remove(finished_testcase)
            finished_unread_testcases.add(finished_testcase)
            # None sentinel tells the reader thread this queue is done
            finished_testcase.stdouterr_queue.put(None)
            if len(testcases) > 0:
                new_testcase = TestCaseWrapper(testcases.pop(0), manager)
                wrapped_testcase_suites.add(new_testcase)
                unread_testcases.add(new_testcase)
    # signal the reader thread to stop once remaining queues are drained
    read_from_testcases.clear()
    stdouterr_thread.join(test_timeout)
class SplitToSuitesCallback:
    """discover_tests() callback that splits discovered test methods into
    per-(file, class) suites; tests rejected by the filter are collected
    in self.filtered (its size is reported by the __main__ block).

    NOTE(review): the self.suites dict initialization and the else branch
    of __call__ are on lines elided from this listing.
    """
    def __init__(self, filter_callback):
        self.suite_name = 'default'
        self.filter_callback = filter_callback
        self.filtered = unittest.TestSuite()
    def __call__(self, file_name, cls, method):
        # instantiate the test case for this single method
        test_method = cls(method)
        if self.filter_callback(file_name, cls.__name__, method):
            # one suite per file+class combination
            self.suite_name = file_name + cls.__name__
            if self.suite_name not in self.suites:
                self.suites[self.suite_name] = unittest.TestSuite()
            self.suites[self.suite_name].addTest(test_method)
            # (elided else branch) -- filtered-out tests land here
            self.filtered.addTest(test_method)
# Parse the test-filter environment variable into its (file, class,
# function) components; '*' or empty components match anything.
# NOTE(review): test_option itself is defined on an elided line; the
# branch structure (splitting f on '.', validating the part count) is
# also elided, which is why indentation below jumps.
def parse_test_option():
    f = os.getenv(test_option, None)
    filter_file_name = None
    filter_class_name = None
    filter_func_name = None
            raise Exception("Unrecognized %s option: %s" %
            if parts[2] not in ('*', ''):
                filter_func_name = parts[2]
            if parts[1] not in ('*', ''):
                filter_class_name = parts[1]
            if parts[0] not in ('*', ''):
                # accept either 'test_foo' or bare 'foo' for the file part
                if parts[0].startswith('test_'):
                    filter_file_name = parts[0]
                    filter_file_name = 'test_%s' % parts[0]
            if f.startswith('test_'):
                filter_file_name = 'test_%s' % f
        # turn the module-ish name into an actual file name
        filter_file_name = '%s.py' % filter_file_name
    return filter_file_name, filter_class_name, filter_func_name
# Recursively filter a test suite, keeping only the tests accepted by
# filter_cb(file_name, class_name, func_name).
# NOTE(review): the for-loop header, the addTest calls and the return
# statement are elided from this listing.
def filter_tests(tests, filter_cb):
    result = unittest.suite.TestSuite()
        if isinstance(t, unittest.suite.TestSuite):
            # this is a bunch of tests, recursively filter...
            x = filter_tests(t, filter_cb)
            if x.countTestCases() > 0:
        elif isinstance(t, unittest.TestCase):
            # this is a single test
            parts = t.id().split('.')
            # t.id() for common cases like this:
            # test_classifier.TestClassifier.test_acl_ip
            # apply filtering only if it is so
            if not filter_cb(parts[0], parts[1], parts[2]):
        # unexpected object, don't touch it
class FilterByTestOption:
    """Callable filter matching tests against the optional file, class
    and function names parsed from the test filter option; a falsy
    (None) component matches anything.

    NOTE(review): the return statements of __call__ are elided from this
    listing.
    """
    def __init__(self, filter_file_name, filter_class_name, filter_func_name):
        self.filter_file_name = filter_file_name
        self.filter_class_name = filter_class_name
        self.filter_func_name = filter_func_name
    def __call__(self, file_name, class_name, func_name):
        # each configured component must match exactly to accept the test
        if self.filter_file_name and file_name != self.filter_file_name:
        if self.filter_class_name and class_name != self.filter_class_name:
        if self.filter_func_name and func_name != self.filter_func_name:
class FilterByClassList:
    """Callable test filter accepting only test methods whose class name
    appears in the supplied collection (used to re-run failed suites)."""

    def __init__(self, class_list):
        # collection of acceptable test-case class names
        self.class_list = class_list

    def __call__(self, file_name, class_name, func_name):
        # Only the class name participates in the decision; file and
        # function names are accepted for filter-interface compatibility.
        accepted = class_name in self.class_list
        return accepted
# Build a suite containing only the tests from *suite* whose class name
# is in *failed* (used when re-running previously failed suites).
# NOTE(review): the return statement is elided from this listing.
def suite_from_failed(suite, failed):
    filter_cb = FilterByClassList(failed)
    suite = filter_tests(suite, filter_cb)
class NonPassedResults(dict):
    """Aggregate of non-passed test results across suites: maps result
    category ids (failures/errors/...) to counts and keeps per-suite
    details in results_per_suite.

    NOTE(review): the 'def __init__' line and several attribute
    initializations (e.g. passed, rerun, the crashes counter) are elided
    from this listing, although other methods read them.
    """
        super(NonPassedResults, self).__init__()
        # total number of tests executed, across all suites
        self.all_testcases = 0
        # per-testcase-class details: doc name -> {category id: [tests]}
        self.results_per_suite = {}
        self.failures_id = 'failures'
        self.errors_id = 'errors'
        self.crashes_id = 'crashes'
        self.skipped_id = 'skipped'
        self.expectedFailures_id = 'expectedFailures'
        self.unexpectedSuccesses_id = 'unexpectedSuccesses'
        # aggregate counters, keyed by the category ids above
        self[self.failures_id] = 0
        self[self.errors_id] = 0
        self[self.skipped_id] = 0
        self[self.expectedFailures_id] = 0
        self[self.unexpectedSuccesses_id] = 0
    def _add_result(self, test, result_id):
        # Record *test* under category *result_id* in results_per_suite;
        # only VppTestCase instances are recorded.
        # NOTE(review): the guard between the two indentation levels
        # (presumably checking len(parts)), part of the category dict
        # literal and the return statements are elided from this listing.
        if isinstance(test, VppTestCase):
            parts = test.id().split('.')
                # use the human-readable doc name as the suite key
                tc_class = get_testcase_doc_name(test)
                if tc_class not in self.results_per_suite:
                    # failed, errored, skipped, expectedly failed,
                    # unexpectedly passed
                    self.results_per_suite[tc_class] = \
                        {self.failures_id: [],
                         self.expectedFailures_id: [],
                         self.unexpectedSuccesses_id: []}
                self.results_per_suite[tc_class][result_id].append(test)
429 def add_results(self, testcases, testcase_result_id):
430 for failed_testcase, _ in testcases:
431 if self._add_result(failed_testcase, testcase_result_id):
432 self[testcase_result_id] += 1
    def add_result(self, testcase_suite, result):
        # Merge one suite's unittest result object into the aggregate
        # counters and, on failure, queue a suitable suite for re-run.
        # NOTE(review): several lines are elided from this listing (the
        # branch guarding 'suite finished properly' vs. crashed, the
        # errors portion of rerun_classes, the suite_from_failed call
        # tail, and the method's return of a result code read by
        # parse_results).
        self.all_testcases += result.testsRun
        self.passed += result.passed
            # suite finished properly
            if not result.wasSuccessful():
            self.add_results(result.failures, self.failures_id)
            self.add_results(result.errors, self.errors_id)
            self.add_results(result.skipped, self.skipped_id)
            self.add_results(result.expectedFailures,
                             self.expectedFailures_id)
            self.add_results(result.unexpectedSuccesses,
                             self.unexpectedSuccesses_id)
            if concurrent_tests == 1:
                # sequential run: re-run only the failed/errored classes
                rerun_classes = {x[0].__class__.__name__ for
                rerun_classes.update({x[0].__class__.__name__ for
                                      x in result.failures})
                self.rerun.append(suite_from_failed(testcase_suite,
                self.rerun.append(testcase_suite)
            self.rerun.append(testcase_suite)
    def print_results(self):
        # Pretty-print the aggregate counters and, when anything failed,
        # the per-suite details, to stdout.
        # NOTE(review): a few lines are elided from this listing (e.g.
        # the dict keys indexed on the two suite_results lookups below).
        print(double_line_delim)
        print('TEST RESULTS:')
        print(' Executed tests: {}'.format(self.all_testcases))
        print(' Passed tests: {}'.format(
            colorize(str(self.passed), GREEN)))
        # only print categories that actually occurred
        if self[self.failures_id] > 0:
            print(' Failures: {}'.format(
                colorize(str(self[self.failures_id]), RED)))
        if self[self.errors_id] > 0:
            print(' Errors: {}'.format(
                colorize(str(self[self.errors_id]), RED)))
        if self[self.skipped_id] > 0:
            print(' Skipped tests: {}'.format(
                colorize(str(self[self.skipped_id]), YELLOW)))
        if self[self.expectedFailures_id] > 0:
            print(' Expected failures: {}'.format(
                colorize(str(self[self.expectedFailures_id]), GREEN)))
        if self[self.unexpectedSuccesses_id] > 0:
            print(' Unexpected successes: {}'.format(
                colorize(str(self[self.unexpectedSuccesses_id]), YELLOW)))
        if self.all_failed > 0:
            print('FAILED TESTS:')
            for testcase_class, suite_results in \
                    self.results_per_suite.items():
                failed_testcases = suite_results[
                errored_testcases = suite_results[
                if len(failed_testcases) or len(errored_testcases):
                    print(' Testcase name: {}'.format(
                        colorize(testcase_class, RED)))
                    for failed_test in failed_testcases:
                        print(' FAILED: {}'.format(
                            colorize(get_test_description(
                                descriptions, failed_test), RED)))
                    for failed_test in errored_testcases:
                        print(' ERRORED: {}'.format(
                            colorize(get_test_description(
                                descriptions, failed_test), RED)))
        print(double_line_delim)
514 def all_failed(self):
515 return self[self.failures_id] + self[self.errors_id]
def parse_results(results):
    """
    Prints the number of executed, passed, failed, errored, skipped,
    expectedly failed and unexpectedly passed tests and details about
    failed, errored, expectedly failed and unexpectedly passed tests.

    Also returns any suites where any test failed.
    """
    # NOTE(review): the original docstring delimiters plus the
    # return-code computation branches (where return_code is set) are
    # elided from this listing.
    results_per_suite = NonPassedResults()
    for testcase_suite, result in results:
        result_code = results_per_suite.add_result(testcase_suite, result)
        elif result_code == -1:
    results_per_suite.print_results()
    return return_code, results_per_suite.rerun
# Read an integer configuration value from environment variable
# *env_var*, falling back to *default* on unset/unparsable values.
# NOTE(review): the int() conversion/validation and the return statement
# are elided from this listing; only the warning branch body is visible.
def parse_digit_env(env_var, default):
    value = os.getenv(env_var, default)
        # NOTE(review): the two adjacent string literals concatenate with
        # no separating space ('..."%s",defaulting to...') -- a space is
        # probably missing at the end of the first literal; confirm.
        print('WARNING: unsupported value "%s" for env var "%s",'
              'defaulting to %s' % (value, env_var, default))
if __name__ == '__main__':
    # NOTE(review): many lines of this __main__ block are elided from
    # this listing (e.g. the start of the force_foreground assignment,
    # loop headers, several branches); comments refer only to visible
    # code.
    verbose = parse_digit_env("V", 0)
    test_timeout = parse_digit_env("TIMEOUT", 600) # default = 10 minutes
    retries = parse_digit_env("RETRIES", 0)
    # DEBUG=gdb/gdbserver implies an interactive, single-process run
    debug = os.getenv("DEBUG", "n").lower() in ["gdb", "gdbserver"]
    step = os.getenv("STEP", "n").lower() in ("y", "yes", "1")
        os.getenv("FORCE_FOREGROUND", "n").lower() in ("y", "yes", "1")
    run_interactive = debug or step or force_foreground
    test_jobs = os.getenv("TEST_JOBS", "1").lower() # default = 1 process
    if test_jobs == 'auto':
            print('Interactive mode required, running on one core')
            # size concurrency by the free space in /dev/shm
            shm_free = psutil.disk_usage('/dev/shm').free
            shm_max_processes = 1
            if shm_free < min_req_shm:
                raise Exception('Not enough free space in /dev/shm. Required '
                                'free space is at least %sM.'
                                % (min_req_shm >> 20))
            extra_shm = shm_free - min_req_shm
            # NOTE(review): true division yields a float on Python 3;
            # integer division (//) appears to be intended -- confirm.
            shm_max_processes += extra_shm / shm_per_process
            # NOTE(review): max() looks suspicious here -- min() would
            # honour the shm-based cap computed above; confirm intent.
            concurrent_tests = max(cpu_count(), shm_max_processes)
            print('Found enough resources to run tests with %s cores'
    elif test_jobs.isdigit():
        concurrent_tests = int(test_jobs)
    # interactive modes are incompatible with parallel execution
    if run_interactive and concurrent_tests > 1:
        raise NotImplementedError(
            'Running tests interactively (DEBUG, STEP or FORCE_FOREGROUND is '
            'set) in parallel (TEST_JOBS is more than 1) is not '
    parser = argparse.ArgumentParser(description="VPP unit tests")
    parser.add_argument("-f", "--failfast", action='store_true',
                        help="fast failure flag")
    parser.add_argument("-d", "--dir", action='append', type=str,
                        help="directory containing test files "
                             "(may be specified multiple times)")
    args = parser.parse_args()
    failfast = args.failfast
    print("Running tests using custom test runner") # debug message
    filter_file, filter_class, filter_func = parse_test_option()
    print("Active filters: file=%s, class=%s, function=%s" % (
        filter_file, filter_class, filter_func))
    filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)
    cb = SplitToSuitesCallback(filter_cb)
        # (elided loop header) -- iterate the -d/--dir arguments
        print("Adding tests from directory tree %s" % d)
        discover_tests(d, cb)
    # suites are not hashable, need to use list
    for testcase_suite in cb.suites.values():
        tests_amount += testcase_suite.countTestCases()
        suites.append(testcase_suite)
    if concurrent_tests == 1:
        # sequential mode: flatten everything into a single suite
        new_suite = unittest.TestSuite()
            new_suite.addTest(suite)
    print("%s out of %s tests match specified filters" % (
        tests_amount, tests_amount + cb.filtered.countTestCases()))
    if not running_extended_tests():
        print("Not running extended tests (some tests will be skipped)")
    attempts = retries + 1
        print("Perform %s attempts to pass the suite..." % attempts)
        # don't fork if requiring interactive terminal
        sys.exit(not VppTestRunner(
            verbosity=verbose, failfast=failfast)
            .run(suites[0]).wasSuccessful())
    # parallel mode: keep re-running failed suites until out of attempts
    while len(suites) > 0 and attempts > 0:
        tests_amount = sum([x.countTestCases() for x in suites])
        results = run_forked(suites)
        exit_code, suites = parse_results(results)
            print('Test run was successful')
            print('%s attempt(s) left.' % attempts)