13 from multiprocessing import Process, Pipe, cpu_count
14 from multiprocessing.queues import Queue
15 from multiprocessing.managers import BaseManager
16 from framework import VppTestRunner, running_extended_tests, VppTestCase, \
17 get_testcase_doc_name, get_test_description
18 from debug import spawn_gdb
19 from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
21 from discover_tests import discover_tests
22 from subprocess import check_output, CalledProcessError
23 from util import check_core_path
25 # timeout which controls how long the child has to finish after seeing
26 # a core dump in test temporary directory. If this is exceeded, parent assumes
27 # that child process is stuck (e.g. waiting for shm mutex, which will never
28 # get unlocked) and kill the child
30 min_req_shm = 536870912 # min 512MB shm required
31 # 128MB per extra process
32 shm_per_process = 134217728
35 class StreamQueue(Queue):
40 sys.__stdout__.flush()
41 sys.__stderr__.flush()
44 return self._writer.fileno()
47 class StreamQueueManager(BaseManager):
51 StreamQueueManager.register('Queue', StreamQueue)
54 def test_runner_wrapper(suite, keep_alive_pipe, result_pipe, stdouterr_queue,
55 partial_result_queue, logger):
56 sys.stdout = stdouterr_queue
57 sys.stderr = stdouterr_queue
58 VppTestCase.logger = logger
59 result = VppTestRunner(keep_alive_pipe=keep_alive_pipe,
60 descriptions=descriptions,
62 results_pipe=partial_result_queue,
63 failfast=failfast).run(suite)
64 result_pipe.send(result)
66 keep_alive_pipe.close()
69 class TestCaseWrapper(object):
70 def __init__(self, testcase_suite, manager):
71 self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(
73 self.result_parent_end, self.result_child_end = Pipe(duplex=False)
74 self.partial_result_parent_end, self.partial_result_child_end = Pipe(
76 self.testcase_suite = testcase_suite
77 self.stdouterr_queue = manager.Queue()
78 self.logger = get_parallel_logger(self.stdouterr_queue)
79 self.child = Process(target=test_runner_wrapper,
80 args=(testcase_suite, self.keep_alive_child_end,
81 self.result_child_end, self.stdouterr_queue,
82 self.partial_result_child_end, self.logger)
85 self.pid = self.child.pid
86 self.last_test_temp_dir = None
87 self.last_test_vpp_binary = None
91 self.last_heard = time.time()
92 self.core_detected_at = None
93 self.failed_tests = []
94 self.partial_result = None
96 def close_pipes(self):
97 self.keep_alive_child_end.close()
98 self.result_child_end.close()
99 self.partial_result_child_end.close()
100 self.keep_alive_parent_end.close()
101 self.result_parent_end.close()
102 self.partial_result_parent_end.close()
105 def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
108 while read_testcases.is_set() or len(unread_testcases) > 0:
109 if not read_testcase:
110 if len(finished_unread_testcases) > 0:
111 read_testcase = finished_unread_testcases.pop()
112 unread_testcases.remove(read_testcase)
113 elif len(unread_testcases) > 0:
114 read_testcase = unread_testcases.pop()
117 while data is not None:
118 sys.stdout.write(data)
119 data = read_testcase.stdouterr_queue.get()
121 read_testcase.stdouterr_queue.close()
122 finished_unread_testcases.discard(read_testcase)
126 def run_forked(testcase_suites):
127 wrapped_testcase_suites = set()
129 # suites are unhashable, need to use list
131 debug_core = os.getenv("DEBUG", "").lower() == "core"
132 unread_testcases = set()
133 finished_unread_testcases = set()
134 manager = StreamQueueManager()
136 for i in range(concurrent_tests):
137 if len(testcase_suites) > 0:
138 wrapped_testcase_suite = TestCaseWrapper(testcase_suites.pop(0),
140 wrapped_testcase_suites.add(wrapped_testcase_suite)
141 unread_testcases.add(wrapped_testcase_suite)
146 read_from_testcases = threading.Event()
147 read_from_testcases.set()
148 stdouterr_thread = threading.Thread(target=stdouterr_reader_wrapper,
149 args=(unread_testcases,
150 finished_unread_testcases,
151 read_from_testcases))
152 stdouterr_thread.start()
154 while len(wrapped_testcase_suites) > 0:
155 finished_testcase_suites = set()
156 for wrapped_testcase_suite in wrapped_testcase_suites:
157 readable = select.select(
158 [wrapped_testcase_suite.keep_alive_parent_end.fileno(),
159 wrapped_testcase_suite.result_parent_end.fileno(),
160 wrapped_testcase_suite.partial_result_parent_end.fileno()],
162 if wrapped_testcase_suite.result_parent_end.fileno() in readable:
164 (wrapped_testcase_suite.testcase_suite,
165 wrapped_testcase_suite.result_parent_end.recv()))
166 finished_testcase_suites.add(wrapped_testcase_suite)
169 if wrapped_testcase_suite.partial_result_parent_end.fileno() \
171 while wrapped_testcase_suite.partial_result_parent_end.poll():
172 wrapped_testcase_suite.partial_result = \
173 wrapped_testcase_suite.partial_result_parent_end.recv()
174 wrapped_testcase_suite.last_heard = time.time()
176 if wrapped_testcase_suite.keep_alive_parent_end.fileno() \
178 while wrapped_testcase_suite.keep_alive_parent_end.poll():
179 wrapped_testcase_suite.last_test, \
180 wrapped_testcase_suite.last_test_vpp_binary, \
181 wrapped_testcase_suite.last_test_temp_dir, \
182 wrapped_testcase_suite.vpp_pid = \
183 wrapped_testcase_suite.keep_alive_parent_end.recv()
184 wrapped_testcase_suite.last_heard = time.time()
187 if wrapped_testcase_suite.last_heard + test_timeout < time.time() \
188 and not os.path.isfile(
190 wrapped_testcase_suite.last_test_temp_dir):
192 wrapped_testcase_suite.logger.critical(
193 "Timeout while waiting for child test "
194 "runner process (last test running was "
196 (wrapped_testcase_suite.last_test,
197 wrapped_testcase_suite.last_test_temp_dir))
198 elif not wrapped_testcase_suite.child.is_alive():
200 wrapped_testcase_suite.logger.critical(
201 "Child python process unexpectedly died "
202 "(last test running was `%s' in `%s')!" %
203 (wrapped_testcase_suite.last_test,
204 wrapped_testcase_suite.last_test_temp_dir))
205 elif wrapped_testcase_suite.last_test_temp_dir and \
206 wrapped_testcase_suite.last_test_vpp_binary:
207 core_path = "%s/core" % \
208 wrapped_testcase_suite.last_test_temp_dir
209 if os.path.isfile(core_path):
210 if wrapped_testcase_suite.core_detected_at is None:
211 wrapped_testcase_suite.core_detected_at = time.time()
212 elif wrapped_testcase_suite.core_detected_at + \
213 core_timeout < time.time():
214 if not os.path.isfile(
216 wrapped_testcase_suite.
218 wrapped_testcase_suite.logger.critical(
219 "Child python process unresponsive and core-"
220 "file exists in test temporary directory!")
224 failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
225 if wrapped_testcase_suite.last_test_temp_dir:
226 lttd = os.path.basename(
227 wrapped_testcase_suite.last_test_temp_dir)
230 link_path = '%s%s-FAILED' % (failed_dir, lttd)
231 wrapped_testcase_suite.logger.error(
232 "Creating a link to the failed test: %s -> %s" %
234 if not os.path.exists(link_path) \
235 and wrapped_testcase_suite.last_test_temp_dir:
236 os.symlink(wrapped_testcase_suite.last_test_temp_dir,
238 api_post_mortem_path = "/tmp/api_post_mortem.%d" % \
239 wrapped_testcase_suite.vpp_pid
240 if os.path.isfile(api_post_mortem_path):
241 wrapped_testcase_suite.logger.error(
242 "Copying api_post_mortem.%d to %s" %
243 (wrapped_testcase_suite.vpp_pid,
244 wrapped_testcase_suite.last_test_temp_dir))
245 shutil.copy2(api_post_mortem_path,
246 wrapped_testcase_suite.last_test_temp_dir)
247 if wrapped_testcase_suite.last_test_temp_dir and \
248 wrapped_testcase_suite.last_test_vpp_binary:
249 core_path = "%s/core" % \
250 wrapped_testcase_suite.last_test_temp_dir
251 if os.path.isfile(core_path):
252 wrapped_testcase_suite.logger.error(
253 "Core-file exists in test temporary directory: %s!"
255 check_core_path(wrapped_testcase_suite.logger,
257 wrapped_testcase_suite.logger.debug(
258 "Running `file %s':" % core_path)
260 info = check_output(["file", core_path])
261 wrapped_testcase_suite.logger.debug(info)
262 except CalledProcessError as e:
263 wrapped_testcase_suite.logger.error(
264 "Could not run `file' utility on core-file, "
265 "rc=%s" % e.returncode)
269 wrapped_testcase_suite.last_test_vpp_binary,
270 core_path, wrapped_testcase_suite.logger)
271 wrapped_testcase_suite.child.terminate()
273 # terminating the child process tends to leave orphan
275 os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
279 results.append((wrapped_testcase_suite.testcase_suite,
280 wrapped_testcase_suite.partial_result))
281 finished_testcase_suites.add(wrapped_testcase_suite)
283 for finished_testcase in finished_testcase_suites:
284 finished_testcase.child.join()
285 finished_testcase.close_pipes()
286 wrapped_testcase_suites.remove(finished_testcase)
287 finished_unread_testcases.add(finished_testcase)
288 finished_testcase.stdouterr_queue.put(None)
289 if len(testcase_suites) > 0:
290 new_testcase = TestCaseWrapper(testcase_suites.pop(0), manager)
291 wrapped_testcase_suites.add(new_testcase)
292 unread_testcases.add(new_testcase)
294 read_from_testcases.clear()
295 stdouterr_thread.join(test_timeout)
300 class SplitToSuitesCallback:
301 def __init__(self, filter_callback):
303 self.suite_name = 'default'
304 self.filter_callback = filter_callback
305 self.filtered = unittest.TestSuite()
307 def __call__(self, file_name, cls, method):
308 test_method = cls(method)
309 if self.filter_callback(file_name, cls.__name__, method):
310 self.suite_name = file_name + cls.__name__
311 if self.suite_name not in self.suites:
312 self.suites[self.suite_name] = unittest.TestSuite()
313 self.suites[self.suite_name].addTest(test_method)
316 self.filtered.addTest(test_method)
322 def parse_test_option():
323 f = os.getenv(test_option, None)
324 filter_file_name = None
325 filter_class_name = None
326 filter_func_name = None
331 raise Exception("Unrecognized %s option: %s" %
334 if parts[2] not in ('*', ''):
335 filter_func_name = parts[2]
336 if parts[1] not in ('*', ''):
337 filter_class_name = parts[1]
338 if parts[0] not in ('*', ''):
339 if parts[0].startswith('test_'):
340 filter_file_name = parts[0]
342 filter_file_name = 'test_%s' % parts[0]
344 if f.startswith('test_'):
347 filter_file_name = 'test_%s' % f
349 filter_file_name = '%s.py' % filter_file_name
350 return filter_file_name, filter_class_name, filter_func_name
353 def filter_tests(tests, filter_cb):
354 result = unittest.suite.TestSuite()
356 if isinstance(t, unittest.suite.TestSuite):
357 # this is a bunch of tests, recursively filter...
358 x = filter_tests(t, filter_cb)
359 if x.countTestCases() > 0:
361 elif isinstance(t, unittest.TestCase):
362 # this is a single test
363 parts = t.id().split('.')
364 # t.id() for common cases like this:
365 # test_classifier.TestClassifier.test_acl_ip
366 # apply filtering only if it is so
368 if not filter_cb(parts[0], parts[1], parts[2]):
372 # unexpected object, don't touch it
377 class FilterByTestOption:
378 def __init__(self, filter_file_name, filter_class_name, filter_func_name):
379 self.filter_file_name = filter_file_name
380 self.filter_class_name = filter_class_name
381 self.filter_func_name = filter_func_name
383 def __call__(self, file_name, class_name, func_name):
384 if self.filter_file_name and file_name != self.filter_file_name:
386 if self.filter_class_name and class_name != self.filter_class_name:
388 if self.filter_func_name and func_name != self.filter_func_name:
393 class FilterByClassList:
394 def __init__(self, classes_with_filenames):
395 self.classes_with_filenames = classes_with_filenames
397 def __call__(self, file_name, class_name, func_name):
398 return '.'.join([file_name, class_name]) in self.classes_with_filenames
401 def suite_from_failed(suite, failed):
402 failed = {x.rsplit('.', 1)[0] for x in failed}
403 filter_cb = FilterByClassList(failed)
404 suite = filter_tests(suite, filter_cb)
408 class NonPassedResults(dict):
410 super(NonPassedResults, self).__init__()
411 self.all_testcases = 0
412 self.results_per_suite = {}
413 self.failures_id = 'failures'
414 self.errors_id = 'errors'
415 self.crashes_id = 'crashes'
416 self.skipped_id = 'skipped'
417 self.expectedFailures_id = 'expectedFailures'
418 self.unexpectedSuccesses_id = 'unexpectedSuccesses'
421 self[self.failures_id] = 0
422 self[self.errors_id] = 0
423 self[self.skipped_id] = 0
424 self[self.expectedFailures_id] = 0
425 self[self.unexpectedSuccesses_id] = 0
427 def _add_result(self, test, result_id):
428 if isinstance(test, VppTestCase):
429 parts = test.id().split('.')
431 tc_class = get_testcase_doc_name(test)
432 if tc_class not in self.results_per_suite:
433 # failed, errored, skipped, expectedly failed,
434 # unexpectedly passed
435 self.results_per_suite[tc_class] = \
436 {self.failures_id: [],
439 self.expectedFailures_id: [],
440 self.unexpectedSuccesses_id: []}
441 self.results_per_suite[tc_class][result_id].append(test)
445 def add_results(self, testcases, testcase_result_id):
446 for failed_testcase, _ in testcases:
447 if self._add_result(failed_testcase, testcase_result_id):
448 self[testcase_result_id] += 1
450 def add_result(self, testcase_suite, result):
453 self.all_testcases += result.testsRun
454 self.passed += len(result.passed)
455 if not len(result.passed) + len(result.skipped) \
456 == testcase_suite.countTestCases():
459 self.add_results(result.failures, self.failures_id)
460 self.add_results(result.errors, self.errors_id)
461 self.add_results(result.skipped, self.skipped_id)
462 self.add_results(result.expectedFailures,
463 self.expectedFailures_id)
464 self.add_results(result.unexpectedSuccesses,
465 self.unexpectedSuccesses_id)
470 if concurrent_tests == 1:
473 skipped = [x.id() for (x, _) in result.skipped]
474 for testcase in testcase_suite:
475 tc_id = testcase.id()
476 if tc_id not in result.passed and \
477 tc_id not in skipped:
479 if len(rerun_ids) > 0:
480 self.rerun.append(suite_from_failed(testcase_suite,
483 self.rerun.append(testcase_suite)
485 self.rerun.append(testcase_suite)
489 def print_results(self):
491 print(double_line_delim)
492 print('TEST RESULTS:')
493 print(' Executed tests: {}'.format(self.all_testcases))
494 print(' Passed tests: {}'.format(
495 colorize(str(self.passed), GREEN)))
496 if self[self.failures_id] > 0:
497 print(' Failures: {}'.format(
498 colorize(str(self[self.failures_id]), RED)))
499 if self[self.errors_id] > 0:
500 print(' Errors: {}'.format(
501 colorize(str(self[self.errors_id]), RED)))
502 if self[self.skipped_id] > 0:
503 print(' Skipped tests: {}'.format(
504 colorize(str(self[self.skipped_id]), YELLOW)))
505 if self[self.expectedFailures_id] > 0:
506 print(' Expected failures: {}'.format(
507 colorize(str(self[self.expectedFailures_id]), GREEN)))
508 if self[self.unexpectedSuccesses_id] > 0:
509 print(' Unexpected successes: {}'.format(
510 colorize(str(self[self.unexpectedSuccesses_id]), YELLOW)))
512 if self.all_failed > 0:
513 print('FAILED TESTS:')
514 for testcase_class, suite_results in \
515 self.results_per_suite.items():
516 failed_testcases = suite_results[
518 errored_testcases = suite_results[
520 if len(failed_testcases) or len(errored_testcases):
521 print(' Testcase name: {}'.format(
522 colorize(testcase_class, RED)))
523 for failed_test in failed_testcases:
524 print(' FAILED: {}'.format(
525 colorize(get_test_description(
526 descriptions, failed_test), RED)))
527 for failed_test in errored_testcases:
528 print(' ERRORED: {}'.format(
529 colorize(get_test_description(
530 descriptions, failed_test), RED)))
532 print(double_line_delim)
536 def all_failed(self):
537 return self[self.failures_id] + self[self.errors_id]
540 def parse_results(results):
542 Prints the number of executed, passed, failed, errored, skipped,
543 expectedly failed and unexpectedly passed tests and details about
544 failed, errored, expectedly failed and unexpectedly passed tests.
546 Also returns any suites where any test failed.
552 results_per_suite = NonPassedResults()
555 for testcase_suite, result in results:
556 result_code = results_per_suite.add_result(testcase_suite, result)
559 elif result_code == -1:
562 results_per_suite.print_results()
570 return return_code, results_per_suite.rerun
573 def parse_digit_env(env_var, default):
574 value = os.getenv(env_var, default)
579 print('WARNING: unsupported value "%s" for env var "%s",'
580 'defaulting to %s' % (value, env_var, default))
585 if __name__ == '__main__':
587 verbose = parse_digit_env("V", 0)
589 test_timeout = parse_digit_env("TIMEOUT", 600) # default = 10 minutes
591 retries = parse_digit_env("RETRIES", 0)
593 debug = os.getenv("DEBUG", "n").lower() in ["gdb", "gdbserver"]
595 step = os.getenv("STEP", "n").lower() in ("y", "yes", "1")
598 os.getenv("FORCE_FOREGROUND", "n").lower() in ("y", "yes", "1")
600 run_interactive = debug or step or force_foreground
602 test_jobs = os.getenv("TEST_JOBS", "1").lower() # default = 1 process
603 if test_jobs == 'auto':
606 print('Interactive mode required, running on one core')
608 shm_free = psutil.disk_usage('/dev/shm').free
609 shm_max_processes = 1
610 if shm_free < min_req_shm:
611 raise Exception('Not enough free space in /dev/shm. Required '
612 'free space is at least %sM.'
613 % (min_req_shm >> 20))
615 extra_shm = shm_free - min_req_shm
616 shm_max_processes += extra_shm / shm_per_process
617 concurrent_tests = max(cpu_count(), shm_max_processes)
618 print('Found enough resources to run tests with %s cores'
620 elif test_jobs.isdigit():
621 concurrent_tests = int(test_jobs)
625 if run_interactive and concurrent_tests > 1:
626 raise NotImplementedError(
627 'Running tests interactively (DEBUG, STEP or FORCE_FOREGROUND is '
628 'set) in parallel (TEST_JOBS is more than 1) is not '
631 parser = argparse.ArgumentParser(description="VPP unit tests")
632 parser.add_argument("-f", "--failfast", action='store_true',
633 help="fast failure flag")
634 parser.add_argument("-d", "--dir", action='append', type=str,
635 help="directory containing test files "
636 "(may be specified multiple times)")
637 args = parser.parse_args()
638 failfast = args.failfast
641 print("Running tests using custom test runner") # debug message
642 filter_file, filter_class, filter_func = parse_test_option()
644 print("Active filters: file=%s, class=%s, function=%s" % (
645 filter_file, filter_class, filter_func))
647 filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)
649 cb = SplitToSuitesCallback(filter_cb)
651 print("Adding tests from directory tree %s" % d)
652 discover_tests(d, cb)
654 # suites are not hashable, need to use list
657 for testcase_suite in cb.suites.values():
658 tests_amount += testcase_suite.countTestCases()
659 suites.append(testcase_suite)
661 if concurrent_tests == 1:
662 new_suite = unittest.TestSuite()
664 new_suite.addTests(suite)
668 print("%s out of %s tests match specified filters" % (
669 tests_amount, tests_amount + cb.filtered.countTestCases()))
671 if not running_extended_tests():
672 print("Not running extended tests (some tests will be skipped)")
674 attempts = retries + 1
676 print("Perform %s attempts to pass the suite..." % attempts)
679 # don't fork if requiring interactive terminal
680 sys.exit(not VppTestRunner(
681 verbosity=verbose, failfast=failfast)
682 .run(suites[0]).wasSuccessful())
685 while len(suites) > 0 and attempts > 0:
686 tests_amount = sum([x.countTestCases() for x in suites])
687 results = run_forked(suites)
688 exit_code, suites = parse_results(results)
691 print('Test run was successful')
693 print('%s attempt(s) left.' % attempts)