import random
import copy
import psutil
+import platform
from collections import deque
from threading import Thread, Event
from inspect import getdoc, isclass
from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
from scapy.layers.inet6 import ICMPv6EchoReply
+
if os.name == 'posix' and sys.version_info[0] < 3:
# using subprocess32 is recommended by python official documentation
# @ https://docs.python.org/2/library/subprocess.html
else:
import subprocess
-
PASS = 0
FAIL = 1
ERROR = 2
SKIP = 3
TEST_RUN = 4
-
debug_framework = False
if os.getenv('TEST_DEBUG', "0") == "1":
debug_framework = True
import debug_internal
-
"""
Test framework module.
split = read.splitlines(True)
if len(stderr_fragment) > 0:
split[0] = "%s%s" % (stderr_fragment, split[0])
- if len(split) > 0 and split[-1].endswith("\n"):
+ if len(split) > 0 and split[-1].endswith(b"\n"):
limit = None
else:
limit = -1
for line in split[:limit]:
testclass.logger.debug(
"VPP STDERR: %s" % line.rstrip("\n"))
- # ignoring the dummy pipe here intentionally - the flag will take care
- # of properly terminating the loop
+ # ignoring the dummy pipe here intentionally - the
+ # flag will take care of properly terminating the loop
+
+
+def _is_skip_aarch64_set():
+ return os.getenv('SKIP_AARCH64', 'n').lower() in ('yes', 'y', '1')
+is_skip_aarch64_set = _is_skip_aarch64_set()
-def running_extended_tests():
+
+def _is_platform_aarch64():
+ return platform.machine() == 'aarch64'
+
+is_platform_aarch64 = _is_platform_aarch64()
+
+
+def _running_extended_tests():
s = os.getenv("EXTENDED_TESTS", "n")
return True if s.lower() in ("y", "yes", "1") else False
+running_extended_tests = _running_extended_tests()
+
-def running_on_centos():
+def _running_on_centos():
os_id = os.getenv("OS_ID", "")
return True if "centos" in os_id.lower() else False
+running_on_centos = _running_on_centos
+
class KeepAliveReporter(object):
"""
def __init__(self):
self.__dict__ = self._shared_state
+ self._pipe = None
@property
def pipe(self):
@pipe.setter
def pipe(self, pipe):
- if hasattr(self, '_pipe'):
+ if self._pipe is not None:
raise Exception("Internal error - pipe should only be set once.")
self._pipe = pipe
classes. It provides methods to create and run test case.
"""
+ extra_vpp_punt_config = []
+ extra_vpp_plugin_config = []
+
@property
def packet_infos(self):
"""List of packet infos"""
else:
raise Exception("Unrecognized DEBUG option: '%s'" % d)
- @classmethod
- def get_least_used_cpu(self):
+ @staticmethod
+ def get_least_used_cpu():
cpu_usage_list = [set(range(psutil.cpu_count()))]
vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
if 'vpp_main' == p.info['name']]
return random.choice(tuple(min_usage_set))
- @staticmethod
+ @classmethod
def print_header(cls):
if not hasattr(cls, '_header_printed'):
print(double_line_delim)
c = os.getenv("CACHE_OUTPUT", "1")
cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
cls.set_debug_flags(d)
- cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
- cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
+ cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
+ cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
plugin_path = None
if cls.plugin_path is not None:
"{", "socket-name", cls.stats_sock, "}", "plugins",
"{", "plugin", "dpdk_plugin.so", "{", "disable",
"}", "plugin", "unittest_plugin.so", "{", "enable",
- "}", "}", ]
+ "}"] + cls.extra_vpp_plugin_config + ["}", ]
+ if cls.extra_vpp_punt_config is not None:
+ cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
if plugin_path is not None:
cls.vpp_cmdline.extend(["plugin_path", plugin_path])
cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=1)
- except Exception as e:
+ except subprocess.CalledProcessError as e:
cls.logger.critical("Couldn't start vpp: %s" % e)
raise
"""
gc.collect() # run garbage collection first
random.seed()
- cls.print_header(cls)
+ cls.print_header()
cls.logger = get_logger(cls.__name__)
if hasattr(cls, 'parallel_handler'):
cls.logger.addHandler(cls.parallel_handler)
+ cls.logger.propagate = False
cls.tempdir = tempfile.mkdtemp(
prefix='vpp-unittest-%s-' % cls.__name__)
cls.stats_sock = "%s/stats.sock" % cls.tempdir
if hasattr(cls, 'pump_thread_stop_flag'):
cls.pump_thread_stop_flag.set()
if hasattr(cls, 'pump_thread_wakeup_pipe'):
- os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
+ os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
if hasattr(cls, 'pump_thread'):
cls.logger.debug("Waiting for pump thread to stop")
cls.pump_thread.join()
stderr_log(single_line_delim)
stderr_log('VPP output to stderr while running %s:', cls.__name__)
stderr_log(single_line_delim)
- vpp_output = "".join(cls.vpp_stderr_deque)
+ vpp_output = "".join(str(cls.vpp_stderr_deque))
with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
f.write(vpp_output)
stderr_log('\n%s', vpp_output)
for cf in checksum_fields:
if hasattr(layer, cf):
if ignore_zero_udp_checksums and \
- 0 == getattr(layer, cf) and \
- layer.name in udp_layers:
+ 0 == getattr(layer, cf) and \
+ layer.name in udp_layers:
continue
delattr(layer, cf)
checksums.append((counter, cf))
self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
def assert_packet_counter_equal(self, counter, expected_value):
- counters = self.vapi.cli("sh errors").split('\n')
- counter_value = -1
- for i in range(1, len(counters)-1):
- results = counters[i].split()
- if results[1] == counter:
- counter_value = int(results[0])
- break
- self.assert_equal(counter_value, expected_value,
- "packet counter `%s'" % counter)
+ if counter.startswith("/"):
+ counter_value = self.statistics.get_counter(counter)
+ self.assert_equal(counter_value, expected_value,
+ "packet counter `%s'" % counter)
+ else:
+ counters = self.vapi.cli("sh errors").split('\n')
+ counter_value = -1
+ for i in range(1, len(counters) - 1):
+ results = counters[i].split()
+ if results[1] == counter:
+ counter_value = int(results[0])
+ break
@classmethod
def sleep(cls, timeout, remark=None):
if hasattr(cls, 'logger'):
- cls.logger.debug("Starting sleep for %ss (%s)" % (timeout, remark))
+ cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
before = time.time()
time.sleep(timeout)
after = time.time()
- if after - before > 2 * timeout:
+ if hasattr(cls, 'logger') and after - before > 2 * timeout:
cls.logger.error("unexpected time.sleep() result - "
- "slept for %ss instead of ~%ss!" % (
- after - before, timeout))
+ "slept for %es instead of ~%es!",
+ after - before, timeout)
if hasattr(cls, 'logger'):
cls.logger.debug(
- "Finished sleep (%s) - slept %ss (wanted %ss)" % (
- remark, after - before, timeout))
+ "Finished sleep (%s) - slept %es (wanted %es)",
+ remark, after - before, timeout)
def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
self.vapi.cli("clear trace")
core_crash_test_cases_info = set()
current_test_case_info = None
- def __init__(self, stream, descriptions, verbosity):
+ def __init__(self, stream, descriptions, verbosity, runner):
"""
:param stream File descriptor to store where to report test results.
Set to the standard error stream by default.
self.descriptions = descriptions
self.verbosity = verbosity
self.result_string = None
+ self.runner = runner
def addSuccess(self, test):
"""
def symlink_failed(self):
if self.current_test_case_info:
try:
- failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
+ failed_dir = os.getenv('FAILED_DIR')
link_path = os.path.join(
failed_dir,
'%s-FAILED' %
:param test:
"""
- test.print_header(test.__class__)
+ test.print_header()
unittest.TestResult.startTest(self, test)
if self.verbosity > 0:
"""
Print errors from running the test case
"""
- self.stream.writeln()
- self.printErrorList('ERROR', self.errors)
- self.printErrorList('FAIL', self.failures)
+ if len(self.errors) > 0 or len(self.failures) > 0:
+ self.stream.writeln()
+ self.printErrorList('ERROR', self.errors)
+ self.printErrorList('FAIL', self.failures)
+
+ # ^^ that is the last output from unittest before summary
+ if not self.runner.print_summary:
+ devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
+ self.stream = devnull
+ self.runner.stream = devnull
def printErrorList(self, flavour, errors):
"""
"""
A basic test runner implementation which prints results to standard error.
"""
+
@property
def resultclass(self):
"""Class maintaining the results of the tests"""
def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
result_pipe=None, failfast=False, buffer=False,
- resultclass=None):
-
+ resultclass=None, print_summary=True):
# ignore stream setting here, use hard-coded stdout to be in sync
# with prints from VppTestCase methods ...
super(VppTestRunner, self).__init__(sys.stdout, descriptions,
verbosity, failfast, buffer,
resultclass)
- reporter = KeepAliveReporter()
- reporter.pipe = keep_alive_pipe
+ KeepAliveReporter.pipe = keep_alive_pipe
+
+ self.orig_stream = self.stream
+ self.resultclass.test_framework_result_pipe = result_pipe
+
+ self.print_summary = print_summary
- VppTestResult.test_framework_result_pipe = result_pipe
+ def _makeResult(self):
+ return self.resultclass(self.stream,
+ self.descriptions,
+ self.verbosity,
+ self)
def run(self, test):
"""
faulthandler.enable() # emit stack trace to stderr if killed by signal
result = super(VppTestRunner, self).run(test)
+ if not self.print_summary:
+ self.stream = self.orig_stream
+ result.stream = self.orig_stream
return result