14 from multiprocessing import Process, Pipe, cpu_count
15 from multiprocessing.queues import Queue
16 from multiprocessing.managers import BaseManager
17 from framework import VppTestRunner, running_extended_tests, VppTestCase, \
18 get_testcase_doc_name, get_test_description, PASS, FAIL, ERROR, SKIP, \
20 from debug import spawn_gdb
21 from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
23 from discover_tests import discover_tests
24 from subprocess import check_output, CalledProcessError
25 from util import check_core_path
# Timeout which controls how long the child has to finish after seeing
# a core dump in the test temporary directory. If this is exceeded, the
# parent assumes that the child process is stuck (e.g. waiting for a shm
# mutex which will never get unlocked) and kills the child.
32 min_req_shm = 536870912 # min 512MB shm required
33 # 128MB per extra process
34 shm_per_process = 134217728
37 class StreamQueue(Queue):
42 sys.__stdout__.flush()
43 sys.__stderr__.flush()
46 return self._writer.fileno()
49 class StreamQueueManager(BaseManager):
53 StreamQueueManager.register('StreamQueue', StreamQueue)
56 class TestResult(dict):
57 def __init__(self, testcase_suite):
58 super(TestResult, self).__init__()
64 self.testcase_suite = testcase_suite
65 self.testcases = [testcase for testcase in testcase_suite]
66 self.testcases_by_id = {}
def was_successful(self):
    """Tell whether every test case of the suite passed or was skipped.

    :returns: True when the number of ids stored under PASS and SKIP
        equals ``self.testcase_suite.countTestCases()``.
    """
    return (len(self[PASS] + self[SKIP]) ==
            self.testcase_suite.countTestCases())
def no_tests_run(self):
    """Return True when no test id has been stored under TEST_RUN."""
    return 0 == len(self[TEST_RUN])
def process_result(self, test_id, result):
    """Record the outcome of one test and index its test case object.

    The id is appended to the list stored under ``result``.  The first
    entry of ``self.testcases`` whose ``id()`` equals ``test_id`` is moved
    from that list into ``self.testcases_by_id``.

    :param test_id: id of the test, compared against ``testcase.id()``
    :param result: key of the result list the id is appended to
    """
    self[result].append(test_id)
    for testcase in self.testcases:
        if testcase.id() == test_id:
            self.testcases_by_id[test_id] = testcase
            self.testcases.remove(testcase)
            # the list being iterated was just mutated - stop here instead
            # of continuing over a shifted sequence
            break
83 def suite_from_failed(self):
85 for testcase in self.testcase_suite:
87 if tc_id not in self[PASS] and tc_id not in self[SKIP]:
89 if len(rerun_ids) > 0:
90 return suite_from_failed(self.testcase_suite, rerun_ids)
def get_testcase_names(self, test_id):
    """Return a pair of names describing the test with the given id.

    :param test_id: id of the test to look up
    :returns: tuple of the values produced by
        ``self._get_testcase_class(test_id)`` and
        ``self._get_test_description(test_id)``, in that order.
    """
    testcase_class = self._get_testcase_class(test_id)
    description = self._get_test_description(test_id)
    return testcase_class, description
