-#!/usr/bin/env python
+#!/usr/bin/env python3
import sys
import shutil
import os
-import select
+import fnmatch
import unittest
-import argparse
import time
-from multiprocessing import Process, Pipe
-from framework import VppTestRunner
+import threading
+import traceback
+import signal
+import re
+from multiprocessing import Process, Pipe, get_context
+from multiprocessing.queues import Queue
+from multiprocessing.managers import BaseManager
+from config import config, num_cpus, available_cpus, max_vpp_cpus
+from vpp_papi import VPPApiJSONFiles
+from asfframework import (
+ VppTestRunner,
+ get_testcase_doc_name,
+ get_test_description,
+ get_failed_testcase_linkname,
+ get_testcase_dirname,
+)
+from framework import VppTestCase
+from test_result_code import TestResultCode
from debug import spawn_gdb
-from log import global_logger
+from log import (
+ get_parallel_logger,
+ double_line_delim,
+ RED,
+ YELLOW,
+ GREEN,
+ colorize,
+ single_line_delim,
+)
from discover_tests import discover_tests
+import sanity_run_vpp
from subprocess import check_output, CalledProcessError
-from util import check_core_path
+from util import check_core_path, get_core_path, is_core_present
# timeout which controls how long the child has to finish after seeing
# a core dump in test temporary directory. If this is exceeded, parent assumes
-# that child process is stuck (e.g. waiting for shm mutex, which will never
-# get unlocked) and kill the child
+# that child process is stuck (e.g. waiting for event from vpp) and kill
+# the child
core_timeout = 3
-def test_runner_wrapper(suite, keep_alive_pipe, result_pipe, failed_pipe):
- result = not VppTestRunner(
+class StreamQueue(Queue):
+ def write(self, msg):
+ self.put(msg)
+
+ def flush(self):
+ sys.__stdout__.flush()
+ sys.__stderr__.flush()
+
+ def fileno(self):
+ return self._writer.fileno()
+
+
+class StreamQueueManager(BaseManager):
+ pass
+
+
+StreamQueueManager.register("StreamQueue", StreamQueue)
+
+
+class TestResult(dict):
+ def __init__(self, testcase_suite, testcases_by_id=None):
+ super(TestResult, self).__init__()
+ for trc in list(TestResultCode):
+ self[trc] = []
+ self.crashed = False
+ self.testcase_suite = testcase_suite
+ self.testcases = [testcase for testcase in testcase_suite]
+ self.testcases_by_id = testcases_by_id
+
+ def was_successful(self):
+ return (
+ 0
+ == len(self[TestResultCode.FAIL])
+ == len(self[TestResultCode.ERROR])
+ == len(self[TestResultCode.UNEXPECTED_PASS])
+ and len(self[TestResultCode.PASS])
+ + len(self[TestResultCode.SKIP])
+ + len(self[TestResultCode.SKIP_CPU_SHORTAGE])
+ + len(self[TestResultCode.EXPECTED_FAIL])
+ == self.testcase_suite.countTestCases()
+ )
+
+ def no_tests_run(self):
+ return 0 == len(self[TestResultCode.TEST_RUN])
+
+ def process_result(self, test_id, result):
+ self[result].append(test_id)
+
+ def suite_from_failed(self):
+ rerun_ids = set([])
+ for testcase in self.testcase_suite:
+ tc_id = testcase.id()
+ if (
+ tc_id
+ not in self[TestResultCode.PASS]
+ + self[TestResultCode.SKIP]
+ + self[TestResultCode.SKIP_CPU_SHORTAGE]
+ + self[TestResultCode.EXPECTED_FAIL]
+ ):
+ rerun_ids.add(tc_id)
+ if rerun_ids:
+ return suite_from_failed(self.testcase_suite, rerun_ids)
+
+ def get_testcase_names(self, test_id):
+ # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
+ setup_teardown_match = re.match(
+ r"((tearDownClass)|(setUpClass)) \((.+\..+)\)", test_id
+ )
+ if setup_teardown_match:
+ test_name, _, _, testcase_name = setup_teardown_match.groups()
+ if len(testcase_name.split(".")) == 2:
+ for key in self.testcases_by_id.keys():
+ if key.startswith(testcase_name):
+ testcase_name = key
+ break
+ testcase_name = self._get_testcase_doc_name(testcase_name)
+ else:
+ test_name = self._get_test_description(test_id)
+ testcase_name = self._get_testcase_doc_name(test_id)
+
+ return testcase_name, test_name
+
+ def _get_test_description(self, test_id):
+ if test_id in self.testcases_by_id:
+ desc = get_test_description(descriptions, self.testcases_by_id[test_id])
+ else:
+ desc = test_id
+ return desc
+
+ def _get_testcase_doc_name(self, test_id):
+ if test_id in self.testcases_by_id:
+ doc_name = get_testcase_doc_name(self.testcases_by_id[test_id])
+ else:
+ doc_name = test_id
+ return doc_name
+
+
+def test_runner_wrapper(
+ suite, keep_alive_pipe, stdouterr_queue, finished_pipe, result_pipe, logger
+):
+ sys.stdout = stdouterr_queue
+ sys.stderr = stdouterr_queue
+ VppTestCase.parallel_handler = logger.handlers[0]
+ result = VppTestRunner(
keep_alive_pipe=keep_alive_pipe,
- failed_pipe=failed_pipe,
- verbosity=verbose,
- failfast=failfast).run(suite).wasSuccessful()
- result_pipe.send(result)
- result_pipe.close()
+ descriptions=descriptions,
+ verbosity=config.verbose,
+ result_pipe=result_pipe,
+ failfast=config.failfast,
+ print_summary=False,
+ ).run(suite)
+ finished_pipe.send(result.wasSuccessful())
+ finished_pipe.close()
keep_alive_pipe.close()
- failed_pipe.close()
-class add_to_suite_callback:
- def __init__(self, suite):
- self.suite = suite
+class TestCaseWrapper(object):
+ def __init__(self, testcase_suite, manager):
+ self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(duplex=False)
+ self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
+ self.result_parent_end, self.result_child_end = Pipe(duplex=False)
+ self.testcase_suite = testcase_suite
+ self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
+ self.logger = get_parallel_logger(self.stdouterr_queue)
+ self.child = Process(
+ target=test_runner_wrapper,
+ args=(
+ testcase_suite,
+ self.keep_alive_child_end,
+ self.stdouterr_queue,
+ self.finished_child_end,
+ self.result_child_end,
+ self.logger,
+ ),
+ )
+ self.child.start()
+ self.last_test_temp_dir = None
+ self.last_test_vpp_binary = None
+ self._last_test = None
+ self.last_test_id = None
+ self.vpp_pid = None
+ self.last_heard = time.time()
+ self.core_detected_at = None
+ self.testcases_by_id = {}
+ self.testclasess_with_core = {}
+ for testcase in self.testcase_suite:
+ self.testcases_by_id[testcase.id()] = testcase
+ self.result = TestResult(testcase_suite, self.testcases_by_id)
+
+ @property
+ def last_test(self):
+ return self._last_test
+
+ @last_test.setter
+ def last_test(self, test_id):
+ self.last_test_id = test_id
+ if test_id in self.testcases_by_id:
+ testcase = self.testcases_by_id[test_id]
+ self._last_test = testcase.shortDescription()
+ if not self._last_test:
+ self._last_test = str(testcase)
+ else:
+ self._last_test = test_id
+
+ def add_testclass_with_core(self):
+ if self.last_test_id in self.testcases_by_id:
+ test = self.testcases_by_id[self.last_test_id]
+ class_name = unittest.util.strclass(test.__class__)
+ test_name = "'{}' ({})".format(
+ get_test_description(descriptions, test), self.last_test_id
+ )
+ else:
+ test_name = self.last_test_id
+ class_name = re.match(
+ r"((tearDownClass)|(setUpClass)) " r"\((.+\..+)\)", test_name
+ ).groups()[3]
+ if class_name not in self.testclasess_with_core:
+ self.testclasess_with_core[class_name] = (
+ test_name,
+ self.last_test_vpp_binary,
+ self.last_test_temp_dir,
+ )
+
+ def close_pipes(self):
+ self.keep_alive_child_end.close()
+ self.finished_child_end.close()
+ self.result_child_end.close()
+ self.keep_alive_parent_end.close()
+ self.finished_parent_end.close()
+ self.result_parent_end.close()
+
+ def was_successful(self):
+ return self.result.was_successful()
+
+ @property
+ def cpus_used(self):
+ return self.testcase_suite.cpus_used
+
+ def get_assigned_cpus(self):
+ return self.testcase_suite.get_assigned_cpus()
+
+
+def stdouterr_reader_wrapper(
+ unread_testcases, finished_unread_testcases, read_testcases
+):
+ read_testcase = None
+ while read_testcases.is_set() or unread_testcases:
+ if finished_unread_testcases:
+ read_testcase = finished_unread_testcases.pop()
+ unread_testcases.remove(read_testcase)
+ elif unread_testcases:
+ read_testcase = unread_testcases.pop()
+ if read_testcase:
+ data = ""
+ while data is not None:
+ sys.stdout.write(data)
+ data = read_testcase.stdouterr_queue.get()
+
+ read_testcase.stdouterr_queue.close()
+ finished_unread_testcases.discard(read_testcase)
+ read_testcase = None
+
+
+def handle_failed_suite(logger, last_test_temp_dir, vpp_pid, vpp_binary):
+ if last_test_temp_dir:
+ # Need to create link in case of a timeout or core dump without failure
+ lttd = os.path.basename(last_test_temp_dir)
+ link_path = os.path.join(config.failed_dir, f"{lttd}-FAILED")
+ if not os.path.exists(link_path):
+ os.symlink(last_test_temp_dir, link_path)
+ logger.error(
+ "Symlink to failed testcase directory: %s -> %s" % (link_path, lttd)
+ )
+
+ # Report core existence
+ core_path = get_core_path(last_test_temp_dir)
+ if os.path.exists(core_path):
+ logger.error(
+ "Core-file exists in test temporary directory: %s!" % core_path
+ )
+ check_core_path(logger, core_path)
+ logger.debug("Running 'file %s':" % core_path)
+ try:
+ info = check_output(["file", core_path])
+ logger.debug(info)
+ except CalledProcessError as e:
+ logger.error(
+ "Subprocess returned with return code "
+ "while running `file' utility on core-file "
+ "returned: "
+ "rc=%s",
+ e.returncode,
+ )
+ except OSError as e:
+ logger.error(
+ "Subprocess returned with OS error while "
+ "running 'file' utility "
+ "on core-file: "
+ "(%s) %s",
+ e.errno,
+ e.strerror,
+ )
+ except Exception as e:
+ logger.exception("Unexpected error running `file' utility on core-file")
+ logger.error(f"gdb {vpp_binary} {core_path}")
+
+ if vpp_pid:
+ # Copy api post mortem
+ api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
+ if os.path.isfile(api_post_mortem_path):
+ logger.error(
+ "Copying api_post_mortem.%d to %s" % (vpp_pid, last_test_temp_dir)
+ )
+ shutil.copy2(api_post_mortem_path, last_test_temp_dir)
+
+
+def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
+ if is_core_present(tempdir):
+ if debug_core:
+ print(
+ "VPP core detected in %s. Last test running was %s"
+ % (tempdir, core_crash_test)
+ )
+ print(single_line_delim)
+ spawn_gdb(vpp_binary, get_core_path(tempdir))
+ print(single_line_delim)
+ elif config.compress_core:
+ print("Compressing core-file in test directory `%s'" % tempdir)
+ os.system("gzip %s" % get_core_path(tempdir))
+
+
+def handle_cores(failed_testcases):
+ for failed_testcase in failed_testcases:
+ tcs_with_core = failed_testcase.testclasess_with_core
+ if tcs_with_core:
+ for test, vpp_binary, tempdir in tcs_with_core.values():
+ check_and_handle_core(vpp_binary, tempdir, test)
+
+
+def process_finished_testsuite(
+ wrapped_testcase_suite, finished_testcase_suites, failed_wrapped_testcases, results
+):
+ results.append(wrapped_testcase_suite.result)
+ finished_testcase_suites.add(wrapped_testcase_suite)
+ stop_run = False
+ if config.failfast and not wrapped_testcase_suite.was_successful():
+ stop_run = True
+
+ if not wrapped_testcase_suite.was_successful():
+ failed_wrapped_testcases.add(wrapped_testcase_suite)
+ handle_failed_suite(
+ wrapped_testcase_suite.logger,
+ wrapped_testcase_suite.last_test_temp_dir,
+ wrapped_testcase_suite.vpp_pid,
+ wrapped_testcase_suite.last_test_vpp_binary,
+ )
+
+ return stop_run
+
+
+def run_forked(testcase_suites):
+ wrapped_testcase_suites = set()
+ solo_testcase_suites = []
+
+ # suites are unhashable, need to use list
+ results = []
+ unread_testcases = set()
+ finished_unread_testcases = set()
+ manager = StreamQueueManager()
+ manager.start()
+ tests_running = 0
+ free_cpus = list(available_cpus)
+
+ def on_suite_start(tc):
+ nonlocal tests_running
+ nonlocal free_cpus
+ tests_running = tests_running + 1
+
+ def on_suite_finish(tc):
+ nonlocal tests_running
+ nonlocal free_cpus
+ tests_running = tests_running - 1
+ assert tests_running >= 0
+ free_cpus.extend(tc.get_assigned_cpus())
+
+ def run_suite(suite):
+ nonlocal manager
+ nonlocal wrapped_testcase_suites
+ nonlocal unread_testcases
+ nonlocal free_cpus
+ suite.assign_cpus(free_cpus[: suite.cpus_used])
+ free_cpus = free_cpus[suite.cpus_used :]
+ wrapper = TestCaseWrapper(suite, manager)
+ wrapped_testcase_suites.add(wrapper)
+ unread_testcases.add(wrapper)
+ on_suite_start(suite)
+
+ def can_run_suite(suite):
+ return tests_running < max_concurrent_tests and (
+ suite.cpus_used <= len(free_cpus) or suite.cpus_used > max_vpp_cpus
+ )
+
+ while free_cpus and testcase_suites:
+ a_suite = testcase_suites[0]
+ if a_suite.is_tagged_run_solo:
+ a_suite = testcase_suites.pop(0)
+ solo_testcase_suites.append(a_suite)
+ continue
+ if can_run_suite(a_suite):
+ a_suite = testcase_suites.pop(0)
+ run_suite(a_suite)
+ else:
+ break
+
+ if tests_running == 0 and solo_testcase_suites:
+ a_suite = solo_testcase_suites.pop(0)
+ run_suite(a_suite)
+
+ read_from_testcases = threading.Event()
+ read_from_testcases.set()
+ stdouterr_thread = threading.Thread(
+ target=stdouterr_reader_wrapper,
+ args=(unread_testcases, finished_unread_testcases, read_from_testcases),
+ )
+ stdouterr_thread.start()
+
+ failed_wrapped_testcases = set()
+ stop_run = False
+
+ try:
+ while wrapped_testcase_suites or testcase_suites:
+ finished_testcase_suites = set()
+ for wrapped_testcase_suite in wrapped_testcase_suites:
+ while wrapped_testcase_suite.result_parent_end.poll():
+ wrapped_testcase_suite.result.process_result(
+ *wrapped_testcase_suite.result_parent_end.recv()
+ )
+ wrapped_testcase_suite.last_heard = time.time()
+
+ while wrapped_testcase_suite.keep_alive_parent_end.poll():
+ (
+ wrapped_testcase_suite.last_test,
+ wrapped_testcase_suite.last_test_vpp_binary,
+ wrapped_testcase_suite.last_test_temp_dir,
+ wrapped_testcase_suite.vpp_pid,
+ ) = wrapped_testcase_suite.keep_alive_parent_end.recv()
+ wrapped_testcase_suite.last_heard = time.time()
+
+ if wrapped_testcase_suite.finished_parent_end.poll():
+ wrapped_testcase_suite.finished_parent_end.recv()
+ wrapped_testcase_suite.last_heard = time.time()
+ stop_run = (
+ process_finished_testsuite(
+ wrapped_testcase_suite,
+ finished_testcase_suites,
+ failed_wrapped_testcases,
+ results,
+ )
+ or stop_run
+ )
+ continue
+
+ fail = False
+ if wrapped_testcase_suite.last_heard + config.timeout < time.time():
+ fail = True
+ wrapped_testcase_suite.logger.critical(
+ "Child test runner process timed out "
+ "(last test running was `%s' in `%s')!"
+ % (
+ wrapped_testcase_suite.last_test,
+ wrapped_testcase_suite.last_test_temp_dir,
+ )
+ )
+ elif not wrapped_testcase_suite.child.is_alive():
+ fail = True
+ wrapped_testcase_suite.logger.critical(
+ "Child test runner process unexpectedly died "
+ "(last test running was `%s' in `%s')!"
+ % (
+ wrapped_testcase_suite.last_test,
+ wrapped_testcase_suite.last_test_temp_dir,
+ )
+ )
+ elif (
+ wrapped_testcase_suite.last_test_temp_dir
+ and wrapped_testcase_suite.last_test_vpp_binary
+ ):
+ if is_core_present(wrapped_testcase_suite.last_test_temp_dir):
+ wrapped_testcase_suite.add_testclass_with_core()
+ if wrapped_testcase_suite.core_detected_at is None:
+ wrapped_testcase_suite.core_detected_at = time.time()
+ elif (
+ wrapped_testcase_suite.core_detected_at + core_timeout
+ < time.time()
+ ):
+ wrapped_testcase_suite.logger.critical(
+ "Child test runner process unresponsive and "
+ "core-file exists in test temporary directory "
+ "(last test running was `%s' in `%s')!"
+ % (
+ wrapped_testcase_suite.last_test,
+ wrapped_testcase_suite.last_test_temp_dir,
+ )
+ )
+ fail = True
+
+ if fail:
+ wrapped_testcase_suite.child.terminate()
+ try:
+ # terminating the child process tends to leave orphan
+ # VPP process around
+ if wrapped_testcase_suite.vpp_pid:
+ os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
+ except OSError:
+ # already dead
+ pass
+ wrapped_testcase_suite.result.crashed = True
+ wrapped_testcase_suite.result.process_result(
+ wrapped_testcase_suite.last_test_id, TestResultCode.ERROR
+ )
+ stop_run = (
+ process_finished_testsuite(
+ wrapped_testcase_suite,
+ finished_testcase_suites,
+ failed_wrapped_testcases,
+ results,
+ )
+ or stop_run
+ )
+
+ for finished_testcase in finished_testcase_suites:
+ # Somewhat surprisingly, the join below may
+ # timeout, even if client signaled that
+ # it finished - so we note it just in case.
+ join_start = time.time()
+ finished_testcase.child.join(test_finished_join_timeout)
+ join_end = time.time()
+ if join_end - join_start >= test_finished_join_timeout:
+ finished_testcase.logger.error(
+ "Timeout joining finished test: %s (pid %d)"
+ % (finished_testcase.last_test, finished_testcase.child.pid)
+ )
+ finished_testcase.close_pipes()
+ wrapped_testcase_suites.remove(finished_testcase)
+ finished_unread_testcases.add(finished_testcase)
+ finished_testcase.stdouterr_queue.put(None)
+ on_suite_finish(finished_testcase)
+ if stop_run:
+ while testcase_suites:
+ results.append(TestResult(testcase_suites.pop(0)))
+ elif testcase_suites:
+ a_suite = testcase_suites[0]
+ while a_suite and a_suite.is_tagged_run_solo:
+ testcase_suites.pop(0)
+ solo_testcase_suites.append(a_suite)
+ if testcase_suites:
+ a_suite = testcase_suites[0]
+ else:
+ a_suite = None
+ if a_suite and can_run_suite(a_suite):
+ testcase_suites.pop(0)
+ run_suite(a_suite)
+ if solo_testcase_suites and tests_running == 0:
+ a_suite = solo_testcase_suites.pop(0)
+ run_suite(a_suite)
+ time.sleep(0.1)
+ except Exception:
+ for wrapped_testcase_suite in wrapped_testcase_suites:
+ wrapped_testcase_suite.child.terminate()
+ wrapped_testcase_suite.stdouterr_queue.put(None)
+ raise
+ finally:
+ read_from_testcases.clear()
+ stdouterr_thread.join(config.timeout)
+ manager.shutdown()
+
+ handle_cores(failed_wrapped_testcases)
+ return results
+
+
+class TestSuiteWrapper(unittest.TestSuite):
+ cpus_used = 0
+
+ def __init__(self):
+ return super().__init__()
+
+ def addTest(self, test):
+ self.cpus_used = max(self.cpus_used, test.get_cpus_required())
+ super().addTest(test)
+
+ def assign_cpus(self, cpus):
+ self.cpus = cpus
+
+ def _handleClassSetUp(self, test, result):
+ if not test.__class__.skipped_due_to_cpu_lack:
+ test.assign_cpus(self.cpus)
+ super()._handleClassSetUp(test, result)
+
+ def get_assigned_cpus(self):
+ return self.cpus
+
+
+class SplitToSuitesCallback:
+ def __init__(self, filter_callback):
+ self.suites = {}
+ self.suite_name = "default"
+ self.filter_callback = filter_callback
+ self.filtered = TestSuiteWrapper()
def __call__(self, file_name, cls, method):
- suite.addTest(cls(method))
+ test_method = cls(method)
+ if self.filter_callback(file_name, cls.__name__, method):
+ self.suite_name = file_name + cls.__name__
+ if self.suite_name not in self.suites:
+ self.suites[self.suite_name] = TestSuiteWrapper()
+ self.suites[self.suite_name].is_tagged_run_solo = False
+ self.suites[self.suite_name].addTest(test_method)
+ if test_method.is_tagged_run_solo():
+ self.suites[self.suite_name].is_tagged_run_solo = True
+
+ else:
+ self.filtered.addTest(test_method)
+
+
+def parse_test_filter(test_filter):
+ f = test_filter
+ filter_file_name = None
+ filter_class_name = None
+ filter_func_name = None
+ if f:
+ if "." in f:
+ parts = f.split(".")
+ if len(parts) > 3:
+ raise Exception(f"Invalid test filter: {test_filter}")
+ if len(parts) > 2:
+ if parts[2] not in ("*", ""):
+ filter_func_name = parts[2]
+ if parts[1] not in ("*", ""):
+ filter_class_name = parts[1]
+ if parts[0] not in ("*", ""):
+ if parts[0].startswith("test_"):
+ filter_file_name = parts[0]
+ else:
+ filter_file_name = "test_%s" % parts[0]
+ else:
+ if f.startswith("test_"):
+ filter_file_name = f
+ else:
+ filter_file_name = "test_%s" % f
+ if filter_file_name:
+ filter_file_name = "%s.py" % filter_file_name
+ return filter_file_name, filter_class_name, filter_func_name
+
+
+def filter_tests(tests, filter_cb):
+ result = TestSuiteWrapper()
+ for t in tests:
+ if isinstance(t, unittest.suite.TestSuite):
+ # this is a bunch of tests, recursively filter...
+ x = filter_tests(t, filter_cb)
+ if x.countTestCases() > 0:
+ result.addTest(x)
+ elif isinstance(t, unittest.TestCase):
+ # this is a single test
+ parts = t.id().split(".")
+ # t.id() for common cases like this:
+ # test_classifier.TestClassifier.test_acl_ip
+ # apply filtering only if it is so
+ if len(parts) == 3:
+ if not filter_cb(parts[0], parts[1], parts[2]):
+ continue
+ result.addTest(t)
+ else:
+ # unexpected object, don't touch it
+ result.addTest(t)
+ return result
+
+
+class FilterByTestOption:
+ def __init__(self, filters):
+ self.filters = filters
+
+ def __call__(self, file_name, class_name, func_name):
+ def test_one(
+ filter_file_name,
+ filter_class_name,
+ filter_func_name,
+ file_name,
+ class_name,
+ func_name,
+ ):
+ if filter_file_name:
+ fn_match = fnmatch.fnmatch(file_name, filter_file_name)
+ if not fn_match:
+ return False
+ if filter_class_name and class_name != filter_class_name:
+ return False
+ if filter_func_name and func_name != filter_func_name:
+ return False
+ return True
+
+ for filter_file_name, filter_class_name, filter_func_name in self.filters:
+ if test_one(
+ filter_file_name,
+ filter_class_name,
+ filter_func_name,
+ file_name,
+ class_name,
+ func_name,
+ ):
+ return True
+
+ return False
-class Filter_by_class_list:
- def __init__(self, class_list):
- self.class_list = class_list
+class FilterByClassList:
+ def __init__(self, classes_with_filenames):
+ self.classes_with_filenames = classes_with_filenames
def __call__(self, file_name, class_name, func_name):
- return class_name in self.class_list
+ return ".".join([file_name, class_name]) in self.classes_with_filenames
def suite_from_failed(suite, failed):
- filter_cb = Filter_by_class_list(failed)
- suite = VppTestRunner.filter_tests(suite, filter_cb)
- if 0 == suite.countTestCases():
- raise Exception("Suite is empty after filtering out the failed tests!")
+ failed = {x.rsplit(".", 1)[0] for x in failed}
+ filter_cb = FilterByClassList(failed)
+ suite = filter_tests(suite, filter_cb)
return suite
-def run_forked(suite):
- keep_alive_parent_end, keep_alive_child_end = Pipe(duplex=False)
- result_parent_end, result_child_end = Pipe(duplex=False)
- failed_parent_end, failed_child_end = Pipe(duplex=False)
-
- child = Process(target=test_runner_wrapper,
- args=(suite, keep_alive_child_end, result_child_end,
- failed_child_end))
- child.start()
- last_test_temp_dir = None
- last_test_vpp_binary = None
- last_test = None
- result = None
- failed = set()
- last_heard = time.time()
- core_detected_at = None
- debug_core = os.getenv("DEBUG", "").lower() == "core"
- while True:
- readable = select.select([keep_alive_parent_end.fileno(),
- result_parent_end.fileno(),
- failed_parent_end.fileno(),
- ],
- [], [], 1)[0]
- if result_parent_end.fileno() in readable:
- result = result_parent_end.recv()
- break
- if keep_alive_parent_end.fileno() in readable:
- while keep_alive_parent_end.poll():
- last_test, last_test_vpp_binary,\
- last_test_temp_dir, vpp_pid = keep_alive_parent_end.recv()
- last_heard = time.time()
- if failed_parent_end.fileno() in readable:
- while failed_parent_end.poll():
- failed_test = failed_parent_end.recv()
- failed.add(failed_test.__name__)
- last_heard = time.time()
- fail = False
- if last_heard + test_timeout < time.time() and \
- not os.path.isfile("%s/_core_handled" % last_test_temp_dir):
- fail = True
- global_logger.critical("Timeout while waiting for child test "
- "runner process (last test running was "
- "`%s' in `%s')!" %
- (last_test, last_test_temp_dir))
- elif not child.is_alive():
- fail = True
- global_logger.critical("Child python process unexpectedly died "
- "(last test running was `%s' in `%s')!" %
- (last_test, last_test_temp_dir))
- elif last_test_temp_dir and last_test_vpp_binary:
- core_path = "%s/core" % last_test_temp_dir
- if os.path.isfile(core_path):
- if core_detected_at is None:
- core_detected_at = time.time()
- elif core_detected_at + core_timeout < time.time():
- if not os.path.isfile(
- "%s/_core_handled" % last_test_temp_dir):
- global_logger.critical(
- "Child python process unresponsive and core-file "
- "exists in test temporary directory!")
- fail = True
-
- if fail:
- failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
- lttd = last_test_temp_dir.split("/")[-1]
- link_path = '%s%s-FAILED' % (failed_dir, lttd)
- global_logger.error("Creating a link to the failed " +
- "test: %s -> %s" % (link_path, lttd))
- try:
- os.symlink(last_test_temp_dir, link_path)
- except Exception:
- pass
- api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
- if os.path.isfile(api_post_mortem_path):
- global_logger.error("Copying api_post_mortem.%d to %s" %
- (vpp_pid, last_test_temp_dir))
- shutil.copy2(api_post_mortem_path, last_test_temp_dir)
- if last_test_temp_dir and last_test_vpp_binary:
- core_path = "%s/core" % last_test_temp_dir
- if os.path.isfile(core_path):
- global_logger.error("Core-file exists in test temporary "
- "directory: %s!" % core_path)
- check_core_path(global_logger, core_path)
- global_logger.debug("Running `file %s':" % core_path)
- try:
- info = check_output(["file", core_path])
- global_logger.debug(info)
- except CalledProcessError as e:
- global_logger.error(
- "Could not run `file' utility on core-file, "
- "rc=%s" % e.returncode)
- pass
- if debug_core:
- spawn_gdb(last_test_vpp_binary, core_path,
- global_logger)
- child.terminate()
- result = -1
- break
- keep_alive_parent_end.close()
- result_parent_end.close()
- failed_parent_end.close()
- return result, failed
+class AllResults(dict):
+ def __init__(self):
+ super(AllResults, self).__init__()
+ self.all_testcases = 0
+ self.results_per_suite = []
+ for trc in list(TestResultCode):
+ self[trc] = 0
+ self.rerun = []
+ self.testsuites_no_tests_run = []
+ def add_results(self, result):
+ self.results_per_suite.append(result)
+ for trc in list(TestResultCode):
+ self[trc] += len(result[trc])
-if __name__ == '__main__':
+ def add_result(self, result):
+ retval = 0
+ self.all_testcases += result.testcase_suite.countTestCases()
+ self.add_results(result)
- try:
- verbose = int(os.getenv("V", 0))
- except ValueError:
- verbose = 0
+ if result.no_tests_run():
+ self.testsuites_no_tests_run.append(result.testcase_suite)
+ if result.crashed:
+ retval = -1
+ else:
+ retval = 1
+ elif not result.was_successful():
+ retval = 1
- default_test_timeout = 600 # 10 minutes
- try:
- test_timeout = int(os.getenv("TIMEOUT", default_test_timeout))
- except ValueError:
- test_timeout = default_test_timeout
-
- debug = os.getenv("DEBUG")
-
- s = os.getenv("STEP", "n")
- step = True if s.lower() in ("y", "yes", "1") else False
-
- parser = argparse.ArgumentParser(description="VPP unit tests")
- parser.add_argument("-f", "--failfast", action='count',
- help="fast failure flag")
- parser.add_argument("-d", "--dir", action='append', type=str,
- help="directory containing test files "
- "(may be specified multiple times)")
- args = parser.parse_args()
- failfast = True if args.failfast == 1 else False
-
- suite = unittest.TestSuite()
- cb = add_to_suite_callback(suite)
- for d in args.dir:
+ if retval != 0:
+ self.rerun.append(result.testcase_suite)
+
+ return retval
+
+ def print_results(self):
+ print("")
+ print(double_line_delim)
+ print("TEST RESULTS:")
+
+ def indent_results(lines):
+ lines = list(filter(None, lines))
+ maximum = max(lines, key=lambda x: x.index(":"))
+ maximum = 4 + maximum.index(":")
+ for l in lines:
+ padding = " " * (maximum - l.index(":"))
+ print(f"{padding}{l}")
+
+ indent_results(
+ [
+ f"Scheduled tests: {self.all_testcases}",
+ f"Executed tests: {self[TestResultCode.TEST_RUN]}",
+ f"Passed tests: {colorize(self[TestResultCode.PASS], GREEN)}",
+ f"Expected failures: {colorize(self[TestResultCode.EXPECTED_FAIL], GREEN)}"
+ if self[TestResultCode.EXPECTED_FAIL]
+ else None,
+ f"Skipped tests: {colorize(self[TestResultCode.SKIP], YELLOW)}"
+ if self[TestResultCode.SKIP]
+ else None,
+ f"Not Executed tests: {colorize(self.not_executed, RED)}"
+ if self.not_executed
+ else None,
+ f"Failures: {colorize(self[TestResultCode.FAIL], RED)}"
+ if self[TestResultCode.FAIL]
+ else None,
+ f"Unexpected passes: {colorize(self[TestResultCode.UNEXPECTED_PASS], RED)}"
+ if self[TestResultCode.UNEXPECTED_PASS]
+ else None,
+ f"Errors: {colorize(self[TestResultCode.ERROR], RED)}"
+ if self[TestResultCode.ERROR]
+ else None,
+ "Tests skipped due to lack of CPUS: "
+ f"{colorize(self[TestResultCode.SKIP_CPU_SHORTAGE], YELLOW)}"
+ if self[TestResultCode.SKIP_CPU_SHORTAGE]
+ else None,
+ ]
+ )
+
+ if self.all_failed > 0:
+ print("FAILURES AND ERRORS IN TESTS:")
+ for result in self.results_per_suite:
+ old_testcase_name = None
+ for tr_code, headline in (
+ (TestResultCode.FAIL, "FAILURE"),
+ (TestResultCode.ERROR, "ERROR"),
+ (TestResultCode.UNEXPECTED_PASS, "UNEXPECTED PASS"),
+ ):
+ if not result[tr_code]:
+ continue
+
+ for failed_test_id in result[tr_code]:
+ new_testcase_name, test_name = result.get_testcase_names(
+ failed_test_id
+ )
+ if new_testcase_name != old_testcase_name:
+ print(
+ f" Testcase name: {colorize(new_testcase_name, RED)}"
+ )
+ old_testcase_name = new_testcase_name
+ print(
+ f" {headline}: {colorize(test_name, RED)} [{failed_test_id}]"
+ )
+
+ if self.testsuites_no_tests_run:
+ print("TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:")
+ tc_classes = set()
+ for testsuite in self.testsuites_no_tests_run:
+ for testcase in testsuite:
+ tc_classes.add(get_testcase_doc_name(testcase))
+ for tc_class in tc_classes:
+ print(" {}".format(colorize(tc_class, RED)))
+
+ if self[TestResultCode.SKIP_CPU_SHORTAGE]:
+ print()
+ print(
+ colorize(
+ " SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
+ " ENOUGH CPUS AVAILABLE",
+ YELLOW,
+ )
+ )
+ print(double_line_delim)
+ print("")
+
+ @property
+ def not_executed(self):
+ return self.all_testcases - self[TestResultCode.TEST_RUN]
+
+ @property
+ def all_failed(self):
+ return (
+ self[TestResultCode.FAIL]
+ + self[TestResultCode.ERROR]
+ + self[TestResultCode.UNEXPECTED_PASS]
+ )
+
+
+def parse_results(results):
+ """
+ Prints the number of scheduled, executed, not executed, passed, failed,
+ errored and skipped tests and details about failed and errored tests.
+
+ Also returns all suites where any test failed.
+
+ :param results:
+ :return:
+ """
+
+ results_per_suite = AllResults()
+ crashed = False
+ failed = False
+ for result in results:
+ result_code = results_per_suite.add_result(result)
+ if result_code == 1:
+ failed = True
+ elif result_code == -1:
+ crashed = True
+
+ results_per_suite.print_results()
+
+ if crashed:
+ return_code = -1
+ elif failed:
+ return_code = 1
+ else:
+ return_code = 0
+ return return_code, results_per_suite.rerun
+
+
+if __name__ == "__main__":
+ print(f"Config is: {config}")
+
+ if config.api_preload:
+ VPPApiJSONFiles.load_api(apidir=config.extern_apidir + [config.vpp_install_dir])
+
+ if config.sanity:
+ print("Running sanity test case.")
+ try:
+ rc = sanity_run_vpp.main()
+ if rc != 0:
+ sys.exit(rc)
+ except Exception as e:
+ print(traceback.format_exc())
+ print("Couldn't run sanity test case.")
+ sys.exit(-1)
+
+ test_finished_join_timeout = 15
+
+ debug_gdb = config.debug in ["gdb", "gdbserver", "attach"]
+ debug_core = config.debug == "core"
+
+ run_interactive = debug_gdb or config.step or config.force_foreground
+
+ max_concurrent_tests = 0
+ print(f"OS reports {num_cpus} available cpu(s).")
+
+ test_jobs = config.jobs
+ if test_jobs == "auto":
+ if run_interactive:
+ max_concurrent_tests = 1
+ print("Interactive mode required, running tests consecutively.")
+ else:
+ max_concurrent_tests = num_cpus
+ print(
+ f"Running at most {max_concurrent_tests} python test "
+ "processes concurrently."
+ )
+ else:
+ max_concurrent_tests = test_jobs
+ print(
+ f"Running at most {max_concurrent_tests} python test processes "
+ "concurrently as set by 'TEST_JOBS'."
+ )
+
+ print(f"Using at most {max_vpp_cpus} cpus for VPP threads.")
+
+ if run_interactive and max_concurrent_tests > 1:
+ raise NotImplementedError(
+ "Running tests interactively (DEBUG is gdb[server] or ATTACH or "
+ "STEP is set) in parallel (TEST_JOBS is more than 1) is not "
+ "supported"
+ )
+
+ descriptions = True
+
+ print("Running tests using custom test runner.")
+ filters = [(parse_test_filter(f)) for f in config.filter.split(",")]
+
+ print(
+ "Selected filters: ",
+ "|".join(
+ f"file={filter_file}, class={filter_class}, function={filter_func}"
+ for filter_file, filter_class, filter_func in filters
+ ),
+ )
+
+ filter_cb = FilterByTestOption(filters)
+
+ cb = SplitToSuitesCallback(filter_cb)
+ for d in config.test_src_dir:
print("Adding tests from directory tree %s" % d)
discover_tests(d, cb)
- try:
- retries = int(os.getenv("RETRIES", 0))
- except ValueError:
- retries = 0
+ # suites are not hashable, need to use list
+ suites = []
+ tests_amount = 0
+ for testcase_suite in cb.suites.values():
+ tests_amount += testcase_suite.countTestCases()
+ if testcase_suite.cpus_used > max_vpp_cpus:
+ # here we replace test functions with lambdas to just skip them
+ # but we also replace setUp/tearDown functions to do nothing
+ # so that the test can be "started" and "stopped", so that we can
+ # still keep those prints (test description - SKIP), which are done
+ # in stopTest() (for that to trigger, test function must run)
+ for t in testcase_suite:
+ for m in dir(t):
+ if m.startswith("test_"):
+ setattr(t, m, lambda: t.skipTest("not enough cpus"))
+ setattr(t.__class__, "setUpClass", lambda: None)
+ setattr(t.__class__, "tearDownClass", lambda: None)
+ setattr(t, "setUp", lambda: None)
+ setattr(t, "tearDown", lambda: None)
+ t.__class__.skipped_due_to_cpu_lack = True
+ suites.append(testcase_suite)
- try:
- force_foreground = int(os.getenv("FORCE_FOREGROUND", 0))
- except ValueError:
- force_foreground = 0
- attempts = retries + 1
+ print(
+ "%s out of %s tests match specified filters"
+ % (tests_amount, tests_amount + cb.filtered.countTestCases())
+ )
+
+ if not config.extended:
+ print("Not running extended tests (some tests will be skipped)")
+
+ attempts = config.retries + 1
if attempts > 1:
print("Perform %s attempts to pass the suite..." % attempts)
- if (debug is not None and debug.lower() in ["gdb", "gdbserver"]) or step\
- or force_foreground:
- # don't fork if requiring interactive terminal..
- sys.exit(not VppTestRunner(
- verbosity=verbose, failfast=failfast).run(suite).wasSuccessful())
+
+ if run_interactive and suites:
+ # don't fork if requiring interactive terminal
+ print("Running tests in foreground in the current process")
+ full_suite = unittest.TestSuite()
+ free_cpus = list(available_cpus)
+ cpu_shortage = False
+ for suite in suites:
+ if suite.cpus_used <= max_vpp_cpus:
+ suite.assign_cpus(free_cpus[: suite.cpus_used])
+ else:
+ suite.assign_cpus([])
+ cpu_shortage = True
+ full_suite.addTests(suites)
+ result = VppTestRunner(
+ verbosity=config.verbose, failfast=config.failfast, print_summary=True
+ ).run(full_suite)
+ was_successful = result.wasSuccessful()
+ if not was_successful:
+ for test_case_info in result.failed_test_cases_info:
+ handle_failed_suite(
+ test_case_info.logger,
+ test_case_info.tempdir,
+ test_case_info.vpp_pid,
+ config.vpp,
+ )
+ if test_case_info in result.core_crash_test_cases_info:
+ check_and_handle_core(
+ test_case_info.vpp_bin_path,
+ test_case_info.tempdir,
+ test_case_info.core_crash_test,
+ )
+
+ if cpu_shortage:
+ print()
+ print(
+ colorize(
+ "SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
+ " ENOUGH CPUS AVAILABLE",
+ YELLOW,
+ )
+ )
+ print()
+ sys.exit(not was_successful)
else:
- while True:
- result, failed = run_forked(suite)
- attempts = attempts - 1
- print("%s test(s) failed, %s attempt(s) left" %
- (len(failed), attempts))
- if len(failed) > 0 and attempts > 0:
- suite = suite_from_failed(suite, failed)
- continue
- sys.exit(result)
+ print(
+ "Running each VPPTestCase in a separate background process"
+ f" with at most {max_concurrent_tests} parallel python test "
+ "process(es)"
+ )
+ exit_code = 0
+ while suites and attempts > 0:
+ for suite in suites:
+ failed_link = get_failed_testcase_linkname(
+ config.failed_dir,
+ f"{get_testcase_dirname(suite._tests[0].__class__.__name__)}",
+ )
+ if os.path.islink(failed_link):
+ os.unlink(failed_link)
+ results = run_forked(suites)
+ exit_code, suites = parse_results(results)
+ attempts -= 1
+ if exit_code == 0:
+ print("Test run was successful")
+ else:
+ print("%s attempt(s) left." % attempts)
+ sys.exit(exit_code)