13 from multiprocessing import Process, Pipe, cpu_count
14 from multiprocessing.queues import Queue
15 from multiprocessing.managers import BaseManager
16 from framework import VppTestRunner, running_extended_tests, VppTestCase, \
17 get_testcase_doc_name, get_test_description
18 from debug import spawn_gdb
19 from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
21 from discover_tests import discover_tests
22 from subprocess import check_output, CalledProcessError
23 from util import check_core_path
# Timeout which controls how long the child has to finish after a core dump
# is seen in the test temporary directory. If it is exceeded, the parent
# assumes that the child process is stuck (e.g. waiting for a shm mutex which
# will never get unlocked) and kills the child.
# Minimum free space in /dev/shm required to run the tests at all: 512 MiB.
min_req_shm = 512 << 20
# /dev/shm space budgeted for each additional parallel test process: 128 MiB.
shm_per_process = 128 << 20
35 class StreamQueue(Queue):
40 sys.__stdout__.flush()
41 sys.__stderr__.flush()
44 return self._writer.fileno()
47 class StreamQueueManager(BaseManager):
51 StreamQueueManager.register('Queue', StreamQueue)
54 def test_runner_wrapper(suite, keep_alive_pipe, result_pipe, stdouterr_queue,
56 sys.stdout = stdouterr_queue
57 sys.stderr = stdouterr_queue
58 VppTestCase.logger = logger
59 result = VppTestRunner(keep_alive_pipe=keep_alive_pipe,
60 descriptions=descriptions,
62 failfast=failfast).run(suite)
63 result_pipe.send(result)
65 keep_alive_pipe.close()
68 class TestCaseWrapper(object):
69 def __init__(self, testcase_suite, manager):
70 self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(
72 self.result_parent_end, self.result_child_end = Pipe(duplex=False)
73 self.testcase_suite = testcase_suite
74 self.stdouterr_queue = manager.Queue()
75 self.logger = get_parallel_logger(self.stdouterr_queue)
76 self.child = Process(target=test_runner_wrapper,
77 args=(testcase_suite, self.keep_alive_child_end,
78 self.result_child_end, self.stdouterr_queue,
82 self.pid = self.child.pid
83 self.last_test_temp_dir = None
84 self.last_test_vpp_binary = None
87 self.last_heard = time.time()
88 self.core_detected_at = None
89 self.failed_tests = []
def close_pipes(self):
    """Close every pipe end held by this wrapper.

    The child ends of the keep-alive and result pipes are closed first,
    followed by the corresponding parent ends.
    """
    for end_name in ("keep_alive_child_end",
                     "result_child_end",
                     "keep_alive_parent_end",
                     "result_parent_end"):
        # look each end up just before closing it, one at a time
        getattr(self, end_name).close()
98 def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
101 while read_testcases.is_set() or len(unread_testcases) > 0:
102 if not read_testcase:
103 if len(finished_unread_testcases) > 0:
104 read_testcase = finished_unread_testcases.pop()
105 unread_testcases.remove(read_testcase)
106 elif len(unread_testcases) > 0:
107 read_testcase = unread_testcases.pop()
110 while data is not None:
111 sys.stdout.write(data)
112 data = read_testcase.stdouterr_queue.get()
114 read_testcase.stdouterr_queue.close()
115 finished_unread_testcases.discard(read_testcase)
119 def run_forked(testcases):
120 wrapped_testcase_suites = set()
122 # suites are unhashable, need to use list
124 debug_core = os.getenv("DEBUG", "").lower() == "core"
125 unread_testcases = set()
126 finished_unread_testcases = set()
127 manager = StreamQueueManager()
129 for i in range(concurrent_tests):
130 if len(testcases) > 0:
131 wrapped_testcase_suite = TestCaseWrapper(testcases.pop(0), manager)
132 wrapped_testcase_suites.add(wrapped_testcase_suite)
133 unread_testcases.add(wrapped_testcase_suite)
138 read_from_testcases = threading.Event()
139 read_from_testcases.set()
140 stdouterr_thread = threading.Thread(target=stdouterr_reader_wrapper,
141 args=(unread_testcases,
142 finished_unread_testcases,
143 read_from_testcases))
144 stdouterr_thread.start()
146 while len(wrapped_testcase_suites) > 0:
147 finished_testcase_suites = set()
148 for wrapped_testcase_suite in wrapped_testcase_suites:
149 readable = select.select(
150 [wrapped_testcase_suite.keep_alive_parent_end.fileno(),
151 wrapped_testcase_suite.result_parent_end.fileno()],
153 if wrapped_testcase_suite.result_parent_end.fileno() in readable:
155 (wrapped_testcase_suite.testcase_suite,
156 wrapped_testcase_suite.result_parent_end.recv()))
157 finished_testcase_suites.add(wrapped_testcase_suite)
160 if wrapped_testcase_suite.keep_alive_parent_end.fileno() \
162 while wrapped_testcase_suite.keep_alive_parent_end.poll():
163 wrapped_testcase_suite.last_test, \
164 wrapped_testcase_suite.last_test_vpp_binary, \
165 wrapped_testcase_suite.last_test_temp_dir, \
166 wrapped_testcase_suite.vpp_pid = \
167 wrapped_testcase_suite.keep_alive_parent_end.recv()
168 wrapped_testcase_suite.last_heard = time.time()
171 if wrapped_testcase_suite.last_heard + test_timeout < time.time() \
172 and not os.path.isfile(
174 wrapped_testcase_suite.last_test_temp_dir):
176 wrapped_testcase_suite.logger.critical(
177 "Timeout while waiting for child test "
178 "runner process (last test running was "
180 (wrapped_testcase_suite.last_test,
181 wrapped_testcase_suite.last_test_temp_dir))
182 elif not wrapped_testcase_suite.child.is_alive():
184 wrapped_testcase_suite.logger.critical(
185 "Child python process unexpectedly died "
186 "(last test running was `%s' in `%s')!" %
187 (wrapped_testcase_suite.last_test,
188 wrapped_testcase_suite.last_test_temp_dir))
189 elif wrapped_testcase_suite.last_test_temp_dir and \
190 wrapped_testcase_suite.last_test_vpp_binary:
191 core_path = "%s/core" % \
192 wrapped_testcase_suite.last_test_temp_dir
193 if os.path.isfile(core_path):
194 if wrapped_testcase_suite.core_detected_at is None:
195 wrapped_testcase_suite.core_detected_at = time.time()
196 elif wrapped_testcase_suite.core_detected_at + \
197 core_timeout < time.time():
198 if not os.path.isfile(
200 wrapped_testcase_suite.
202 wrapped_testcase_suite.logger.critical(
203 "Child python process unresponsive and core-"
204 "file exists in test temporary directory!")
208 failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
209 lttd = os.path.basename(
210 wrapped_testcase_suite.last_test_temp_dir)
211 link_path = '%s%s-FAILED' % (failed_dir, lttd)
212 wrapped_testcase_suite.logger.error(
213 "Creating a link to the failed test: %s -> %s" %
215 if not os.path.exists(link_path):
216 os.symlink(wrapped_testcase_suite.last_test_temp_dir,
218 api_post_mortem_path = "/tmp/api_post_mortem.%d" % \
219 wrapped_testcase_suite.vpp_pid
220 if os.path.isfile(api_post_mortem_path):
221 wrapped_testcase_suite.logger.error(
222 "Copying api_post_mortem.%d to %s" %
223 (wrapped_testcase_suite.vpp_pid,
224 wrapped_testcase_suite.last_test_temp_dir))
225 shutil.copy2(api_post_mortem_path,
226 wrapped_testcase_suite.last_test_temp_dir)
227 if wrapped_testcase_suite.last_test_temp_dir and \
228 wrapped_testcase_suite.last_test_vpp_binary:
229 core_path = "%s/core" % \
230 wrapped_testcase_suite.last_test_temp_dir
231 if os.path.isfile(core_path):
232 wrapped_testcase_suite.logger.error(
233 "Core-file exists in test temporary directory: %s!"
235 check_core_path(wrapped_testcase_suite.logger,
237 wrapped_testcase_suite.logger.debug(
238 "Running `file %s':" % core_path)
240 info = check_output(["file", core_path])
241 wrapped_testcase_suite.logger.debug(info)
242 except CalledProcessError as e:
243 wrapped_testcase_suite.logger.error(
244 "Could not run `file' utility on core-file, "
245 "rc=%s" % e.returncode)
249 wrapped_testcase_suite.last_test_vpp_binary,
250 core_path, wrapped_testcase_suite.logger)
251 wrapped_testcase_suite.child.terminate()
253 # terminating the child process tends to leave orphan
255 os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
259 results.append((wrapped_testcase_suite.testcase_suite, None))
260 finished_testcase_suites.add(wrapped_testcase_suite)
262 for finished_testcase in finished_testcase_suites:
263 finished_testcase.child.join()
264 finished_testcase.close_pipes()
265 wrapped_testcase_suites.remove(finished_testcase)
266 finished_unread_testcases.add(finished_testcase)
267 finished_testcase.stdouterr_queue.put(None)
268 if len(testcases) > 0:
269 new_testcase = TestCaseWrapper(testcases.pop(0), manager)
270 wrapped_testcase_suites.add(new_testcase)
271 unread_testcases.add(new_testcase)
273 read_from_testcases.clear()
274 stdouterr_thread.join(test_timeout)
279 class SplitToSuitesCallback:
280 def __init__(self, filter_callback):
282 self.suite_name = 'default'
283 self.filter_callback = filter_callback
284 self.filtered = unittest.TestSuite()
286 def __call__(self, file_name, cls, method):
287 test_method = cls(method)
288 if self.filter_callback(file_name, cls.__name__, method):
289 self.suite_name = file_name + cls.__name__
290 if self.suite_name not in self.suites:
291 self.suites[self.suite_name] = unittest.TestSuite()
292 self.suites[self.suite_name].addTest(test_method)
295 self.filtered.addTest(test_method)
301 def parse_test_option():
302 f = os.getenv(test_option, None)
303 filter_file_name = None
304 filter_class_name = None
305 filter_func_name = None
310 raise Exception("Unrecognized %s option: %s" %
313 if parts[2] not in ('*', ''):
314 filter_func_name = parts[2]
315 if parts[1] not in ('*', ''):
316 filter_class_name = parts[1]
317 if parts[0] not in ('*', ''):
318 if parts[0].startswith('test_'):
319 filter_file_name = parts[0]
321 filter_file_name = 'test_%s' % parts[0]
323 if f.startswith('test_'):
326 filter_file_name = 'test_%s' % f
328 filter_file_name = '%s.py' % filter_file_name
329 return filter_file_name, filter_class_name, filter_func_name
332 def filter_tests(tests, filter_cb):
333 result = unittest.suite.TestSuite()
335 if isinstance(t, unittest.suite.TestSuite):
336 # this is a bunch of tests, recursively filter...
337 x = filter_tests(t, filter_cb)
338 if x.countTestCases() > 0:
340 elif isinstance(t, unittest.TestCase):
341 # this is a single test
342 parts = t.id().split('.')
343 # t.id() for common cases like this:
344 # test_classifier.TestClassifier.test_acl_ip
345 # apply filtering only if it is so
347 if not filter_cb(parts[0], parts[1], parts[2]):
351 # unexpected object, don't touch it
356 class FilterByTestOption:
357 def __init__(self, filter_file_name, filter_class_name, filter_func_name):
358 self.filter_file_name = filter_file_name
359 self.filter_class_name = filter_class_name
360 self.filter_func_name = filter_func_name
362 def __call__(self, file_name, class_name, func_name):
363 if self.filter_file_name and file_name != self.filter_file_name:
365 if self.filter_class_name and class_name != self.filter_class_name:
367 if self.filter_func_name and func_name != self.filter_func_name:
class FilterByClassList:
    """Filter callback that accepts a test based on its class name only."""

    def __init__(self, class_list):
        # container of accepted class names; only membership tests are
        # performed on it
        self.class_list = class_list

    def __call__(self, file_name, class_name, func_name):
        """Return True when `class_name` is in the configured class list.

        `file_name` and `func_name` are part of the filter callback
        signature but are not consulted here.
        """
        accepted = self.class_list
        return class_name in accepted
380 def suite_from_failed(suite, failed):
381 filter_cb = FilterByClassList(failed)
382 suite = filter_tests(suite, filter_cb)
386 class NonPassedResults(dict):
388 super(NonPassedResults, self).__init__()
389 self.all_testcases = 0
390 self.results_per_suite = {}
391 self.failures_id = 'failures'
392 self.errors_id = 'errors'
393 self.crashes_id = 'crashes'
394 self.skipped_id = 'skipped'
395 self.expectedFailures_id = 'expectedFailures'
396 self.unexpectedSuccesses_id = 'unexpectedSuccesses'
398 self[self.failures_id] = 0
399 self[self.errors_id] = 0
400 self[self.crashes_id] = 0
401 self[self.skipped_id] = 0
402 self[self.expectedFailures_id] = 0
403 self[self.unexpectedSuccesses_id] = 0
405 def _add_result(self, test, result_id):
406 if isinstance(test, VppTestCase):
407 parts = test.id().split('.')
409 tc_class = get_testcase_doc_name(test)
410 if tc_class not in self.results_per_suite:
411 # failed, errored, skipped, expectedly failed,
412 # unexpectedly passed
413 self.results_per_suite[tc_class] = \
414 {self.failures_id: [],
418 self.expectedFailures_id: [],
419 self.unexpectedSuccesses_id: []}
420 self.results_per_suite[tc_class][result_id].append(test)
424 def add_results(self, testcases, testcase_result,
426 for failed_testcase, _ in testcases:
427 if self._add_result(failed_testcase, testcase_result):
429 if failed_testcase not in duplicates:
430 self[testcase_result] += 1
432 self[testcase_result] += 1
434 def add_result(self, testcase_suite, result):
436 self.all_testcases += testcase_suite.countTestCases()
438 # suite finished properly
439 if not result.wasSuccessful():
442 self.add_results(result.failures, self.failures_id)
443 self.add_results(result.errors, self.errors_id,
444 result.failures + result.errors)
445 self.add_results(result.skipped, self.skipped_id)
446 self.add_results(result.expectedFailures,
447 self.expectedFailures_id)
448 self.add_results(result.unexpectedSuccesses,
449 self.unexpectedSuccesses_id)
454 self.add_results([(x, None) for x in testcase_suite],
458 if concurrent_tests == 1:
460 rerun_classes = {x[0].__class__.__name__ for
462 rerun_classes.update({x[0].__class__.__name__ for
463 x in result.failures})
464 self.rerun.append(suite_from_failed(testcase_suite,
467 self.rerun.append(testcase_suite)
469 self.rerun.append(testcase_suite)
473 def print_results(self):
475 print(double_line_delim)
476 print('TEST RESULTS:')
477 print(' Executed tests: {}'.format(self.all_testcases))
478 print(' Passed tests: {}'.format(
479 colorize(str(self.all_testcases -
480 self.all_nonpassed), GREEN)))
481 if self[self.failures_id] > 0:
482 print(' Failed tests: {}'.format(
483 colorize(str(self[self.failures_id]), RED)))
484 if self[self.errors_id] > 0:
485 print(' Errored tests: {}'.format(
486 colorize(str(self[self.errors_id]), RED)))
487 if self[self.crashes_id] > 0:
488 print(' Crashed tests: {}'.format(
489 colorize(str(self[self.crashes_id]), RED)))
490 if self[self.skipped_id] > 0:
491 print(' Skipped tests: {}'.format(
492 colorize(str(self[self.skipped_id]), YELLOW)))
493 if self[self.expectedFailures_id] > 0:
494 print(' Expected failures: {}'.format(
495 colorize(str(self[self.expectedFailures_id]), GREEN)))
496 if self[self.unexpectedSuccesses_id] > 0:
497 print(' Unexpected successes: {}'.format(
498 colorize(str(self[self.unexpectedSuccesses_id]), YELLOW)))
500 if self.all_failed > 0:
501 print('FAILED TESTS:')
502 for testcase_class, suite_results in \
503 self.results_per_suite.items():
504 failed_testcases = suite_results[
506 errored_testcases = suite_results[
508 crashed_testcases = suite_results[
510 if len(failed_testcases) or len(errored_testcases) \
511 or len(crashed_testcases):
512 print(' Testcase name: {}'.format(
513 colorize(testcase_class, RED)))
514 for failed_test in failed_testcases:
515 print(' FAILED: {}'.format(
516 colorize(get_test_description(
517 descriptions, failed_test), RED)))
518 for failed_test in errored_testcases:
519 print(' ERRORED: {}'.format(
520 colorize(get_test_description(
521 descriptions, failed_test), RED)))
522 for failed_test in crashed_testcases:
523 print(' CRASHED: {}'.format(
524 colorize(get_test_description(
525 descriptions, failed_test), RED)))
527 print(double_line_delim)
def all_nonpassed(self):
    """Return the total number of results that are not plain passes.

    Adds up the failure, error, crash, skip, expected-failure and
    unexpected-success counters stored in this mapping.

    NOTE(review): other code in this file reads this as an attribute
    (no call), so a property decorator is presumably applied on a line
    not shown here - confirm.
    """
    failed = self[self.failures_id]
    errored = self[self.errors_id]
    crashed = self[self.crashes_id]
    skipped = self[self.skipped_id]
    expected_failures = self[self.expectedFailures_id]
    unexpected_successes = self[self.unexpectedSuccesses_id]
    return failed + errored + crashed + skipped + \
        expected_failures + unexpected_successes
def all_failed(self):
    """Return the number of failed, errored and crashed results combined.

    NOTE(review): other code in this file reads this as an attribute
    (no call), so a property decorator is presumably applied on a line
    not shown here - confirm.
    """
    failed = self[self.failures_id]
    errored = self[self.errors_id]
    crashed = self[self.crashes_id]
    return failed + errored + crashed
543 def parse_results(results):
545 Prints the number of executed, passed, failed, errored, skipped,
546 expectedly failed and unexpectedly passed tests and details about
547 failed, errored, expectedly failed and unexpectedly passed tests.
549 Also returns any suites where any test failed.
555 results_per_suite = NonPassedResults()
558 for testcase_suite, result in results:
559 result_code = results_per_suite.add_result(testcase_suite, result)
562 elif result_code == -1:
565 results_per_suite.print_results()
573 return return_code, results_per_suite.rerun
576 def parse_digit_env(env_var, default):
577 value = os.getenv(env_var, default)
582 print('WARNING: unsupported value "%s" for env var "%s",'
583 'defaulting to %s' % (value, env_var, default))
588 if __name__ == '__main__':
590 verbose = parse_digit_env("V", 0)
592 test_timeout = parse_digit_env("TIMEOUT", 600) # default = 10 minutes
594 retries = parse_digit_env("RETRIES", 0)
596 debug = os.getenv("DEBUG", "n").lower() in ["gdb", "gdbserver"]
598 step = os.getenv("STEP", "n").lower() in ("y", "yes", "1")
601 os.getenv("FORCE_FOREGROUND", "n").lower() in ("y", "yes", "1")
603 run_interactive = debug or step or force_foreground
605 test_jobs = os.getenv("TEST_JOBS", "1").lower() # default = 1 process
606 if test_jobs == 'auto':
609 print('Interactive mode required, running on one core')
611 shm_free = psutil.disk_usage('/dev/shm').free
612 shm_max_processes = 1
613 if shm_free < min_req_shm:
614 raise Exception('Not enough free space in /dev/shm. Required '
615 'free space is at least %sM.'
616 % (min_req_shm >> 20))
618 extra_shm = shm_free - min_req_shm
# One extra test process for every `shm_per_process` bytes of /dev/shm that
# remain beyond the required minimum. Floor division keeps the count an
# integer under both Python 2 and Python 3.
shm_max_processes += extra_shm // shm_per_process
# Parallelism is capped by whichever resource is scarcer - CPU cores or
# shared memory - so take the smaller of the two limits, not the larger.
concurrent_tests = min(cpu_count(), shm_max_processes)
621 print('Found enough resources to run tests with %s cores'
623 elif test_jobs.isdigit():
624 concurrent_tests = int(test_jobs)
628 if run_interactive and concurrent_tests > 1:
629 raise NotImplementedError(
630 'Running tests interactively (DEBUG, STEP or FORCE_FOREGROUND is '
631 'set) in parallel (TEST_JOBS is more than 1) is not '
634 parser = argparse.ArgumentParser(description="VPP unit tests")
635 parser.add_argument("-f", "--failfast", action='store_true',
636 help="fast failure flag")
637 parser.add_argument("-d", "--dir", action='append', type=str,
638 help="directory containing test files "
639 "(may be specified multiple times)")
640 args = parser.parse_args()
641 failfast = args.failfast
644 print("Running tests using custom test runner") # debug message
645 filter_file, filter_class, filter_func = parse_test_option()
647 print("Active filters: file=%s, class=%s, function=%s" % (
648 filter_file, filter_class, filter_func))
650 filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)
652 cb = SplitToSuitesCallback(filter_cb)
654 print("Adding tests from directory tree %s" % d)
655 discover_tests(d, cb)
657 # suites are not hashable, need to use list
660 for testcase_suite in cb.suites.values():
661 tests_amount += testcase_suite.countTestCases()
662 suites.append(testcase_suite)
664 if concurrent_tests == 1:
665 new_suite = unittest.TestSuite()
667 new_suite.addTest(suite)
671 print("%s out of %s tests match specified filters" % (
672 tests_amount, tests_amount + cb.filtered.countTestCases()))
674 if not running_extended_tests():
675 print("Not running extended tests (some tests will be skipped)")
677 attempts = retries + 1
679 print("Perform %s attempts to pass the suite..." % attempts)
682 # don't fork if requiring interactive terminal
683 sys.exit(not VppTestRunner(
684 verbosity=verbose, failfast=failfast)
685 .run(suites[0]).wasSuccessful())
688 while len(suites) > 0 and attempts > 0:
689 tests_amount = sum([x.countTestCases() for x in suites])
690 results = run_forked(suites)
691 exit_code, suites = parse_results(results)
694 print('Test run was successful')
696 print('%s attempt(s) left.' % attempts)