96 def _get_test_description(self, test_id):
97 if test_id in self.testcases_by_id:
98 return get_test_description(descriptions,
99 self.testcases_by_id[test_id])
103 def _get_testcase_class(self, test_id):
104 if test_id in self.testcases_by_id:
105 return get_testcase_doc_name(self.testcases_by_id[test_id])
110 def test_runner_wrapper(suite, keep_alive_pipe, stdouterr_queue,
111 finished_pipe, result_pipe, logger):
112 sys.stdout = stdouterr_queue
113 sys.stderr = stdouterr_queue
114 VppTestCase.logger = logger
115 result = VppTestRunner(keep_alive_pipe=keep_alive_pipe,
116 descriptions=descriptions,
118 result_pipe=result_pipe,
119 failfast=failfast).run(suite)
120 finished_pipe.send(result.wasSuccessful())
121 finished_pipe.close()
122 keep_alive_pipe.close()
125 class TestCaseWrapper(object):
126 def __init__(self, testcase_suite, manager):
127 self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(
129 self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
130 self.result_parent_end, self.result_child_end = Pipe(duplex=False)
131 self.testcase_suite = testcase_suite
132 self.stdouterr_queue = manager.StreamQueue()
133 self.logger = get_parallel_logger(self.stdouterr_queue)
134 self.child = Process(target=test_runner_wrapper,
135 args=(testcase_suite,
136 self.keep_alive_child_end,
137 self.stdouterr_queue,
138 self.finished_child_end,
139 self.result_child_end,
143 self.last_test_temp_dir = None
144 self.last_test_vpp_binary = None
145 self.last_test = None
148 self.last_heard = time.time()
149 self.core_detected_at = None
150 self.result = TestResult(testcase_suite)
def close_pipes(self):
    """Close the child and parent ends of all three pipes.

    The child ends of the keep-alive, finished and result pipes are closed
    first, followed by the corresponding parent ends.
    """
    for pipe_end in (self.keep_alive_child_end,
                     self.finished_child_end,
                     self.result_child_end,
                     self.keep_alive_parent_end,
                     self.finished_parent_end,
                     self.result_parent_end):
        pipe_end.close()
161 def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
164 while read_testcases.is_set() or len(unread_testcases) > 0:
165 if not read_testcase:
166 if len(finished_unread_testcases) > 0:
167 read_testcase = finished_unread_testcases.pop()
168 unread_testcases.remove(read_testcase)
169 elif len(unread_testcases) > 0:
170 read_testcase = unread_testcases.pop()
173 while data is not None:
174 sys.stdout.write(data)
175 data = read_testcase.stdouterr_queue.get()
177 read_testcase.stdouterr_queue.close()
178 finished_unread_testcases.discard(read_testcase)
182 def run_forked(testcase_suites):
183 wrapped_testcase_suites = set()
185 # suites are unhashable, need to use list
187 debug_core = os.getenv("DEBUG", "").lower() == "core"
188 unread_testcases = set()
189 finished_unread_testcases = set()
190 manager = StreamQueueManager()
192 for i in range(concurrent_tests):
193 if len(testcase_suites) > 0:
194 wrapped_testcase_suite = TestCaseWrapper(testcase_suites.pop(0),
196 wrapped_testcase_suites.add(wrapped_testcase_suite)
197 unread_testcases.add(wrapped_testcase_suite)
202 read_from_testcases = threading.Event()
203 read_from_testcases.set()
204 stdouterr_thread = threading.Thread(target=stdouterr_reader_wrapper,
205 args=(unread_testcases,
206 finished_unread_testcases,
207 read_from_testcases))
208 stdouterr_thread.start()
210 while len(wrapped_testcase_suites) > 0:
211 finished_testcase_suites = set()
212 for wrapped_testcase_suite in wrapped_testcase_suites:
213 while wrapped_testcase_suite.result_parent_end.poll():
214 wrapped_testcase_suite.result.process_result(
215 *wrapped_testcase_suite.result_parent_end.recv())
216 wrapped_testcase_suite.last_heard = time.time()
218 if wrapped_testcase_suite.finished_parent_end.poll():
219 wrapped_testcase_suite.finished_parent_end.recv()
220 results.append(wrapped_testcase_suite.result)
221 finished_testcase_suites.add(wrapped_testcase_suite)
224 while wrapped_testcase_suite.keep_alive_parent_end.poll():
225 wrapped_testcase_suite.last_test, \
226 wrapped_testcase_suite.last_test_vpp_binary, \
227 wrapped_testcase_suite.last_test_temp_dir, \
228 wrapped_testcase_suite.vpp_pid = \
229 wrapped_testcase_suite.keep_alive_parent_end.recv()
230 wrapped_testcase_suite.last_heard = time.time()
233 if wrapped_testcase_suite.last_heard + test_timeout < time.time() \
234 and not os.path.isfile(
236 wrapped_testcase_suite.last_test_temp_dir):
238 wrapped_testcase_suite.logger.critical(
239 "Timeout while waiting for child test "
240 "runner process (last test running was "
242 (wrapped_testcase_suite.last_test,
243 wrapped_testcase_suite.last_test_temp_dir))
244 elif not wrapped_testcase_suite.child.is_alive():
246 wrapped_testcase_suite.logger.critical(
247 "Child python process unexpectedly died "
248 "(last test running was `%s' in `%s')!" %
249 (wrapped_testcase_suite.last_test,
250 wrapped_testcase_suite.last_test_temp_dir))
251 elif wrapped_testcase_suite.last_test_temp_dir and \
252 wrapped_testcase_suite.last_test_vpp_binary:
253 core_path = "%s/core" % \
254 wrapped_testcase_suite.last_test_temp_dir
255 if os.path.isfile(core_path):
256 if wrapped_testcase_suite.core_detected_at is None:
257 wrapped_testcase_suite.core_detected_at = time.time()
258 elif wrapped_testcase_suite.core_detected_at + \
259 core_timeout < time.time():
260 if not os.path.isfile(
262 wrapped_testcase_suite.
264 wrapped_testcase_suite.logger.critical(
265 "Child python process unresponsive and core-"
266 "file exists in test temporary directory!")
270 failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
271 if wrapped_testcase_suite.last_test_temp_dir:
272 lttd = os.path.basename(
273 wrapped_testcase_suite.last_test_temp_dir)
276 link_path = '%s%s-FAILED' % (failed_dir, lttd)
277 wrapped_testcase_suite.logger.error(
278 "Creating a link to the failed test: %s -> %s" %
280 if not os.path.exists(link_path) \
281 and wrapped_testcase_suite.last_test_temp_dir:
282 os.symlink(wrapped_testcase_suite.last_test_temp_dir,
284 api_post_mortem_path = "/tmp/api_post_mortem.%d" % \
285 wrapped_testcase_suite.vpp_pid
286 if os.path.isfile(api_post_mortem_path):
287 wrapped_testcase_suite.logger.error(
288 "Copying api_post_mortem.%d to %s" %
289 (wrapped_testcase_suite.vpp_pid,
290 wrapped_testcase_suite.last_test_temp_dir))
291 shutil.copy2(api_post_mortem_path,
292 wrapped_testcase_suite.last_test_temp_dir)
293 if wrapped_testcase_suite.last_test_temp_dir and \
294 wrapped_testcase_suite.last_test_vpp_binary:
295 core_path = "%s/core" % \
296 wrapped_testcase_suite.last_test_temp_dir
297 if os.path.isfile(core_path):
298 wrapped_testcase_suite.logger.error(
299 "Core-file exists in test temporary directory: %s!"
301 check_core_path(wrapped_testcase_suite.logger,
303 wrapped_testcase_suite.logger.debug(
304 "Running `file %s':" % core_path)
306 info = check_output(["file", core_path])
307 wrapped_testcase_suite.logger.debug(info)
308 except CalledProcessError as e:
309 wrapped_testcase_suite.logger.error(
310 "Could not run `file' utility on core-file, "
311 "rc=%s" % e.returncode)
315 wrapped_testcase_suite.last_test_vpp_binary,
316 core_path, wrapped_testcase_suite.logger)
317 wrapped_testcase_suite.child.terminate()
319 # terminating the child process tends to leave orphan
321 os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
325 results.append(wrapped_testcase_suite.result)
326 finished_testcase_suites.add(wrapped_testcase_suite)
328 for finished_testcase in finished_testcase_suites:
329 finished_testcase.child.join()
330 finished_testcase.close_pipes()
331 wrapped_testcase_suites.remove(finished_testcase)
332 finished_unread_testcases.add(finished_testcase)
333 finished_testcase.stdouterr_queue.put(None)
334 if len(testcase_suites) > 0:
335 new_testcase = TestCaseWrapper(testcase_suites.pop(0), manager)
336 wrapped_testcase_suites.add(new_testcase)
337 unread_testcases.add(new_testcase)
339 read_from_testcases.clear()
340 stdouterr_thread.join(test_timeout)
345 class SplitToSuitesCallback:
346 def __init__(self, filter_callback):
348 self.suite_name = 'default'
349 self.filter_callback = filter_callback
350 self.filtered = unittest.TestSuite()
352 def __call__(self, file_name, cls, method):
353 test_method = cls(method)
354 if self.filter_callback(file_name, cls.__name__, method):
355 self.suite_name = file_name + cls.__name__
356 if self.suite_name not in self.suites:
357 self.suites[self.suite_name] = unittest.TestSuite()
358 self.suites[self.suite_name].addTest(test_method)
361 self.filtered.addTest(test_method)
367 def parse_test_option():
368 f = os.getenv(test_option, None)
369 filter_file_name = None
370 filter_class_name = None
371 filter_func_name = None
376 raise Exception("Unrecognized %s option: %s" %
379 if parts[2] not in ('*', ''):
380 filter_func_name = parts[2]
381 if parts[1] not in ('*', ''):
382 filter_class_name = parts[1]
383 if parts[0] not in ('*', ''):
384 if parts[0].startswith('test_'):
385 filter_file_name = parts[0]
387 filter_file_name = 'test_%s' % parts[0]
389 if f.startswith('test_'):
392 filter_file_name = 'test_%s' % f
394 filter_file_name = '%s.py' % filter_file_name
395 return filter_file_name, filter_class_name, filter_func_name
398 def filter_tests(tests, filter_cb):
399 result = unittest.suite.TestSuite()
401 if isinstance(t, unittest.suite.TestSuite):
402 # this is a bunch of tests, recursively filter...
403 x = filter_tests(t, filter_cb)
404 if x.countTestCases() > 0:
406 elif isinstance(t, unittest.TestCase):
407 # this is a single test
408 parts = t.id().split('.')
409 # t.id() for common cases like this:
410 # test_classifier.TestClassifier.test_acl_ip
411 # apply filtering only if it is so
413 if not filter_cb(parts[0], parts[1], parts[2]):
417 # unexpected object, don't touch it
422 class FilterByTestOption:
423 def __init__(self, filter_file_name, filter_class_name, filter_func_name):
424 self.filter_file_name = filter_file_name
425 self.filter_class_name = filter_class_name
426 self.filter_func_name = filter_func_name
428 def __call__(self, file_name, class_name, func_name):
429 if self.filter_file_name:
430 fn_match = fnmatch.fnmatch(file_name, self.filter_file_name)
433 if self.filter_class_name and class_name != self.filter_class_name:
435 if self.filter_func_name and func_name != self.filter_func_name:
class FilterByClassList(object):
    """Filter callback accepting tests by their ``file_name.class_name``."""

    def __init__(self, classes_with_filenames):
        """Store the container of accepted ``file_name.class_name`` strings.

        :param classes_with_filenames: container queried with ``in``
        """
        self.classes_with_filenames = classes_with_filenames

    def __call__(self, file_name, class_name, func_name):
        """Tell whether ``file_name.class_name`` is among the accepted ones.

        :param file_name: first component of the looked-up key
        :param class_name: second component of the looked-up key
        :param func_name: not used by this filter
        :returns: result of the membership test on the stored container
        """
        qualified_name = '.'.join([file_name, class_name])
        return qualified_name in self.classes_with_filenames
448 def suite_from_failed(suite, failed):
449 failed = {x.rsplit('.', 1)[0] for x in failed}
450 filter_cb = FilterByClassList(failed)
451 suite = filter_tests(suite, filter_cb)
455 class AllResults(dict):
457 super(AllResults, self).__init__()
458 self.all_testcases = 0
459 self.results_per_suite = []
466 self.testsuites_no_tests_run = []
def add_results(self, result):
    """Store a suite result and add its per-type counts to the totals.

    :param result: mapping from result type to a sized collection; it is
        appended to ``self.results_per_suite`` and the length of each of
        its PASS, FAIL, ERROR, SKIP and TEST_RUN entries is added to the
        counter kept under the same key in ``self``.
    """
    self.results_per_suite.append(result)
    for result_type in (PASS, FAIL, ERROR, SKIP, TEST_RUN):
        self[result_type] += len(result[result_type])
474 def add_result(self, result):
476 self.all_testcases += result.testcase_suite.countTestCases()
477 if not result.no_tests_run():
478 if not result.was_successful():
481 self.add_results(result)
483 self.testsuites_no_tests_run.append(result.testcase_suite)
487 if concurrent_tests == 1:
488 if not result.no_tests_run():
489 self.rerun.append(result.suite_from_failed())
491 self.rerun.append(result.testcase_suite)
493 self.rerun.append(result.testcase_suite)
497 def print_results(self):
499 print(double_line_delim)
500 print('TEST RESULTS:')
501 print(' Scheduled tests: {}'.format(self.all_testcases))
502 print(' Executed tests: {}'.format(self[TEST_RUN]))
503 print(' Passed tests: {}'.format(
504 colorize(str(self[PASS]), GREEN)))
506 print(' Skipped tests: {}'.format(
507 colorize(str(self[SKIP]), YELLOW)))
508 if self.not_executed > 0:
509 print(' Not Executed tests: {}'.format(
510 colorize(str(self.not_executed), RED)))
512 print(' Failures: {}'.format(
513 colorize(str(self[FAIL]), RED)))
515 print(' Errors: {}'.format(
516 colorize(str(self[ERROR]), RED)))
518 if self.all_failed > 0:
519 print('FAILED TESTS:')
520 for result in self.results_per_suite:
521 failed_testcase_ids = result[FAIL]
522 errored_testcase_ids = result[ERROR]
523 old_testcase_name = None
524 if len(failed_testcase_ids) or len(errored_testcase_ids):
525 for failed_test_id in failed_testcase_ids:
526 new_testcase_name, test_name = \
527 result.get_testcase_names(failed_test_id)
528 if new_testcase_name != old_testcase_name:
529 print(' Testcase name: {}'.format(
530 colorize(new_testcase_name, RED)))
531 old_testcase_name = new_testcase_name
532 print(' FAILED: {}'.format(
533 colorize(test_name, RED)))
534 for failed_test_id in errored_testcase_ids:
535 new_testcase_name, test_name = \
536 result.get_testcase_names(failed_test_id)
537 if new_testcase_name != old_testcase_name:
538 print(' Testcase name: {}'.format(
539 colorize(new_testcase_name, RED)))
540 old_testcase_name = new_testcase_name
541 print(' ERRORED: {}'.format(
542 colorize(test_name, RED)))
543 if len(self.testsuites_no_tests_run) > 0:
544 print('TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:')
546 for testsuite in self.testsuites_no_tests_run:
547 for testcase in testsuite:
548 tc_classes.add(get_testcase_doc_name(testcase))
549 for tc_class in tc_classes:
550 print(' {}'.format(colorize(tc_class, RED)))
552 print(double_line_delim)
@property
def not_executed(self):
    """Number of scheduled tests minus the TEST_RUN counter.

    Exposed as a property because it is read as a plain attribute
    (``self.not_executed > 0``) when the results are printed.
    """
    return self.all_testcases - self[TEST_RUN]
@property
def all_failed(self):
    """Sum of the FAIL and ERROR counters.

    Exposed as a property because it is read as a plain attribute
    (``self.all_failed > 0``) when the results are printed.
    """
    return self[FAIL] + self[ERROR]
564 def parse_results(results):
566 Prints the number of scheduled, executed, not executed, passed, failed,
567 errored and skipped tests and details about failed and errored tests.
569 Also returns all suites where any test failed.
575 results_per_suite = AllResults()
578 for result in results:
579 result_code = results_per_suite.add_result(result)
582 elif result_code == -1:
585 results_per_suite.print_results()
593 return return_code, results_per_suite.rerun
596 def parse_digit_env(env_var, default):
597 value = os.getenv(env_var, default)
602 print('WARNING: unsupported value "%s" for env var "%s",'
603 'defaulting to %s' % (value, env_var, default))
608 if __name__ == '__main__':
610 verbose = parse_digit_env("V", 0)
612 test_timeout = parse_digit_env("TIMEOUT", 600) # default = 10 minutes
614 retries = parse_digit_env("RETRIES", 0)
616 debug = os.getenv("DEBUG", "n").lower() in ["gdb", "gdbserver"]
618 step = os.getenv("STEP", "n").lower() in ("y", "yes", "1")
621 os.getenv("FORCE_FOREGROUND", "n").lower() in ("y", "yes", "1")
623 run_interactive = debug or step or force_foreground
625 test_jobs = os.getenv("TEST_JOBS", "1").lower() # default = 1 process
626 if test_jobs == 'auto':
629 print('Interactive mode required, running on one core')
631 shm_free = psutil.disk_usage('/dev/shm').free
632 shm_max_processes = 1
633 if shm_free < min_req_shm:
634 raise Exception('Not enough free space in /dev/shm. Required '
635 'free space is at least %sM.'
636 % (min_req_shm >> 20))
638 extra_shm = shm_free - min_req_shm
639 shm_max_processes += extra_shm / shm_per_process
640 concurrent_tests = min(cpu_count(), shm_max_processes)
641 print('Found enough resources to run tests with %s cores'
643 elif test_jobs.isdigit():
644 concurrent_tests = int(test_jobs)
648 if run_interactive and concurrent_tests > 1:
649 raise NotImplementedError(
650 'Running tests interactively (DEBUG, STEP or FORCE_FOREGROUND is '
651 'set) in parallel (TEST_JOBS is more than 1) is not '
654 parser = argparse.ArgumentParser(description="VPP unit tests")
655 parser.add_argument("-f", "--failfast", action='store_true',
656 help="fast failure flag")
657 parser.add_argument("-d", "--dir", action='append', type=str,
658 help="directory containing test files "
659 "(may be specified multiple times)")
660 args = parser.parse_args()
661 failfast = args.failfast
664 print("Running tests using custom test runner") # debug message
665 filter_file, filter_class, filter_func = parse_test_option()
667 print("Active filters: file=%s, class=%s, function=%s" % (
668 filter_file, filter_class, filter_func))
670 filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)
672 cb = SplitToSuitesCallback(filter_cb)
674 print("Adding tests from directory tree %s" % d)
675 discover_tests(d, cb)
677 # suites are not hashable, need to use list
680 for testcase_suite in cb.suites.values():
681 tests_amount += testcase_suite.countTestCases()
682 suites.append(testcase_suite)
684 if concurrent_tests == 1:
685 new_suite = unittest.TestSuite()
687 new_suite.addTests(suite)
691 print("%s out of %s tests match specified filters" % (
692 tests_amount, tests_amount + cb.filtered.countTestCases()))
694 if not running_extended_tests():
695 print("Not running extended tests (some tests will be skipped)")
697 attempts = retries + 1
699 print("Perform %s attempts to pass the suite..." % attempts)
702 # don't fork if requiring interactive terminal
703 sys.exit(not VppTestRunner(
704 verbosity=verbose, failfast=failfast)
705 .run(suites[0]).wasSuccessful())
708 while len(suites) > 0 and attempts > 0:
709 tests_amount = sum([x.countTestCases() for x in suites])
710 results = run_forked(suites)
711 exit_code, suites = parse_results(results)
714 print('Test run was successful')
716 print('%s attempt(s) left.' % attempts)