import random
import copy
import psutil
+import platform
from collections import deque
from threading import Thread, Event
from inspect import getdoc, isclass
from logging import FileHandler, DEBUG, Formatter
from scapy.packet import Raw
from hook import StepHook, PollHook, VppDiedError
-from vpp_pg_interface import VppPGInterface
+from vpp_config import VppTestCaseVppConfig
+from vpp_interface import VppInterface
from vpp_sub_interface import VppSubInterface
+from vpp_pg_interface import VppPGInterface
from vpp_lo_interface import VppLoInterface
from vpp_papi_provider import VppPapiProvider
+from vpp_papi.vpp_stats import VPPStats
from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
- getLogger, colorize
+ get_logger, colorize
from vpp_object import VppObjectRegistry
-from util import ppp
+from util import ppp, is_core_present
from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
from scapy.layers.inet6 import ICMPv6EchoReply
+
if os.name == 'posix' and sys.version_info[0] < 3:
# using subprocess32 is recommended by python official documentation
# @ https://docs.python.org/2/library/subprocess.html
else:
import subprocess
+# Python2/3 compatible
+try:
+ input = raw_input
+except NameError:
+ pass
+
+PASS = 0
+FAIL = 1
+ERROR = 2
+SKIP = 3
+TEST_RUN = 4
debug_framework = False
if os.getenv('TEST_DEBUG', "0") == "1":
debug_framework = True
import debug_internal
-
"""
Test framework module.
split = read.splitlines(True)
if len(stderr_fragment) > 0:
split[0] = "%s%s" % (stderr_fragment, split[0])
- if len(split) > 0 and split[-1].endswith("\n"):
+ if len(split) > 0 and split[-1].endswith(b"\n"):
limit = None
else:
limit = -1
for line in split[:limit]:
testclass.logger.debug(
"VPP STDERR: %s" % line.rstrip("\n"))
- # ignoring the dummy pipe here intentionally - the flag will take care
- # of properly terminating the loop
+ # ignoring the dummy pipe here intentionally - the
+ # flag will take care of properly terminating the loop
+
+
+def _is_skip_aarch64_set():
+ return os.getenv('SKIP_AARCH64', 'n').lower() in ('yes', 'y', '1')
+
+is_skip_aarch64_set = _is_skip_aarch64_set()
-def running_extended_tests():
+def _is_platform_aarch64():
+ return platform.machine() == 'aarch64'
+
+is_platform_aarch64 = _is_platform_aarch64()
+
+
+def _running_extended_tests():
s = os.getenv("EXTENDED_TESTS", "n")
return True if s.lower() in ("y", "yes", "1") else False
+running_extended_tests = _running_extended_tests()
-def running_on_centos():
+
+def _running_on_centos():
os_id = os.getenv("OS_ID", "")
return True if "centos" in os_id.lower() else False
+running_on_centos = _running_on_centos
+
class KeepAliveReporter(object):
"""
def __init__(self):
self.__dict__ = self._shared_state
+ self._pipe = None
@property
def pipe(self):
@pipe.setter
def pipe(self, pipe):
- if hasattr(self, '_pipe'):
+ if self._pipe is not None:
raise Exception("Internal error - pipe should only be set once.")
self._pipe = pipe
- def send_keep_alive(self, test):
+ def send_keep_alive(self, test, desc=None):
"""
Write current test tmpdir & desc to keep-alive pipe to signal liveness
"""
return
if isclass(test):
- desc = test.__name__
+ desc = '%s (%s)' % (desc, unittest.util.strclass(test))
else:
- desc = test.shortDescription()
- if not desc:
- desc = str(test)
+ desc = test.id()
self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
classes. It provides methods to create and run test case.
"""
+ CLI_LISTEN_DEFAULT = 'localhost:5002'
+ config = VppTestCaseVppConfig()
+
@property
def packet_infos(self):
"""List of packet infos"""
else:
raise Exception("Unrecognized DEBUG option: '%s'" % d)
- @classmethod
- def get_least_used_cpu(self):
+ @staticmethod
+ def get_least_used_cpu():
cpu_usage_list = [set(range(psutil.cpu_count()))]
vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
if 'vpp_main' == p.info['name']]
return random.choice(tuple(min_usage_set))
+ @classmethod
+ def print_header(cls):
+ if not hasattr(cls, '_header_printed'):
+ print(double_line_delim)
+ print(colorize(getdoc(cls).splitlines()[0], GREEN))
+ print(double_line_delim)
+ cls._header_printed = True
+
@classmethod
def setUpConstants(cls):
""" Set-up the test case class based on environment variables """
c = os.getenv("CACHE_OUTPUT", "1")
cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
cls.set_debug_flags(d)
- cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
- cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
+ cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
+ cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
plugin_path = None
if cls.plugin_path is not None:
plugin_path = cls.plugin_path
elif cls.extern_plugin_path is not None:
plugin_path = cls.extern_plugin_path
- debug_cli = ""
+
if cls.step or cls.debug_gdb or cls.debug_gdbserver:
- debug_cli = "cli-listen localhost:5002"
+ cls.config.add('unix', 'cli-listen', cls.CLI_LISTEN_DEFAULT)
+
coredump_size = None
size = os.getenv("COREDUMP_SIZE")
- if size is not None:
- coredump_size = "coredump-size %s" % size
- if coredump_size is None:
- coredump_size = "coredump-size unlimited"
-
- cpu_core_number = cls.get_least_used_cpu()
-
- cls.vpp_cmdline = [cls.vpp_bin, "unix",
- "{", "nodaemon", debug_cli, "full-coredump",
- coredump_size, "}", "api-trace", "{", "on", "}",
- "api-segment", "{", "prefix", cls.shm_prefix, "}",
- "cpu", "{", "main-core", str(cpu_core_number), "}",
- "plugins", "{", "plugin", "dpdk_plugin.so", "{",
- "disable", "}", "plugin", "unittest_plugin.so",
- "{", "enable", "}", "}", ]
+ cls.config.add('unix', 'coredump-size',
+ size if size is not None else 'unlimited')
+
+ cls.config.add('unix', 'runtime-dir', cls.tempdir)
+ cls.config.add('api-segment', 'prefix', cls.shm_prefix)
+ cls.config.add('cpu', 'main-core', str(cls.get_least_used_cpu()))
+ cls.config.add('statseg', 'socket-name', cls.stats_sock)
+
if plugin_path is not None:
- cls.vpp_cmdline.extend(["plugin_path", plugin_path])
- cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
+ cls.config.add('plugins', 'path', plugin_path)
+ cls.config.add_plugin('dpdk_plugin.so', 'disable')
+ cls.config.add_plugin('unittest_plugin.so', 'enable')
+
+ cls.vpp_cmdline = [cls.vpp_bin] + cls.config.shlex()
+ cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
+ cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
@classmethod
def wait_for_enter(cls):
print("Now is the time to attach a gdb by running the above "
"command and set up breakpoints etc.")
print(single_line_delim)
- raw_input("Press ENTER to continue running the testcase...")
+ input("Press ENTER to continue running the testcase...")
@classmethod
def run_vpp(cls):
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=1)
- except Exception as e:
+ except subprocess.CalledProcessError as e:
cls.logger.critical("Couldn't start vpp: %s" % e)
raise
cls.wait_for_enter()
+ @classmethod
+ def wait_for_stats_socket(cls):
+ deadline = time.time() + 3
+ ok = False
+ while time.time() < deadline or \
+ cls.debug_gdb or cls.debug_gdbserver:
+ if os.path.exists(cls.stats_sock):
+ ok = True
+ break
+ time.sleep(0.8)
+ if not ok:
+ cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
+
@classmethod
def setUpClass(cls):
"""
"""
gc.collect() # run garbage collection first
random.seed()
- if not hasattr(cls, 'logger'):
- cls.logger = getLogger(cls.__name__)
- else:
- cls.logger.name = cls.__name__
+ cls.print_header()
+ cls.logger = get_logger(cls.__name__)
+ if hasattr(cls, 'parallel_handler'):
+ cls.logger.addHandler(cls.parallel_handler)
+ cls.logger.propagate = False
cls.tempdir = tempfile.mkdtemp(
prefix='vpp-unittest-%s-' % cls.__name__)
+ cls.stats_sock = "%s/stats.sock" % cls.tempdir
cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
cls.file_handler.setFormatter(
Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
# doesn't get called and we might end with a zombie vpp
try:
cls.run_vpp()
- cls.reporter.send_keep_alive(cls)
+ cls.reporter.send_keep_alive(cls, 'setUpClass')
+ VppTestResult.current_test_case_info = TestCaseInfo(
+ cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
cls.vpp_stdout_deque = deque()
cls.vpp_stderr_deque = deque()
cls.pump_thread_stop_flag = Event()
cls.pump_thread = Thread(target=pump_output, args=(cls,))
cls.pump_thread.daemon = True
cls.pump_thread.start()
- cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
+ if cls.debug_gdb or cls.debug_gdbserver:
+ read_timeout = 0
+ else:
+ read_timeout = 5
+ cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
+ read_timeout)
if cls.step:
hook = StepHook(cls)
else:
hook = PollHook(cls)
cls.vapi.register_hook(hook)
- cls.sleep(0.1, "after vpp startup, before initial poll")
+ cls.wait_for_stats_socket()
+ cls.statistics = VPPStats(socketname=cls.stats_sock)
try:
hook.poll_vpp()
except VppDiedError:
print(double_line_delim)
print("VPP or GDB server is still running")
print(single_line_delim)
- raw_input("When done debugging, press ENTER to kill the "
- "process and finish running the testcase...")
+ input("When done debugging, press ENTER to kill the "
+ "process and finish running the testcase...")
# first signal that we want to stop the pump thread, then wake it up
if hasattr(cls, 'pump_thread_stop_flag'):
cls.pump_thread_stop_flag.set()
if hasattr(cls, 'pump_thread_wakeup_pipe'):
- os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
+ os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
if hasattr(cls, 'pump_thread'):
cls.logger.debug("Waiting for pump thread to stop")
cls.pump_thread.join()
cls.vpp.poll()
if cls.vpp.returncode is None:
cls.logger.debug("Sending TERM to vpp")
- cls.vpp.terminate()
+ cls.vpp.kill()
cls.logger.debug("Waiting for vpp to die")
cls.vpp.communicate()
del cls.vpp
stderr_log(single_line_delim)
stderr_log('VPP output to stderr while running %s:', cls.__name__)
stderr_log(single_line_delim)
- vpp_output = "".join(cls.vpp_stderr_deque)
+ vpp_output = "".join(str(cls.vpp_stderr_deque))
with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
f.write(vpp_output)
stderr_log('\n%s', vpp_output)
@classmethod
def tearDownClass(cls):
""" Perform final cleanup after running all tests in this test-case """
+ cls.reporter.send_keep_alive(cls, 'tearDownClass')
cls.quit()
cls.file_handler.close()
cls.reset_packet_infos()
self.logger.debug(self.vapi.cli("show trace"))
self.logger.info(self.vapi.ppcli("show interface"))
self.logger.info(self.vapi.ppcli("show hardware"))
- self.logger.info(self.vapi.ppcli("show error"))
+ self.logger.info(self.statistics.set_errors_str())
self.logger.info(self.vapi.ppcli("show run"))
self.logger.info(self.vapi.ppcli("show log"))
self.registry.remove_vpp_config(self.logger)
if pkt.haslayer(ICMPv6EchoReply):
self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
+ def assert_packet_counter_equal(self, counter, expected_value):
+ if counter.startswith("/"):
+ counter_value = self.statistics.get_counter(counter)
+ self.assert_equal(counter_value, expected_value,
+ "packet counter `%s'" % counter)
+ else:
+ counters = self.vapi.cli("sh errors").split('\n')
+ counter_value = -1
+ for i in range(1, len(counters) - 1):
+ results = counters[i].split()
+ if results[1] == counter:
+ counter_value = int(results[0])
+ break
+
@classmethod
def sleep(cls, timeout, remark=None):
if hasattr(cls, 'logger'):
- cls.logger.debug("Starting sleep for %ss (%s)" % (timeout, remark))
+ cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
before = time.time()
time.sleep(timeout)
after = time.time()
- if after - before > 2 * timeout:
+ if hasattr(cls, 'logger') and after - before > 2 * timeout:
cls.logger.error("unexpected time.sleep() result - "
- "slept for %ss instead of ~%ss!" % (
- after - before, timeout))
+ "slept for %es instead of ~%es!",
+ after - before, timeout)
if hasattr(cls, 'logger'):
cls.logger.debug(
- "Finished sleep (%s) - slept %ss (wanted %ss)" % (
- remark, after - before, timeout))
+ "Finished sleep (%s) - slept %es (wanted %es)",
+ remark, after - before, timeout)
def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
self.vapi.cli("clear trace")
input.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rx = output.get_capture(len(pkts))
+ if isinstance(object, (list,)):
+ rx = []
+ for o in output:
+ rx.append(output.get_capture(len(pkts)))
+ else:
+ rx = output.get_capture(len(pkts))
+ return rx
+
+ def send_and_expect_only(self, input, pkts, output, timeout=None):
+ self.vapi.cli("clear trace")
+ input.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ if isinstance(object, (list,)):
+ outputs = output
+ rx = []
+ for o in outputs:
+ rx.append(output.get_capture(len(pkts)))
+ else:
+ rx = output.get_capture(len(pkts))
+ outputs = [output]
+ if not timeout:
+ timeout = 1
+ for i in self.pg_interfaces:
+ if i not in outputs:
+ i.get_capture(0, timeout=timeout)
+ i.assert_nothing_captured()
+ timeout = 0.1
+
return rx
def get_test_description(descriptions, test):
- # TODO: if none print warning not raise exception
short_description = test.shortDescription()
if descriptions and short_description:
return short_description
return str(test)
-class TestCasePrinter(object):
- _shared_state = {}
-
- def __init__(self):
- self.__dict__ = self._shared_state
- if not hasattr(self, "_test_case_set"):
- self._test_case_set = set()
-
- def print_test_case_heading_if_first_time(self, case):
- if case.__class__ not in self._test_case_set:
- print(double_line_delim)
- print(colorize(get_testcase_doc_name(case), GREEN))
- print(double_line_delim)
- self._test_case_set.add(case.__class__)
+class TestCaseInfo(object):
+ def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
+ self.logger = logger
+ self.tempdir = tempdir
+ self.vpp_pid = vpp_pid
+ self.vpp_bin_path = vpp_bin_path
+ self.core_crash_test = None
class VppTestResult(unittest.TestResult):
methods.
"""
- def __init__(self, stream, descriptions, verbosity):
+ failed_test_cases_info = set()
+ core_crash_test_cases_info = set()
+ current_test_case_info = None
+
+ def __init__(self, stream, descriptions, verbosity, runner):
"""
:param stream File descriptor to store where to report test results.
Set to the standard error stream by default.
self.descriptions = descriptions
self.verbosity = verbosity
self.result_string = None
- self.printer = TestCasePrinter()
- self.passed = 0
+ self.runner = runner
def addSuccess(self, test):
"""
:param test:
"""
- if hasattr(test, 'logger'):
- test.logger.debug("--- addSuccess() %s.%s(%s) called"
- % (test.__class__.__name__,
- test._testMethodName,
- test._testMethodDoc))
- self.passed += 1
+ if self.current_test_case_info:
+ self.current_test_case_info.logger.debug(
+ "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
+ test._testMethodName,
+ test._testMethodDoc))
unittest.TestResult.addSuccess(self, test)
self.result_string = colorize("OK", GREEN)
+ self.send_result_through_pipe(test, PASS)
+
def addSkip(self, test, reason):
"""
Record a test skipped.
:param reason:
"""
- if hasattr(test, 'logger'):
- test.logger.debug("--- addSkip() %s.%s(%s) called, reason is %s"
- % (test.__class__.__name__,
- test._testMethodName,
- test._testMethodDoc,
- reason))
+ if self.current_test_case_info:
+ self.current_test_case_info.logger.debug(
+ "--- addSkip() %s.%s(%s) called, reason is %s" %
+ (test.__class__.__name__, test._testMethodName,
+ test._testMethodDoc, reason))
unittest.TestResult.addSkip(self, test, reason)
self.result_string = colorize("SKIP", YELLOW)
- def symlink_failed(self, test):
- logger = None
- if hasattr(test, 'logger'):
- logger = test.logger
- if hasattr(test, 'tempdir'):
+ self.send_result_through_pipe(test, SKIP)
+
+ def symlink_failed(self):
+ if self.current_test_case_info:
try:
- failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
- link_path = os.path.join(failed_dir, '%s-FAILED' %
- os.path.basename(test.tempdir))
- if logger:
- logger.debug("creating a link to the failed test")
- logger.debug("os.symlink(%s, %s)" %
- (test.tempdir, link_path))
+ failed_dir = os.getenv('FAILED_DIR')
+ link_path = os.path.join(
+ failed_dir,
+ '%s-FAILED' %
+ os.path.basename(self.current_test_case_info.tempdir))
+ if self.current_test_case_info.logger:
+ self.current_test_case_info.logger.debug(
+ "creating a link to the failed test")
+ self.current_test_case_info.logger.debug(
+ "os.symlink(%s, %s)" %
+ (self.current_test_case_info.tempdir, link_path))
if os.path.exists(link_path):
- if logger:
- logger.debug('symlink already exists')
+ if self.current_test_case_info.logger:
+ self.current_test_case_info.logger.debug(
+ 'symlink already exists')
else:
- os.symlink(test.tempdir, link_path)
+ os.symlink(self.current_test_case_info.tempdir, link_path)
except Exception as e:
- if logger:
- logger.error(e)
+ if self.current_test_case_info.logger:
+ self.current_test_case_info.logger.error(e)
- def send_results_through_pipe(self):
- if hasattr(self, 'test_framework_results_pipe'):
- pipe = self.test_framework_results_pipe
+ def send_result_through_pipe(self, test, result):
+ if hasattr(self, 'test_framework_result_pipe'):
+ pipe = self.test_framework_result_pipe
if pipe:
- pipe.send(self)
+ pipe.send((test.id(), result))
+
+ def log_error(self, test, err, fn_name):
+ if self.current_test_case_info:
+ if isinstance(test, unittest.suite._ErrorHolder):
+ test_name = test.description
+ else:
+ test_name = '%s.%s(%s)' % (test.__class__.__name__,
+ test._testMethodName,
+ test._testMethodDoc)
+ self.current_test_case_info.logger.debug(
+ "--- %s() %s called, err is %s" %
+ (fn_name, test_name, err))
+ self.current_test_case_info.logger.debug(
+ "formatted exception is:\n%s" %
+ "".join(format_exception(*err)))
+
+ def add_error(self, test, err, unittest_fn, error_type):
+ if error_type == FAIL:
+ self.log_error(test, err, 'addFailure')
+ error_type_str = colorize("FAIL", RED)
+ elif error_type == ERROR:
+ self.log_error(test, err, 'addError')
+ error_type_str = colorize("ERROR", RED)
+ else:
+ raise Exception('Error type %s cannot be used to record an '
+ 'error or a failure' % error_type)
+
+ unittest_fn(self, test, err)
+ if self.current_test_case_info:
+ self.result_string = "%s [ temp dir used by test case: %s ]" % \
+ (error_type_str,
+ self.current_test_case_info.tempdir)
+ self.symlink_failed()
+ self.failed_test_cases_info.add(self.current_test_case_info)
+ if is_core_present(self.current_test_case_info.tempdir):
+ if not self.current_test_case_info.core_crash_test:
+ if isinstance(test, unittest.suite._ErrorHolder):
+ test_name = str(test)
+ else:
+ test_name = "'{}' ({})".format(
+ get_testcase_doc_name(test), test.id())
+ self.current_test_case_info.core_crash_test = test_name
+ self.core_crash_test_cases_info.add(
+ self.current_test_case_info)
+ else:
+ self.result_string = '%s [no temp dir]' % error_type_str
+
+ self.send_result_through_pipe(test, error_type)
def addFailure(self, test, err):
"""
:param err: error message
"""
- if hasattr(test, 'logger'):
- test.logger.debug("--- addFailure() %s.%s(%s) called, err is %s"
- % (test.__class__.__name__,
- test._testMethodName,
- test._testMethodDoc, err))
- test.logger.debug("formatted exception is:\n%s" %
- "".join(format_exception(*err)))
- unittest.TestResult.addFailure(self, test, err)
- if hasattr(test, 'tempdir'):
- self.result_string = colorize("FAIL", RED) + \
- ' [ temp dir used by test case: ' + test.tempdir + ' ]'
- self.symlink_failed(test)
- else:
- self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
+ self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
def addError(self, test, err):
"""
:param err: error message
"""
- if hasattr(test, 'logger'):
- test.logger.debug("--- addError() %s.%s(%s) called, err is %s"
- % (test.__class__.__name__,
- test._testMethodName,
- test._testMethodDoc, err))
- test.logger.debug("formatted exception is:\n%s" %
- "".join(format_exception(*err)))
- unittest.TestResult.addError(self, test, err)
- if hasattr(test, 'tempdir'):
- self.result_string = colorize("ERROR", RED) + \
- ' [ temp dir used by test case: ' + test.tempdir + ' ]'
- self.symlink_failed(test)
- else:
- self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
+ self.add_error(test, err, unittest.TestResult.addError, ERROR)
def getDescription(self, test):
"""
:param test:
"""
- self.printer.print_test_case_heading_if_first_time(test)
+ test.print_header()
+
unittest.TestResult.startTest(self, test)
if self.verbosity > 0:
self.stream.writeln(
else:
self.stream.writeln("%-73s%s" % (self.getDescription(test),
self.result_string))
- self.send_results_through_pipe()
+
+ self.send_result_through_pipe(test, TEST_RUN)
def printErrors(self):
"""
Print errors from running the test case
"""
- self.stream.writeln()
- self.printErrorList('ERROR', self.errors)
- self.printErrorList('FAIL', self.failures)
+ if len(self.errors) > 0 or len(self.failures) > 0:
+ self.stream.writeln()
+ self.printErrorList('ERROR', self.errors)
+ self.printErrorList('FAIL', self.failures)
+
+ # ^^ that is the last output from unittest before summary
+ if not self.runner.print_summary:
+ devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
+ self.stream = devnull
+ self.runner.stream = devnull
def printErrorList(self, flavour, errors):
"""
"""
A basic test runner implementation which prints results to standard error.
"""
+
@property
def resultclass(self):
"""Class maintaining the results of the tests"""
return VppTestResult
def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
- results_pipe=None, failfast=False, buffer=False,
- resultclass=None):
+ result_pipe=None, failfast=False, buffer=False,
+ resultclass=None, print_summary=True):
# ignore stream setting here, use hard-coded stdout to be in sync
# with prints from VppTestCase methods ...
super(VppTestRunner, self).__init__(sys.stdout, descriptions,
verbosity, failfast, buffer,
resultclass)
- reporter = KeepAliveReporter()
- reporter.pipe = keep_alive_pipe
+ KeepAliveReporter.pipe = keep_alive_pipe
+
+ self.orig_stream = self.stream
+ self.resultclass.test_framework_result_pipe = result_pipe
+
+ self.print_summary = print_summary
- VppTestResult.test_framework_results_pipe = results_pipe
+ def _makeResult(self):
+ return self.resultclass(self.stream,
+ self.descriptions,
+ self.verbosity,
+ self)
def run(self, test):
"""
faulthandler.enable() # emit stack trace to stderr if killed by signal
result = super(VppTestRunner, self).run(test)
+ if not self.print_summary:
+ self.stream = self.orig_stream
+ result.stream = self.orig_stream
return result