14 import multiprocessing
15 from multiprocessing import Process, Pipe, cpu_count
16 from multiprocessing.queues import Queue
17 from multiprocessing.managers import BaseManager
19 from framework import VppTestRunner, running_extended_tests, VppTestCase, \
20 get_testcase_doc_name, get_test_description, PASS, FAIL, ERROR, SKIP, \
22 from debug import spawn_gdb
23 from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
24 colorize, single_line_delim
25 from discover_tests import discover_tests
26 from subprocess import check_output, CalledProcessError
27 from util import check_core_path, get_core_path, is_core_present
29 # timeout which controls how long the child has to finish after seeing
30 # a core dump in the test temporary directory. If this is exceeded, the
31 # parent assumes that the child process is stuck (e.g. waiting for a shm
32 # mutex, which will never get unlocked) and kills the child
# Minimum free space in /dev/shm required before tests are run: 512MB.
min_req_shm = 512 * 1024 * 1024
# Additional free shm budgeted for every extra parallel process: 128MB.
shm_per_process = 128 * 1024 * 1024
# Queue subclass; instances of it are assigned to sys.stdout / sys.stderr in
# the child test runner process (see test_runner_wrapper).
# NOTE(review): this listing is a line-numbered extract with indentation
# stripped and lines missing - the method headers owning the statements
# below are not visible here; confirm against the full file.
39 class StreamQueue(Queue):
# flush the interpreter's original stdout and stderr streams
44 sys.__stdout__.flush()
45 sys.__stderr__.flush()
# hand out the file descriptor number of the queue's internal _writer object
48 return self._writer.fileno()
# BaseManager subclass used to create StreamQueue objects
# (callers use manager.StreamQueue()).
# NOTE(review): the class body is not visible in this extract.
51 class StreamQueueManager(BaseManager):
# register StreamQueue with the manager under the type id 'StreamQueue'
55 StreamQueueManager.register('StreamQueue', StreamQueue)
# Result container for one testcase suite: a dict indexed by result type
# (PASS, FAIL, ERROR, SKIP, TEST_RUN) whose entries collect test ids.
# NOTE(review): line-numbered extract, indentation stripped and lines missing
# (e.g. whatever initialises the per-result-type entries) - confirm against
# the full file before relying on these notes.
58 class TestResult(dict):
59 def __init__(self, testcase_suite, testcases_by_id=None):
60 super(TestResult, self).__init__()
# remember the suite, a flat list of its members and the id -> testcase map
67 self.testcase_suite = testcase_suite
68 self.testcases = [testcase for testcase in testcase_suite]
69 self.testcases_by_id = testcases_by_id
# True only when there are no failures and no errors, and the number of
# passed + skipped tests equals both the suite's test count and the number
# of tests recorded under TEST_RUN.
71 def was_successful(self):
72 return 0 == len(self[FAIL]) == len(self[ERROR]) \
73 and len(self[PASS] + self[SKIP]) \
74 == self.testcase_suite.countTestCases() == len(self[TEST_RUN])
# True when nothing has been recorded under TEST_RUN
76 def no_tests_run(self):
77 return 0 == len(self[TEST_RUN])
# record test_id under the given result type
79 def process_result(self, test_id, result):
80 self[result].append(test_id)
# Build a suite from the tests that neither passed nor were skipped, using
# the module-level suite_from_failed().
# NOTE(review): the lines defining tc_id and rerun_ids are not visible.
82 def suite_from_failed(self):
84 for testcase in self.testcase_suite:
86 if tc_id not in self[PASS] and tc_id not in self[SKIP]:
89 return suite_from_failed(self.testcase_suite, rerun_ids)
# Return (testcase_name, test_name) for a test id. Ids of the form
# "tearDownClass (module.Class)" / "setUpClass (module.Class)" are recognised
# by the regex below; other ids go through the _get_* helpers.
# NOTE(review): some branch lines are missing from this extract.
91 def get_testcase_names(self, test_id):
92 # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
93 setup_teardown_match = re.match(
94 r'((tearDownClass)|(setUpClass)) \((.+\..+)\)', test_id)
95 if setup_teardown_match:
96 test_name, _, _, testcase_name = setup_teardown_match.groups()
97 if len(testcase_name.split('.')) == 2:
98 for key in self.testcases_by_id.keys():
99 if key.startswith(testcase_name):
102 testcase_name = self._get_testcase_doc_name(testcase_name)
104 test_name = self._get_test_description(test_id)
105 testcase_name = self._get_testcase_doc_name(test_id)
107 return testcase_name, test_name
# Look up the description of a known test id via get_test_description().
# NOTE(review): fallback branch and return are not visible; `descriptions`
# is a module-level name defined outside this extract.
109 def _get_test_description(self, test_id):
110 if test_id in self.testcases_by_id:
111 desc = get_test_description(descriptions,
112 self.testcases_by_id[test_id])
# Look up the doc name of a known test id via get_testcase_doc_name().
# NOTE(review): fallback branch and return are not visible.
117 def _get_testcase_doc_name(self, test_id):
118 if test_id in self.testcases_by_id:
119 doc_name = get_testcase_doc_name(self.testcases_by_id[test_id])
# Target function of the child Process created in TestCaseWrapper: redirects
# stdout/stderr into the queue, runs the suite with VppTestRunner, reports
# result.wasSuccessful() over finished_pipe and closes the pipes it used.
# NOTE(review): line-numbered extract with indentation stripped; some
# keyword arguments of the VppTestRunner call are missing.
125 def test_runner_wrapper(suite, keep_alive_pipe, stdouterr_queue,
126 finished_pipe, result_pipe, logger):
# everything printed in this process goes to the queue from here on
127 sys.stdout = stdouterr_queue
128 sys.stderr = stdouterr_queue
# expose the logger's first handler on the VppTestCase class
129 VppTestCase.parallel_handler = logger.handlers[0]
130 result = VppTestRunner(keep_alive_pipe=keep_alive_pipe,
131 descriptions=descriptions,
133 result_pipe=result_pipe,
135 print_summary=False).run(suite)
# tell the parent whether the run succeeded, then close our pipe ends
136 finished_pipe.send(result.wasSuccessful())
137 finished_pipe.close()
138 keep_alive_pipe.close()
# Parent-side bookkeeping for one testcase suite run in a child process:
# owns the three pipes, the stdout/stderr queue, the logger, the child
# Process object and the accumulated TestResult.
# NOTE(review): line-numbered extract, indentation stripped and lines missing
# (decorators, else-branches, parts of multi-line calls) - confirm against
# the full file.
141 class TestCaseWrapper(object):
142 def __init__(self, testcase_suite, manager):
# keep-alive, finished and result channels between parent and child
143 self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(
145 self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
146 self.result_parent_end, self.result_child_end = Pipe(duplex=False)
147 self.testcase_suite = testcase_suite
# the queue is created through the manager; when the interpreter is not
# Python 2 a multiprocessing context is passed to the StreamQueue constructor
148 if sys.version[0] == '2':
149 self.stdouterr_queue = manager.StreamQueue()
151 from multiprocessing import get_context
152 self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
153 self.logger = get_parallel_logger(self.stdouterr_queue)
# child process that will execute test_runner_wrapper with the child ends
154 self.child = Process(target=test_runner_wrapper,
155 args=(testcase_suite,
156 self.keep_alive_child_end,
157 self.stdouterr_queue,
158 self.finished_child_end,
159 self.result_child_end,
# state describing the most recently reported test
163 self.last_test_temp_dir = None
164 self.last_test_vpp_binary = None
165 self._last_test = None
166 self.last_test_id = None
# timestamp of the last message from the child, used for timeout detection
168 self.last_heard = time.time()
169 self.core_detected_at = None
170 self.testcases_by_id = {}
171 self.testclasess_with_core = {}
# index the suite's testcases by their unittest id
172 for testcase in self.testcase_suite:
173 self.testcases_by_id[testcase.id()] = testcase
174 self.result = TestResult(testcase_suite, self.testcases_by_id)
# NOTE(review): the header of the accessor returning _last_test is missing
178 return self._last_test
# Store the reported test id and derive a human readable name from it:
# shortDescription() of the known testcase, else str(testcase); the raw id
# is used in a branch whose header is missing from this extract.
181 def last_test(self, test_id):
182 self.last_test_id = test_id
183 if test_id in self.testcases_by_id:
184 testcase = self.testcases_by_id[test_id]
185 self._last_test = testcase.shortDescription()
186 if not self._last_test:
187 self._last_test = str(testcase)
189 self._last_test = test_id
# Remember, once per test class, the entry (incl. vpp binary and temp dir)
# of the last reported test so a core can be handled later.
# NOTE(review): parts of the format call / tuple are missing.
191 def add_testclass_with_core(self):
192 if self.last_test_id in self.testcases_by_id:
193 test = self.testcases_by_id[self.last_test_id]
194 class_name = unittest.util.strclass(test.__class__)
195 test_name = "'{}' ({})".format(get_test_description(descriptions,
199 test_name = self.last_test_id
# for setUpClass/tearDownClass ids the class name is regex group 4
200 class_name = re.match(r'((tearDownClass)|(setUpClass)) '
201 r'\((.+\..+)\)', test_name).groups()[3]
202 if class_name not in self.testclasess_with_core:
203 self.testclasess_with_core[class_name] = (
205 self.last_test_vpp_binary,
206 self.last_test_temp_dir)
# close both ends of all three pipes
208 def close_pipes(self):
209 self.keep_alive_child_end.close()
210 self.finished_child_end.close()
211 self.result_child_end.close()
212 self.keep_alive_parent_end.close()
213 self.finished_parent_end.close()
214 self.result_parent_end.close()
# delegate to the accumulated TestResult
216 def was_successful(self):
217 return self.result.was_successful()
# Thread target that drains the stdout/stderr queues of wrapped testcases and
# writes their content to sys.stdout, one testcase at a time.
# NOTE(review): line-numbered extract with indentation stripped; the end of
# the signature and the first read of `data` are missing.
220 def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
# keep going while the event is set or testcases are still unread
223 while read_testcases.is_set() or unread_testcases:
# prefer testcases that have already finished
224 if finished_unread_testcases:
225 read_testcase = finished_unread_testcases.pop()
226 unread_testcases.remove(read_testcase)
227 elif unread_testcases:
228 read_testcase = unread_testcases.pop()
# a None item on the queue ends the output of this testcase
231 while data is not None:
232 sys.stdout.write(data)
233 data = read_testcase.stdouterr_queue.get()
235 read_testcase.stdouterr_queue.close()
236 finished_unread_testcases.discard(read_testcase)
# Post-processing for a failed suite: symlink the test's temp directory as
# "<FAILED_DIR><basename>-FAILED", report an existing core file (running the
# `file` utility on it) and copy /tmp/api_post_mortem.<vpp_pid> into the
# temp directory when that file exists.
# NOTE(review): line-numbered extract with indentation stripped; try/except
# scaffolding and parts of several logging calls are missing.
240 def handle_failed_suite(logger, last_test_temp_dir, vpp_pid):
241 if last_test_temp_dir:
242 # Need to create link in case of a timeout or core dump without failure
243 lttd = os.path.basename(last_test_temp_dir)
# NOTE(review): failed_dir is None when FAILED_DIR is unset and is joined
# to the directory name without a path separator - verify this is intended
244 failed_dir = os.getenv('FAILED_DIR')
245 link_path = '%s%s-FAILED' % (failed_dir, lttd)
246 if not os.path.exists(link_path):
247 os.symlink(last_test_temp_dir, link_path)
248 logger.error("Symlink to failed testcase directory: %s -> %s"
251 # Report core existence
252 core_path = get_core_path(last_test_temp_dir)
253 if os.path.exists(core_path):
255 "Core-file exists in test temporary directory: %s!" %
257 check_core_path(logger, core_path)
258 logger.debug("Running 'file %s':" % core_path)
260 info = check_output(["file", core_path])
262 except CalledProcessError as e:
263 logger.error("Subprocess returned with return code "
264 "while running `file' utility on core-file "
266 "rc=%s", e.returncode)
268 logger.error("Subprocess returned with OS error while "
269 "running 'file' utility "
271 "(%s) %s", e.errno, e.strerror)
272 except Exception as e:
273 logger.exception("Unexpected error running `file' utility "
# log a ready-to-use gdb command line for the core
275 logger.error("gdb %s %s" %
276 (os.getenv('VPP_BIN', 'vpp'), core_path))
279 # Copy api post mortem
280 api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
281 if os.path.isfile(api_post_mortem_path):
282 logger.error("Copying api_post_mortem.%d to %s" %
283 (vpp_pid, last_test_temp_dir))
284 shutil.copy2(api_post_mortem_path, last_test_temp_dir)
# When a core file is present in tempdir: announce it, spawn gdb on it and
# gzip it.
# NOTE(review): line-numbered extract with indentation stripped; lines that
# presumably guard the gdb and compression steps are missing.
287 def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
288 if is_core_present(tempdir):
290 print('VPP core detected in %s. Last test running was %s' %
291 (tempdir, core_crash_test))
292 print(single_line_delim)
293 spawn_gdb(vpp_binary, get_core_path(tempdir))
294 print(single_line_delim)
296 print("Compressing core-file in test directory `%s'" % tempdir)
# NOTE(review): the path is interpolated into a shell command string
297 os.system("gzip %s" % get_core_path(tempdir))
# Run check_and_handle_core() for every (test, vpp_binary, tempdir) entry
# recorded in testclasess_with_core of each failed wrapped testcase.
# NOTE(review): line-numbered extract with indentation stripped; one line
# between the assignment and the inner loop is missing.
300 def handle_cores(failed_testcases):
301 for failed_testcase in failed_testcases:
302 tcs_with_core = failed_testcase.testclasess_with_core
304 for test, vpp_binary, tempdir in tcs_with_core.values():
305 check_and_handle_core(vpp_binary, tempdir, test)
# Register a finished wrapped suite: append its result to `results`, add it
# to the finished set and, when it was not successful, to the failed set and
# run handle_failed_suite() for it.
# NOTE(review): line-numbered extract with indentation stripped; the end of
# the signature, the failfast branch body and the return are missing.
# `failfast` is a module-level name set in the __main__ section.
308 def process_finished_testsuite(wrapped_testcase_suite,
309 finished_testcase_suites,
310 failed_wrapped_testcases,
312 results.append(wrapped_testcase_suite.result)
313 finished_testcase_suites.add(wrapped_testcase_suite)
315 if failfast and not wrapped_testcase_suite.was_successful():
318 if not wrapped_testcase_suite.was_successful():
319 failed_wrapped_testcases.add(wrapped_testcase_suite)
320 handle_failed_suite(wrapped_testcase_suite.logger,
321 wrapped_testcase_suite.last_test_temp_dir,
322 wrapped_testcase_suite.vpp_pid)
# Run the given testcase suites in child processes, at most concurrent_tests
# at a time; suites flagged is_tagged_run_solo are set aside and started only
# when no other runner is active. The parent polls the pipes of each child,
# detects timeouts / dead children / cores, and collects results.
# NOTE(review): line-numbered extract with indentation stripped and many
# lines missing (try/except scaffolding, call arguments, returns); the
# comments below describe only what is visible. `concurrent_tests`,
# `test_timeout`, `core_timeout` and `test_finished_join_timeout` are
# module-level names.
327 def run_forked(testcase_suites):
328 wrapped_testcase_suites = set()
329 solo_testcase_suites = []
330 total_test_runners = 0
332 # suites are unhashable, need to use list
334 unread_testcases = set()
335 finished_unread_testcases = set()
336 manager = StreamQueueManager()
338 total_test_runners = 0
# initial fill: start runners until the concurrency limit is reached,
# diverting solo-tagged suites to solo_testcase_suites
339 while total_test_runners < concurrent_tests:
341 a_suite = testcase_suites.pop(0)
342 if a_suite.is_tagged_run_solo:
343 solo_testcase_suites.append(a_suite)
345 wrapped_testcase_suite = TestCaseWrapper(a_suite,
347 wrapped_testcase_suites.add(wrapped_testcase_suite)
348 unread_testcases.add(wrapped_testcase_suite)
349 total_test_runners = total_test_runners + 1
# nothing else running: start one solo suite
353 while total_test_runners < 1 and solo_testcase_suites:
354 if solo_testcase_suites:
355 a_suite = solo_testcase_suites.pop(0)
356 wrapped_testcase_suite = TestCaseWrapper(a_suite,
358 wrapped_testcase_suites.add(wrapped_testcase_suite)
359 unread_testcases.add(wrapped_testcase_suite)
360 total_test_runners = total_test_runners + 1
# background thread forwarding the children's stdout/stderr; the event keeps
# it alive while testcases may still be added
364 read_from_testcases = threading.Event()
365 read_from_testcases.set()
366 stdouterr_thread = threading.Thread(target=stdouterr_reader_wrapper,
367 args=(unread_testcases,
368 finished_unread_testcases,
369 read_from_testcases))
370 stdouterr_thread.start()
372 failed_wrapped_testcases = set()
# main supervision loop: runs until no wrapped suite is left
376 while wrapped_testcase_suites:
377 finished_testcase_suites = set()
378 for wrapped_testcase_suite in wrapped_testcase_suites:
# drain per-test results sent by the child
379 while wrapped_testcase_suite.result_parent_end.poll():
380 wrapped_testcase_suite.result.process_result(
381 *wrapped_testcase_suite.result_parent_end.recv())
382 wrapped_testcase_suite.last_heard = time.time()
# drain keep-alive messages: (last test, vpp binary, temp dir, vpp pid)
384 while wrapped_testcase_suite.keep_alive_parent_end.poll():
385 wrapped_testcase_suite.last_test, \
386 wrapped_testcase_suite.last_test_vpp_binary, \
387 wrapped_testcase_suite.last_test_temp_dir, \
388 wrapped_testcase_suite.vpp_pid = \
389 wrapped_testcase_suite.keep_alive_parent_end.recv()
390 wrapped_testcase_suite.last_heard = time.time()
# child reported that it finished
392 if wrapped_testcase_suite.finished_parent_end.poll():
393 wrapped_testcase_suite.finished_parent_end.recv()
394 wrapped_testcase_suite.last_heard = time.time()
395 stop_run = process_finished_testsuite(
396 wrapped_testcase_suite,
397 finished_testcase_suites,
398 failed_wrapped_testcases,
# failure detection: silence longer than test_timeout, dead child process,
# or a core file that stays around longer than core_timeout
403 if wrapped_testcase_suite.last_heard + test_timeout < \
406 wrapped_testcase_suite.logger.critical(
407 "Child test runner process timed out "
408 "(last test running was `%s' in `%s')!" %
409 (wrapped_testcase_suite.last_test,
410 wrapped_testcase_suite.last_test_temp_dir))
411 elif not wrapped_testcase_suite.child.is_alive():
413 wrapped_testcase_suite.logger.critical(
414 "Child test runner process unexpectedly died "
415 "(last test running was `%s' in `%s')!" %
416 (wrapped_testcase_suite.last_test,
417 wrapped_testcase_suite.last_test_temp_dir))
418 elif wrapped_testcase_suite.last_test_temp_dir and \
419 wrapped_testcase_suite.last_test_vpp_binary:
421 wrapped_testcase_suite.last_test_temp_dir):
422 wrapped_testcase_suite.add_testclass_with_core()
423 if wrapped_testcase_suite.core_detected_at is None:
424 wrapped_testcase_suite.core_detected_at = \
426 elif wrapped_testcase_suite.core_detected_at + \
427 core_timeout < time.time():
428 wrapped_testcase_suite.logger.critical(
429 "Child test runner process unresponsive and "
430 "core-file exists in test temporary directory "
431 "(last test running was `%s' in `%s')!" %
432 (wrapped_testcase_suite.last_test,
433 wrapped_testcase_suite.last_test_temp_dir))
# stop the child and, when known, signal the vpp process it left behind
437 wrapped_testcase_suite.child.terminate()
439 # terminating the child process tends to leave orphan
441 if wrapped_testcase_suite.vpp_pid:
442 os.kill(wrapped_testcase_suite.vpp_pid,
# mark the result as crashed and count the last test as an ERROR
447 wrapped_testcase_suite.result.crashed = True
448 wrapped_testcase_suite.result.process_result(
449 wrapped_testcase_suite.last_test_id, ERROR)
450 stop_run = process_finished_testsuite(
451 wrapped_testcase_suite,
452 finished_testcase_suites,
453 failed_wrapped_testcases,
# reap finished children and hand their queues to the reader thread
456 for finished_testcase in finished_testcase_suites:
457 # Somewhat surprisingly, the join below may
458 # timeout, even if client signaled that
459 # it finished - so we note it just in case.
460 join_start = time.time()
461 finished_testcase.child.join(test_finished_join_timeout)
462 join_end = time.time()
463 if join_end - join_start >= test_finished_join_timeout:
464 finished_testcase.logger.error(
465 "Timeout joining finished test: %s (pid %d)" %
466 (finished_testcase.last_test,
467 finished_testcase.child.pid))
468 finished_testcase.close_pipes()
469 wrapped_testcase_suites.remove(finished_testcase)
470 finished_unread_testcases.add(finished_testcase)
# None is the end-of-output marker for the reader thread
471 finished_testcase.stdouterr_queue.put(None)
472 total_test_runners = total_test_runners - 1
# remaining suites that never ran still get an (empty) TestResult
474 while testcase_suites:
475 results.append(TestResult(testcase_suites.pop(0)))
# otherwise start the next non-solo suite, queueing solo ones for later
476 elif testcase_suites:
477 a_testcase = testcase_suites.pop(0)
478 while a_testcase and a_testcase.is_tagged_run_solo:
479 solo_testcase_suites.append(a_testcase)
481 a_testcase = testcase_suites.pop(0)
485 new_testcase = TestCaseWrapper(a_testcase,
487 wrapped_testcase_suites.add(new_testcase)
488 total_test_runners = total_test_runners + 1
489 unread_testcases.add(new_testcase)
# a solo suite is started only when no other runner is active
490 if solo_testcase_suites and total_test_runners == 0:
491 a_testcase = solo_testcase_suites.pop(0)
492 new_testcase = TestCaseWrapper(a_testcase,
494 wrapped_testcase_suites.add(new_testcase)
495 total_test_runners = total_test_runners + 1
496 unread_testcases.add(new_testcase)
# terminate whatever is still running and unblock the reader thread
499 for wrapped_testcase_suite in wrapped_testcase_suites:
500 wrapped_testcase_suite.child.terminate()
501 wrapped_testcase_suite.stdouterr_queue.put(None)
# let the reader thread finish, then process recorded cores
504 read_from_testcases.clear()
505 stdouterr_thread.join(test_timeout)
508 handle_cores(failed_wrapped_testcases)
# Callback called with (file_name, cls, method): tests accepted by
# filter_callback are grouped into one unittest.TestSuite per
# "<file_name><class name>" key in self.suites, the others are collected in
# self.filtered.
# NOTE(review): line-numbered extract with indentation stripped; the
# initialisation of self.suites and the else-branch header are missing.
512 class SplitToSuitesCallback:
513 def __init__(self, filter_callback):
515 self.suite_name = 'default'
516 self.filter_callback = filter_callback
517 self.filtered = unittest.TestSuite()
519 def __call__(self, file_name, cls, method):
# instantiate the testcase for this single test method
520 test_method = cls(method)
521 if self.filter_callback(file_name, cls.__name__, method):
522 self.suite_name = file_name + cls.__name__
# first test of this class: create its suite, not solo by default
523 if self.suite_name not in self.suites:
524 self.suites[self.suite_name] = unittest.TestSuite()
525 self.suites[self.suite_name].is_tagged_run_solo = False
526 self.suites[self.suite_name].addTest(test_method)
# one solo-tagged test marks the whole suite as solo
527 if test_method.is_tagged_run_solo():
528 self.suites[self.suite_name].is_tagged_run_solo = True
531 self.filtered.addTest(test_method)
# Parse the test filter taken from the environment variable named by
# `test_option` into (filter_file_name, filter_class_name, filter_func_name).
# '*' and '' parts mean "no filter"; file names get a 'test_' prefix when
# missing and a '.py' suffix.
# NOTE(review): line-numbered extract with indentation stripped; the code
# splitting `f` into `parts` and several branch headers are missing.
537 def parse_test_option():
538 f = os.getenv(test_option, None)
539 filter_file_name = None
540 filter_class_name = None
541 filter_func_name = None
546 raise Exception("Unrecognized %s option: %s" %
549 if parts[2] not in ('*', ''):
550 filter_func_name = parts[2]
551 if parts[1] not in ('*', ''):
552 filter_class_name = parts[1]
553 if parts[0] not in ('*', ''):
554 if parts[0].startswith('test_'):
555 filter_file_name = parts[0]
557 filter_file_name = 'test_%s' % parts[0]
559 if f.startswith('test_'):
562 filter_file_name = 'test_%s' % f
564 filter_file_name = '%s.py' % filter_file_name
565 return filter_file_name, filter_class_name, filter_func_name
# Recursively build a new TestSuite from `tests`, consulting
# filter_cb(file, class, function) for single TestCase objects whose id()
# is split on '.'.
# NOTE(review): line-numbered extract with indentation stripped; the loop
# header, the addTest calls and the return are missing.
568 def filter_tests(tests, filter_cb):
569 result = unittest.suite.TestSuite()
571 if isinstance(t, unittest.suite.TestSuite):
572 # this is a bunch of tests, recursively filter...
573 x = filter_tests(t, filter_cb)
574 if x.countTestCases() > 0:
576 elif isinstance(t, unittest.TestCase):
577 # this is a single test
578 parts = t.id().split('.')
579 # t.id() for common cases like this:
580 # test_classifier.TestClassifier.test_acl_ip
581 # apply filtering only if it is so
583 if not filter_cb(parts[0], parts[1], parts[2]):
587 # unexpected object, don't touch it
# Filter callback built from the parsed test option: the file name is
# matched with fnmatch (shell-style pattern), class and function names are
# compared for equality; a filter that is None/empty is not applied.
# NOTE(review): line-numbered extract with indentation stripped; the return
# statements of __call__ are missing.
592 class FilterByTestOption:
593 def __init__(self, filter_file_name, filter_class_name, filter_func_name):
594 self.filter_file_name = filter_file_name
595 self.filter_class_name = filter_class_name
596 self.filter_func_name = filter_func_name
598 def __call__(self, file_name, class_name, func_name):
599 if self.filter_file_name:
600 fn_match = fnmatch.fnmatch(file_name, self.filter_file_name)
603 if self.filter_class_name and class_name != self.filter_class_name:
605 if self.filter_func_name and func_name != self.filter_func_name:
class FilterByClassList:
    """Filter callback accepting tests by their "<file>.<class>" name.

    :param classes_with_filenames: container (anything supporting ``in``)
        of strings of the form ``"<file_name>.<class_name>"``.
    """

    def __init__(self, classes_with_filenames):
        self.classes_with_filenames = classes_with_filenames

    def __call__(self, file_name, class_name, func_name):
        """Return True when "<file_name>.<class_name>" is in the container.

        ``func_name`` is accepted to match the filter-callback signature
        but does not influence the decision.
        """
        return '.'.join([file_name, class_name]) in self.classes_with_filenames
# Reduce `suite` to the tests whose "<file>.<class>" prefix belongs to one of
# the failed test ids (the last dotted component of each id is dropped).
# NOTE(review): line-numbered extract with indentation stripped; the end of
# the function is not visible - callers use its return value, so presumably
# the filtered suite is returned; confirm against the full file.
618 def suite_from_failed(suite, failed):
619 failed = {x.rsplit('.', 1)[0] for x in failed}
620 filter_cb = FilterByClassList(failed)
621 suite = filter_tests(suite, filter_cb)
# Aggregates TestResult objects of all suites: a dict of per-result-type
# counters plus bookkeeping lists, with a printable summary.
# NOTE(review): line-numbered extract with indentation stripped and lines
# missing (the __init__ header, counter initialisation, return values of
# add_result, decorators of not_executed / all_failed) - confirm against the
# full file.
625 class AllResults(dict):
627 super(AllResults, self).__init__()
628 self.all_testcases = 0
629 self.results_per_suite = []
636 self.testsuites_no_tests_run = []
# add the per-type test counts of one suite result to the totals
638 def add_results(self, result):
639 self.results_per_suite.append(result)
640 result_types = [PASS, FAIL, ERROR, SKIP, TEST_RUN]
641 for result_type in result_types:
642 self[result_type] += len(result[result_type])
# account for one suite result; suites with no executed tests are listed in
# testsuites_no_tests_run, unsuccessful ones are queued in self.rerun
644 def add_result(self, result):
646 self.all_testcases += result.testcase_suite.countTestCases()
647 self.add_results(result)
649 if result.no_tests_run():
650 self.testsuites_no_tests_run.append(result.testcase_suite)
655 elif not result.was_successful():
659 self.rerun.append(result.testcase_suite)
# print the totals followed by details of failed / errored tests and of
# testcases where no test was executed
663 def print_results(self):
665 print(double_line_delim)
666 print('TEST RESULTS:')
667 print(' Scheduled tests: {}'.format(self.all_testcases))
668 print(' Executed tests: {}'.format(self[TEST_RUN]))
669 print(' Passed tests: {}'.format(
670 colorize(str(self[PASS]), GREEN)))
672 print(' Skipped tests: {}'.format(
673 colorize(str(self[SKIP]), YELLOW)))
674 if self.not_executed > 0:
675 print(' Not Executed tests: {}'.format(
676 colorize(str(self.not_executed), RED)))
678 print(' Failures: {}'.format(
679 colorize(str(self[FAIL]), RED)))
681 print(' Errors: {}'.format(
682 colorize(str(self[ERROR]), RED)))
684 if self.all_failed > 0:
685 print('FAILURES AND ERRORS IN TESTS:')
686 for result in self.results_per_suite:
687 failed_testcase_ids = result[FAIL]
688 errored_testcase_ids = result[ERROR]
# the testcase name is printed only when it changes between entries
689 old_testcase_name = None
690 if failed_testcase_ids:
691 for failed_test_id in failed_testcase_ids:
692 new_testcase_name, test_name = \
693 result.get_testcase_names(failed_test_id)
694 if new_testcase_name != old_testcase_name:
695 print(' Testcase name: {}'.format(
696 colorize(new_testcase_name, RED)))
697 old_testcase_name = new_testcase_name
698 print(' FAILURE: {} [{}]'.format(
699 colorize(test_name, RED), failed_test_id))
700 if errored_testcase_ids:
701 for errored_test_id in errored_testcase_ids:
702 new_testcase_name, test_name = \
703 result.get_testcase_names(errored_test_id)
704 if new_testcase_name != old_testcase_name:
705 print(' Testcase name: {}'.format(
706 colorize(new_testcase_name, RED)))
707 old_testcase_name = new_testcase_name
708 print(' ERROR: {} [{}]'.format(
709 colorize(test_name, RED), errored_test_id))
710 if self.testsuites_no_tests_run:
711 print('TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:')
713 for testsuite in self.testsuites_no_tests_run:
714 for testcase in testsuite:
715 tc_classes.add(get_testcase_doc_name(testcase))
716 for tc_class in tc_classes:
717 print(' {}'.format(colorize(tc_class, RED)))
719 print(double_line_delim)
# scheduled tests minus executed tests (used above without call parentheses)
723 def not_executed(self):
724 return self.all_testcases - self[TEST_RUN]
# failures plus errors (used above without call parentheses)
727 def all_failed(self):
728 return self[FAIL] + self[ERROR]
# Feed every suite result into an AllResults instance, print the summary and
# return (return_code, suites to rerun).
# NOTE(review): line-numbered extract with indentation stripped; docstring
# delimiters, the handling of result_code and the computation of
# return_code are missing.
731 def parse_results(results):
733 Prints the number of scheduled, executed, not executed, passed, failed,
734 errored and skipped tests and details about failed and errored tests.
736 Also returns all suites where any test failed.
742 results_per_suite = AllResults()
745 for result in results:
746 result_code = results_per_suite.add_result(result)
749 elif result_code == -1:
752 results_per_suite.print_results()
760 return return_code, results_per_suite.rerun
# Read a numeric setting from the environment variable env_var, falling back
# to `default`; an unsupported value triggers the warning below.
# NOTE(review): line-numbered extract with indentation stripped; the
# validation, conversion and return lines are missing.
763 def parse_digit_env(env_var, default):
764 value = os.getenv(env_var, default)
769 print('WARNING: unsupported value "%s" for env var "%s",'
770 'defaulting to %s' % (value, env_var, default))
# Script entry point: reads configuration from environment variables and
# command line, discovers and filters tests, then runs them either in the
# foreground (interactive modes) or in forked child processes with retries.
# NOTE(review): line-numbered extract with indentation stripped and lines
# missing (try/else headers, parts of print calls, variable initialisation,
# final exit handling) - comments describe only what is visible.
775 if __name__ == '__main__':
777 verbose = parse_digit_env("V", 0)
779 test_timeout = parse_digit_env("TIMEOUT", 600) # default = 10 minutes
781 test_finished_join_timeout = 15
783 retries = parse_digit_env("RETRIES", 0)
# DEBUG selects interactive debugging (gdb / gdbserver) or core handling
785 debug = os.getenv("DEBUG", "n").lower() in ["gdb", "gdbserver"]
787 debug_core = os.getenv("DEBUG", "").lower() == "core"
788 compress_core = framework.BoolEnvironmentVariable("CORE_COMPRESS")
790 step = framework.BoolEnvironmentVariable("STEP")
791 force_foreground = framework.BoolEnvironmentVariable("FORCE_FOREGROUND")
793 run_interactive = debug or step or force_foreground
# prefer the CPU affinity mask; os.sched_getaffinity is not available on
# every platform, hence the AttributeError fallback
796 num_cpus = len(os.sched_getaffinity(0))
797 except AttributeError:
798 num_cpus = multiprocessing.cpu_count()
799 shm_free = psutil.disk_usage('/dev/shm').free
801 print('OS reports %s available cpu(s). Free shm: %s' % (
802 num_cpus, "{:,}MB".format(shm_free / (1024 * 1024))))
804 test_jobs = os.getenv("TEST_JOBS", "1").lower() # default = 1 process
# TEST_JOBS=auto: derive the process count from free shm and CPU count
805 if test_jobs == 'auto':
808 print('Interactive mode required, running on one core')
810 shm_max_processes = 1
811 if shm_free < min_req_shm:
812 raise Exception('Not enough free space in /dev/shm. Required '
813 'free space is at least %sM.'
814 % (min_req_shm >> 20))
# one process for the base requirement plus one per shm_per_process of
# additional free space
816 extra_shm = shm_free - min_req_shm
817 shm_max_processes += extra_shm // shm_per_process
# NOTE(review): this uses cpu_count() rather than the affinity-aware num_cpus
# computed above - verify which one is intended
818 concurrent_tests = min(cpu_count(), shm_max_processes)
819 print('Found enough resources to run tests with %s cores'
821 elif test_jobs.isdigit():
822 concurrent_tests = int(test_jobs)
823 print("Running on %s core(s) as set by 'TEST_JOBS'." %
827 print('Running on one core.')
829 if run_interactive and concurrent_tests > 1:
830 raise NotImplementedError(
831 'Running tests interactively (DEBUG is gdb or gdbserver or STEP '
832 'is set) in parallel (TEST_JOBS is more than 1) is not supported')
# command line: -f/--failfast and repeatable -d/--dir
834 parser = argparse.ArgumentParser(description="VPP unit tests")
835 parser.add_argument("-f", "--failfast", action='store_true',
836 help="fast failure flag")
837 parser.add_argument("-d", "--dir", action='append', type=str,
838 help="directory containing test files "
839 "(may be specified multiple times)")
840 args = parser.parse_args()
841 failfast = args.failfast
844 print("Running tests using custom test runner") # debug message
845 filter_file, filter_class, filter_func = parse_test_option()
847 print("Active filters: file=%s, class=%s, function=%s" % (
848 filter_file, filter_class, filter_func))
850 filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)
# discover tests; the callback splits them into per-class suites
852 ignore_path = os.getenv("VENV_PATH", None)
853 cb = SplitToSuitesCallback(filter_cb)
855 print("Adding tests from directory tree %s" % d)
856 discover_tests(d, cb, ignore_path)
858 # suites are not hashable, need to use list
861 for testcase_suite in cb.suites.values():
862 tests_amount += testcase_suite.countTestCases()
863 suites.append(testcase_suite)
865 print("%s out of %s tests match specified filters" % (
866 tests_amount, tests_amount + cb.filtered.countTestCases()))
868 if not running_extended_tests:
869 print("Not running extended tests (some tests will be skipped)")
871 attempts = retries + 1
873 print("Perform %s attempts to pass the suite..." % attempts)
875 if run_interactive and suites:
876 # don't fork if requiring interactive terminal
877 print('Running tests in foreground in the current process')
878 full_suite = unittest.TestSuite()
879 full_suite.addTests(suites)
880 result = VppTestRunner(verbosity=verbose,
882 print_summary=True).run(full_suite)
883 was_successful = result.wasSuccessful()
884 if not was_successful:
885 for test_case_info in result.failed_test_cases_info:
886 handle_failed_suite(test_case_info.logger,
887 test_case_info.tempdir,
888 test_case_info.vpp_pid)
889 if test_case_info in result.core_crash_test_cases_info:
890 check_and_handle_core(test_case_info.vpp_bin_path,
891 test_case_info.tempdir,
892 test_case_info.core_crash_test)
# exit status 0 on success, 1 otherwise
894 sys.exit(not was_successful)
896 print('Running each VPPTestCase in a separate background process'
897 ' with {} parallel process(es)'.format(concurrent_tests))
# forked mode: rerun the suites returned by parse_results while attempts
# remain
899 while suites and attempts > 0:
900 results = run_forked(suites)
901 exit_code, suites = parse_results(results)
904 print('Test run was successful')
906 print('%s attempt(s) left.' % attempts)