13 from multiprocessing import Process, Pipe, cpu_count
14 from multiprocessing.queues import Queue
15 from multiprocessing.managers import BaseManager
16 from framework import VppTestRunner, running_extended_tests, VppTestCase, \
17 get_testcase_doc_name, get_test_description
18 from debug import spawn_gdb
19 from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
21 from discover_tests import discover_tests
22 from subprocess import check_output, CalledProcessError
23 from util import check_core_path
# Timeout which controls how long the child has to finish after a core dump
# is seen in the test temporary directory. If it is exceeded, the parent
# assumes that the child process is stuck (e.g. waiting for a shm mutex which
# will never get unlocked) and kills the child.
30 min_req_shm = 536870912 # min 512MB shm required
31 # 128MB per extra process
32 shm_per_process = 134217728
35 class StreamQueue(Queue):
40 sys.__stdout__.flush()
41 sys.__stderr__.flush()
44 return self._writer.fileno()
47 class StreamQueueManager(BaseManager):
51 StreamQueueManager.register('Queue', StreamQueue)
54 def test_runner_wrapper(suite, keep_alive_pipe, result_pipe, stdouterr_queue,
55 partial_result_queue, logger):
56 sys.stdout = stdouterr_queue
57 sys.stderr = stdouterr_queue
58 VppTestCase.logger = logger
59 result = VppTestRunner(keep_alive_pipe=keep_alive_pipe,
60 descriptions=descriptions,
62 results_pipe=partial_result_queue,
63 failfast=failfast).run(suite)
64 result_pipe.send(result)
66 keep_alive_pipe.close()
69 class TestCaseWrapper(object):
70 def __init__(self, testcase_suite, manager):
71 self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(
73 self.result_parent_end, self.result_child_end = Pipe(duplex=False)
74 self.partial_result_parent_end, self.partial_result_child_end = Pipe(
76 self.testcase_suite = testcase_suite
77 self.stdouterr_queue = manager.Queue()
78 self.logger = get_parallel_logger(self.stdouterr_queue)
79 self.child = Process(target=test_runner_wrapper,
80 args=(testcase_suite, self.keep_alive_child_end,
81 self.result_child_end, self.stdouterr_queue,
82 self.partial_result_child_end, self.logger)
85 self.pid = self.child.pid
86 self.last_test_temp_dir = None
87 self.last_test_vpp_binary = None
90 self.last_heard = time.time()
91 self.core_detected_at = None
92 self.failed_tests = []
93 self.partial_result = None
def close_pipes(self):
    """Close every pipe end held by this wrapper.

    The three child ends are closed first, then the three parent ends;
    within each group the order is keep-alive, result, partial-result.
    Each attribute is looked up right before its ``close()`` call.
    """
    end_names = (
        'keep_alive_child_end',
        'result_child_end',
        'partial_result_child_end',
        'keep_alive_parent_end',
        'result_parent_end',
        'partial_result_parent_end',
    )
    for end_name in end_names:
        getattr(self, end_name).close()
104 def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
107 while read_testcases.is_set() or len(unread_testcases) > 0:
108 if not read_testcase:
109 if len(finished_unread_testcases) > 0:
110 read_testcase = finished_unread_testcases.pop()
111 unread_testcases.remove(read_testcase)
112 elif len(unread_testcases) > 0:
113 read_testcase = unread_testcases.pop()
116 while data is not None:
117 sys.stdout.write(data)
118 data = read_testcase.stdouterr_queue.get()
120 read_testcase.stdouterr_queue.close()
121 finished_unread_testcases.discard(read_testcase)
125 def run_forked(testcases):
126 wrapped_testcase_suites = set()
128 # suites are unhashable, need to use list
130 debug_core = os.getenv("DEBUG", "").lower() == "core"
131 unread_testcases = set()
132 finished_unread_testcases = set()
133 manager = StreamQueueManager()
135 for i in range(concurrent_tests):
136 if len(testcases) > 0:
137 wrapped_testcase_suite = TestCaseWrapper(testcases.pop(0), manager)
138 wrapped_testcase_suites.add(wrapped_testcase_suite)
139 unread_testcases.add(wrapped_testcase_suite)
144 read_from_testcases = threading.Event()
145 read_from_testcases.set()
146 stdouterr_thread = threading.Thread(target=stdouterr_reader_wrapper,
147 args=(unread_testcases,
148 finished_unread_testcases,
149 read_from_testcases))
150 stdouterr_thread.start()
152 while len(wrapped_testcase_suites) > 0:
153 finished_testcase_suites = set()
154 for wrapped_testcase_suite in wrapped_testcase_suites:
155 readable = select.select(
156 [wrapped_testcase_suite.keep_alive_parent_end.fileno(),
157 wrapped_testcase_suite.result_parent_end.fileno(),
158 wrapped_testcase_suite.partial_result_parent_end.fileno()],
160 if wrapped_testcase_suite.result_parent_end.fileno() in readable:
162 (wrapped_testcase_suite.testcase_suite,
163 wrapped_testcase_suite.result_parent_end.recv()))
164 finished_testcase_suites.add(wrapped_testcase_suite)
167 if wrapped_testcase_suite.partial_result_parent_end.fileno() \
169 while wrapped_testcase_suite.partial_result_parent_end.poll():
170 wrapped_testcase_suite.partial_result = \
171 wrapped_testcase_suite.partial_result_parent_end.recv()
172 wrapped_testcase_suite.last_heard = time.time()
174 if wrapped_testcase_suite.keep_alive_parent_end.fileno() \
176 while wrapped_testcase_suite.keep_alive_parent_end.poll():
177 wrapped_testcase_suite.last_test, \
178 wrapped_testcase_suite.last_test_vpp_binary, \
179 wrapped_testcase_suite.last_test_temp_dir, \
180 wrapped_testcase_suite.vpp_pid = \
181 wrapped_testcase_suite.keep_alive_parent_end.recv()
182 wrapped_testcase_suite.last_heard = time.time()
185 if wrapped_testcase_suite.last_heard + test_timeout < time.time() \
186 and not os.path.isfile(
188 wrapped_testcase_suite.last_test_temp_dir):
190 wrapped_testcase_suite.logger.critical(
191 "Timeout while waiting for child test "
192 "runner process (last test running was "
194 (wrapped_testcase_suite.last_test,
195 wrapped_testcase_suite.last_test_temp_dir))
196 elif not wrapped_testcase_suite.child.is_alive():
198 wrapped_testcase_suite.logger.critical(
199 "Child python process unexpectedly died "
200 "(last test running was `%s' in `%s')!" %
201 (wrapped_testcase_suite.last_test,
202 wrapped_testcase_suite.last_test_temp_dir))
203 elif wrapped_testcase_suite.last_test_temp_dir and \
204 wrapped_testcase_suite.last_test_vpp_binary:
205 core_path = "%s/core" % \
206 wrapped_testcase_suite.last_test_temp_dir
207 if os.path.isfile(core_path):
208 if wrapped_testcase_suite.core_detected_at is None:
209 wrapped_testcase_suite.core_detected_at = time.time()
210 elif wrapped_testcase_suite.core_detected_at + \
211 core_timeout < time.time():
212 if not os.path.isfile(
214 wrapped_testcase_suite.
216 wrapped_testcase_suite.logger.critical(
217 "Child python process unresponsive and core-"
218 "file exists in test temporary directory!")
222 failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
223 lttd = os.path.basename(
224 wrapped_testcase_suite.last_test_temp_dir)
225 link_path = '%s%s-FAILED' % (failed_dir, lttd)
226 wrapped_testcase_suite.logger.error(
227 "Creating a link to the failed test: %s -> %s" %
229 if not os.path.exists(link_path):
230 os.symlink(wrapped_testcase_suite.last_test_temp_dir,
232 api_post_mortem_path = "/tmp/api_post_mortem.%d" % \
233 wrapped_testcase_suite.vpp_pid
234 if os.path.isfile(api_post_mortem_path):
235 wrapped_testcase_suite.logger.error(
236 "Copying api_post_mortem.%d to %s" %
237 (wrapped_testcase_suite.vpp_pid,
238 wrapped_testcase_suite.last_test_temp_dir))
239 shutil.copy2(api_post_mortem_path,
240 wrapped_testcase_suite.last_test_temp_dir)
241 if wrapped_testcase_suite.last_test_temp_dir and \
242 wrapped_testcase_suite.last_test_vpp_binary:
243 core_path = "%s/core" % \
244 wrapped_testcase_suite.last_test_temp_dir
245 if os.path.isfile(core_path):
246 wrapped_testcase_suite.logger.error(
247 "Core-file exists in test temporary directory: %s!"
249 check_core_path(wrapped_testcase_suite.logger,
251 wrapped_testcase_suite.logger.debug(
252 "Running `file %s':" % core_path)
254 info = check_output(["file", core_path])
255 wrapped_testcase_suite.logger.debug(info)
256 except CalledProcessError as e:
257 wrapped_testcase_suite.logger.error(
258 "Could not run `file' utility on core-file, "
259 "rc=%s" % e.returncode)
263 wrapped_testcase_suite.last_test_vpp_binary,
264 core_path, wrapped_testcase_suite.logger)
265 wrapped_testcase_suite.child.terminate()
267 # terminating the child process tends to leave orphan
269 os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
273 results.append((wrapped_testcase_suite.testcase_suite,
274 wrapped_testcase_suite.partial_result))
275 finished_testcase_suites.add(wrapped_testcase_suite)
277 for finished_testcase in finished_testcase_suites:
278 finished_testcase.child.join()
279 finished_testcase.close_pipes()
280 wrapped_testcase_suites.remove(finished_testcase)
281 finished_unread_testcases.add(finished_testcase)
282 finished_testcase.stdouterr_queue.put(None)
283 if len(testcases) > 0:
284 new_testcase = TestCaseWrapper(testcases.pop(0), manager)
285 wrapped_testcase_suites.add(new_testcase)
286 unread_testcases.add(new_testcase)
288 read_from_testcases.clear()
289 stdouterr_thread.join(test_timeout)
294 class SplitToSuitesCallback:
295 def __init__(self, filter_callback):
297 self.suite_name = 'default'
298 self.filter_callback = filter_callback
299 self.filtered = unittest.TestSuite()
301 def __call__(self, file_name, cls, method):
302 test_method = cls(method)
303 if self.filter_callback(file_name, cls.__name__, method):
304 self.suite_name = file_name + cls.__name__
305 if self.suite_name not in self.suites:
306 self.suites[self.suite_name] = unittest.TestSuite()
307 self.suites[self.suite_name].addTest(test_method)
310 self.filtered.addTest(test_method)
316 def parse_test_option():
317 f = os.getenv(test_option, None)
318 filter_file_name = None
319 filter_class_name = None
320 filter_func_name = None
325 raise Exception("Unrecognized %s option: %s" %
328 if parts[2] not in ('*', ''):
329 filter_func_name = parts[2]
330 if parts[1] not in ('*', ''):
331 filter_class_name = parts[1]
332 if parts[0] not in ('*', ''):
333 if parts[0].startswith('test_'):
334 filter_file_name = parts[0]
336 filter_file_name = 'test_%s' % parts[0]
338 if f.startswith('test_'):
341 filter_file_name = 'test_%s' % f
343 filter_file_name = '%s.py' % filter_file_name
344 return filter_file_name, filter_class_name, filter_func_name
347 def filter_tests(tests, filter_cb):
348 result = unittest.suite.TestSuite()
350 if isinstance(t, unittest.suite.TestSuite):
351 # this is a bunch of tests, recursively filter...
352 x = filter_tests(t, filter_cb)
353 if x.countTestCases() > 0:
355 elif isinstance(t, unittest.TestCase):
356 # this is a single test
357 parts = t.id().split('.')
358 # t.id() for common cases like this:
359 # test_classifier.TestClassifier.test_acl_ip
360 # apply filtering only if it is so
362 if not filter_cb(parts[0], parts[1], parts[2]):
366 # unexpected object, don't touch it
371 class FilterByTestOption:
372 def __init__(self, filter_file_name, filter_class_name, filter_func_name):
373 self.filter_file_name = filter_file_name
374 self.filter_class_name = filter_class_name
375 self.filter_func_name = filter_func_name
377 def __call__(self, file_name, class_name, func_name):
378 if self.filter_file_name and file_name != self.filter_file_name:
380 if self.filter_class_name and class_name != self.filter_class_name:
382 if self.filter_func_name and func_name != self.filter_func_name:
class FilterByClassList:
    """Test filter that accepts a test based on its class name only."""

    def __init__(self, class_list):
        """Remember the container of accepted class names.

        :param class_list: container of class names; stored as given and
            only ever queried with the ``in`` operator.
        """
        self.class_list = class_list

    def __call__(self, file_name, class_name, func_name):
        """Tell whether ``class_name`` is among the accepted class names.

        ``file_name`` and ``func_name`` are part of the three-argument
        filter callback signature but are not looked at here.

        :returns: the result of ``class_name in self.class_list``
        """
        accepted_names = self.class_list
        is_accepted = class_name in accepted_names
        return is_accepted
395 def suite_from_failed(suite, failed):
396 filter_cb = FilterByClassList(failed)
397 suite = filter_tests(suite, filter_cb)
401 class NonPassedResults(dict):
403 super(NonPassedResults, self).__init__()
404 self.all_testcases = 0
405 self.results_per_suite = {}
406 self.failures_id = 'failures'
407 self.errors_id = 'errors'
408 self.crashes_id = 'crashes'
409 self.skipped_id = 'skipped'
410 self.expectedFailures_id = 'expectedFailures'
411 self.unexpectedSuccesses_id = 'unexpectedSuccesses'
414 self[self.failures_id] = 0
415 self[self.errors_id] = 0
416 self[self.skipped_id] = 0
417 self[self.expectedFailures_id] = 0
418 self[self.unexpectedSuccesses_id] = 0
420 def _add_result(self, test, result_id):
421 if isinstance(test, VppTestCase):
422 parts = test.id().split('.')
424 tc_class = get_testcase_doc_name(test)
425 if tc_class not in self.results_per_suite:
426 # failed, errored, skipped, expectedly failed,
427 # unexpectedly passed
428 self.results_per_suite[tc_class] = \
429 {self.failures_id: [],
432 self.expectedFailures_id: [],
433 self.unexpectedSuccesses_id: []}
434 self.results_per_suite[tc_class][result_id].append(test)
def add_results(self, testcases, testcase_result_id):
    """Record a batch of tests under a single result category.

    :param testcases: iterable of two-item entries; the first item (the
        test) is handed to ``_add_result``, the second item is ignored.
    :param testcase_result_id: key of the counter stored in this mapping;
        also forwarded to ``_add_result``.

    The counter is incremented once for every entry for which
    ``_add_result`` returns a true value.
    """
    for entry in testcases:
        test, _ = entry
        recorded = self._add_result(test, testcase_result_id)
        if not recorded:
            continue
        self[testcase_result_id] += 1
443 def add_result(self, testcase_suite, result):
445 self.all_testcases += result.testsRun
446 self.passed += result.passed
448 # suite finished properly
449 if not result.wasSuccessful():
452 self.add_results(result.failures, self.failures_id)
453 self.add_results(result.errors, self.errors_id)
454 self.add_results(result.skipped, self.skipped_id)
455 self.add_results(result.expectedFailures,
456 self.expectedFailures_id)
457 self.add_results(result.unexpectedSuccesses,
458 self.unexpectedSuccesses_id)
461 if concurrent_tests == 1:
463 rerun_classes = {x[0].__class__.__name__ for
465 rerun_classes.update({x[0].__class__.__name__ for
466 x in result.failures})
467 self.rerun.append(suite_from_failed(testcase_suite,
470 self.rerun.append(testcase_suite)
472 self.rerun.append(testcase_suite)
476 def print_results(self):
478 print(double_line_delim)
479 print('TEST RESULTS:')
480 print(' Executed tests: {}'.format(self.all_testcases))
481 print(' Passed tests: {}'.format(
482 colorize(str(self.passed), GREEN)))
483 if self[self.failures_id] > 0:
484 print(' Failures: {}'.format(
485 colorize(str(self[self.failures_id]), RED)))
486 if self[self.errors_id] > 0:
487 print(' Errors: {}'.format(
488 colorize(str(self[self.errors_id]), RED)))
489 if self[self.skipped_id] > 0:
490 print(' Skipped tests: {}'.format(
491 colorize(str(self[self.skipped_id]), YELLOW)))
492 if self[self.expectedFailures_id] > 0:
493 print(' Expected failures: {}'.format(
494 colorize(str(self[self.expectedFailures_id]), GREEN)))
495 if self[self.unexpectedSuccesses_id] > 0:
496 print(' Unexpected successes: {}'.format(
497 colorize(str(self[self.unexpectedSuccesses_id]), YELLOW)))
499 if self.all_failed > 0:
500 print('FAILED TESTS:')
501 for testcase_class, suite_results in \
502 self.results_per_suite.items():
503 failed_testcases = suite_results[
505 errored_testcases = suite_results[
507 if len(failed_testcases) or len(errored_testcases):
508 print(' Testcase name: {}'.format(
509 colorize(testcase_class, RED)))
510 for failed_test in failed_testcases:
511 print(' FAILED: {}'.format(
512 colorize(get_test_description(
513 descriptions, failed_test), RED)))
514 for failed_test in errored_testcases:
515 print(' ERRORED: {}'.format(
516 colorize(get_test_description(
517 descriptions, failed_test), RED)))
519 print(double_line_delim)
def all_failed(self):
    """Return the failures counter plus the errors counter.

    NOTE(review): ``print_results`` reads ``self.all_failed`` without
    calling it, so this is presumably wrapped by a decorator on a
    preceding line that is not visible here - confirm.
    """
    failure_count = self[self.failures_id]
    error_count = self[self.errors_id]
    return failure_count + error_count
527 def parse_results(results):
529 Prints the number of executed, passed, failed, errored, skipped,
530 expectedly failed and unexpectedly passed tests and details about
531 failed, errored, expectedly failed and unexpectedly passed tests.
533 Also returns any suites where any test failed.
539 results_per_suite = NonPassedResults()
542 for testcase_suite, result in results:
543 result_code = results_per_suite.add_result(testcase_suite, result)
546 elif result_code == -1:
549 results_per_suite.print_results()
557 return return_code, results_per_suite.rerun
560 def parse_digit_env(env_var, default):
561 value = os.getenv(env_var, default)
566 print('WARNING: unsupported value "%s" for env var "%s",'
567 'defaulting to %s' % (value, env_var, default))
572 if __name__ == '__main__':
574 verbose = parse_digit_env("V", 0)
576 test_timeout = parse_digit_env("TIMEOUT", 600) # default = 10 minutes
578 retries = parse_digit_env("RETRIES", 0)
580 debug = os.getenv("DEBUG", "n").lower() in ["gdb", "gdbserver"]
582 step = os.getenv("STEP", "n").lower() in ("y", "yes", "1")
585 os.getenv("FORCE_FOREGROUND", "n").lower() in ("y", "yes", "1")
587 run_interactive = debug or step or force_foreground
589 test_jobs = os.getenv("TEST_JOBS", "1").lower() # default = 1 process
590 if test_jobs == 'auto':
593 print('Interactive mode required, running on one core')
595 shm_free = psutil.disk_usage('/dev/shm').free
596 shm_max_processes = 1
597 if shm_free < min_req_shm:
598 raise Exception('Not enough free space in /dev/shm. Required '
599 'free space is at least %sM.'
600 % (min_req_shm >> 20))
602 extra_shm = shm_free - min_req_shm
603 shm_max_processes += extra_shm / shm_per_process
604 concurrent_tests = max(cpu_count(), shm_max_processes)
605 print('Found enough resources to run tests with %s cores'
607 elif test_jobs.isdigit():
608 concurrent_tests = int(test_jobs)
612 if run_interactive and concurrent_tests > 1:
613 raise NotImplementedError(
614 'Running tests interactively (DEBUG, STEP or FORCE_FOREGROUND is '
615 'set) in parallel (TEST_JOBS is more than 1) is not '
618 parser = argparse.ArgumentParser(description="VPP unit tests")
619 parser.add_argument("-f", "--failfast", action='store_true',
620 help="fast failure flag")
621 parser.add_argument("-d", "--dir", action='append', type=str,
622 help="directory containing test files "
623 "(may be specified multiple times)")
624 args = parser.parse_args()
625 failfast = args.failfast
628 print("Running tests using custom test runner") # debug message
629 filter_file, filter_class, filter_func = parse_test_option()
631 print("Active filters: file=%s, class=%s, function=%s" % (
632 filter_file, filter_class, filter_func))
634 filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)
636 cb = SplitToSuitesCallback(filter_cb)
638 print("Adding tests from directory tree %s" % d)
639 discover_tests(d, cb)
641 # suites are not hashable, need to use list
644 for testcase_suite in cb.suites.values():
645 tests_amount += testcase_suite.countTestCases()
646 suites.append(testcase_suite)
648 if concurrent_tests == 1:
649 new_suite = unittest.TestSuite()
651 new_suite.addTest(suite)
655 print("%s out of %s tests match specified filters" % (
656 tests_amount, tests_amount + cb.filtered.countTestCases()))
658 if not running_extended_tests():
659 print("Not running extended tests (some tests will be skipped)")
661 attempts = retries + 1
663 print("Perform %s attempts to pass the suite..." % attempts)
666 # don't fork if requiring interactive terminal
667 sys.exit(not VppTestRunner(
668 verbosity=verbose, failfast=failfast)
669 .run(suites[0]).wasSuccessful())
672 while len(suites) > 0 and attempts > 0:
673 tests_amount = sum([x.countTestCases() for x in suites])
674 results = run_forked(suites)
675 exit_code, suites = parse_results(results)
678 print('Test run was successful')
680 print('%s attempt(s) left.' % attempts)