+import time
+import threading
+import signal
+import psutil
+from multiprocessing import Process, Pipe, cpu_count
+from multiprocessing.queues import Queue
+from multiprocessing.managers import BaseManager
+from framework import VppTestRunner, running_extended_tests, VppTestCase, \
+ get_testcase_doc_name, get_test_description, PASS, FAIL, ERROR, SKIP, \
+ TEST_RUN
+from debug import spawn_gdb
+from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
+ colorize
+from discover_tests import discover_tests
+from subprocess import check_output, CalledProcessError
+from util import check_core_path
+
+# timeout which controls how long the child has to finish after seeing
+# a core dump in test temporary directory. If this is exceeded, parent assumes
+# that child process is stuck (e.g. waiting for shm mutex, which will never
+# get unlocked) and kill the child
+core_timeout = 3
+min_req_shm = 536870912 # min 512MB shm required
+# 128MB per extra process
+shm_per_process = 134217728
+
+
+class StreamQueue(Queue):
+ def write(self, msg):
+ self.put(msg)
+
+ def flush(self):
+ sys.__stdout__.flush()
+ sys.__stderr__.flush()
+
+ def fileno(self):
+ return self._writer.fileno()
+
+
+class StreamQueueManager(BaseManager):
+ pass
+
+
+StreamQueueManager.register('StreamQueue', StreamQueue)
+
+
+class TestResult(dict):
+ def __init__(self, testcase_suite):
+ super(TestResult, self).__init__()
+ self[PASS] = []
+ self[FAIL] = []
+ self[ERROR] = []
+ self[SKIP] = []
+ self[TEST_RUN] = []
+ self.testcase_suite = testcase_suite
+ self.testcases = [testcase for testcase in testcase_suite]
+ self.testcases_by_id = {}
+
+ def was_successful(self):
+ return len(self[PASS] + self[SKIP]) \
+ == self.testcase_suite.countTestCases()
+
+ def no_tests_run(self):
+ return 0 == len(self[TEST_RUN])
+
+ def process_result(self, test_id, result):
+ self[result].append(test_id)
+ for testcase in self.testcases:
+ if testcase.id() == test_id:
+ self.testcases_by_id[test_id] = testcase
+ self.testcases.remove(testcase)
+ break
+
+ def suite_from_failed(self):
+ rerun_ids = set([])
+ for testcase in self.testcase_suite:
+ tc_id = testcase.id()
+ if tc_id not in self[PASS] and tc_id not in self[SKIP]:
+ rerun_ids.add(tc_id)
+ if len(rerun_ids) > 0:
+ return suite_from_failed(self.testcase_suite, rerun_ids)
+
+ def get_testcase_names(self, test_id):
+ return self._get_testcase_class(test_id), \
+ self._get_test_description(test_id)
+
+ def _get_test_description(self, test_id):
+ if test_id in self.testcases_by_id:
+ return get_test_description(descriptions,
+ self.testcases_by_id[test_id])
+ else:
+ return test_id
+
+ def _get_testcase_class(self, test_id):
+ if test_id in self.testcases_by_id:
+ return get_testcase_doc_name(self.testcases_by_id[test_id])
+ else:
+ return test_id
+
+
+def test_runner_wrapper(suite, keep_alive_pipe, stdouterr_queue,
+ finished_pipe, result_pipe, logger):
+ sys.stdout = stdouterr_queue
+ sys.stderr = stdouterr_queue
+ VppTestCase.logger = logger
+ result = VppTestRunner(keep_alive_pipe=keep_alive_pipe,
+ descriptions=descriptions,
+ verbosity=verbose,
+ result_pipe=result_pipe,
+ failfast=failfast).run(suite)
+ finished_pipe.send(result.wasSuccessful())
+ finished_pipe.close()
+ keep_alive_pipe.close()
+
+
+class TestCaseWrapper(object):
+ def __init__(self, testcase_suite, manager):
+ self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(
+ duplex=False)
+ self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
+ self.result_parent_end, self.result_child_end = Pipe(duplex=False)
+ self.testcase_suite = testcase_suite
+ self.stdouterr_queue = manager.StreamQueue()
+ self.logger = get_parallel_logger(self.stdouterr_queue)
+ self.child = Process(target=test_runner_wrapper,
+ args=(testcase_suite,
+ self.keep_alive_child_end,
+ self.stdouterr_queue,
+ self.finished_child_end,
+ self.result_child_end,
+ self.logger)
+ )
+ self.child.start()
+ self.last_test_temp_dir = None
+ self.last_test_vpp_binary = None
+ self.last_test = None
+ self.result = None
+ self.vpp_pid = None
+ self.last_heard = time.time()
+ self.core_detected_at = None
+ self.result = TestResult(testcase_suite)
+
+ def close_pipes(self):
+ self.keep_alive_child_end.close()
+ self.finished_child_end.close()
+ self.result_child_end.close()
+ self.keep_alive_parent_end.close()
+ self.finished_parent_end.close()
+ self.result_parent_end.close()
+
+
+def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
+ read_testcases):
+ read_testcase = None
+ while read_testcases.is_set() or len(unread_testcases) > 0:
+ if not read_testcase:
+ if len(finished_unread_testcases) > 0:
+ read_testcase = finished_unread_testcases.pop()
+ unread_testcases.remove(read_testcase)
+ elif len(unread_testcases) > 0:
+ read_testcase = unread_testcases.pop()
+ if read_testcase:
+ data = ''
+ while data is not None:
+ sys.stdout.write(data)
+ data = read_testcase.stdouterr_queue.get()
+
+ read_testcase.stdouterr_queue.close()
+ finished_unread_testcases.discard(read_testcase)
+ read_testcase = None
+
+
+def run_forked(testcase_suites):
+ wrapped_testcase_suites = set()
+
+ # suites are unhashable, need to use list
+ results = []
+ debug_core = os.getenv("DEBUG", "").lower() == "core"
+ unread_testcases = set()
+ finished_unread_testcases = set()
+ manager = StreamQueueManager()
+ manager.start()
+ for i in range(concurrent_tests):
+ if len(testcase_suites) > 0:
+ wrapped_testcase_suite = TestCaseWrapper(testcase_suites.pop(0),
+ manager)
+ wrapped_testcase_suites.add(wrapped_testcase_suite)
+ unread_testcases.add(wrapped_testcase_suite)
+ # time.sleep(1)
+ else:
+ break
+
+ read_from_testcases = threading.Event()
+ read_from_testcases.set()
+ stdouterr_thread = threading.Thread(target=stdouterr_reader_wrapper,
+ args=(unread_testcases,
+ finished_unread_testcases,
+ read_from_testcases))
+ stdouterr_thread.start()
+
+ while len(wrapped_testcase_suites) > 0:
+ finished_testcase_suites = set()
+ for wrapped_testcase_suite in wrapped_testcase_suites:
+ while wrapped_testcase_suite.result_parent_end.poll():
+ wrapped_testcase_suite.result.process_result(
+ *wrapped_testcase_suite.result_parent_end.recv())
+ wrapped_testcase_suite.last_heard = time.time()
+
+ if wrapped_testcase_suite.finished_parent_end.poll():
+ wrapped_testcase_suite.finished_parent_end.recv()
+ results.append(wrapped_testcase_suite.result)
+ finished_testcase_suites.add(wrapped_testcase_suite)