import random
import copy
import psutil
+import platform
from collections import deque
from threading import Thread, Event
from inspect import getdoc, isclass
from vpp_papi_provider import VppPapiProvider
from vpp_papi.vpp_stats import VPPStats
from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
- getLogger, colorize
+ get_logger, colorize
from vpp_object import VppObjectRegistry
from util import ppp, is_core_present
from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
from scapy.layers.inet6 import ICMPv6EchoReply
+
if os.name == 'posix' and sys.version_info[0] < 3:
# using subprocess32 is recommended by python official documentation
# @ https://docs.python.org/2/library/subprocess.html
else:
import subprocess
+# Python2/3 compatible
+try:
+ input = raw_input
+except NameError:
+ pass
PASS = 0
FAIL = 1
SKIP = 3
TEST_RUN = 4
-
debug_framework = False
if os.getenv('TEST_DEBUG', "0") == "1":
debug_framework = True
import debug_internal
-
"""
Test framework module.
split = read.splitlines(True)
if len(stderr_fragment) > 0:
split[0] = "%s%s" % (stderr_fragment, split[0])
- if len(split) > 0 and split[-1].endswith("\n"):
+ if len(split) > 0 and split[-1].endswith(b"\n"):
limit = None
else:
limit = -1
for line in split[:limit]:
testclass.logger.debug(
"VPP STDERR: %s" % line.rstrip("\n"))
- # ignoring the dummy pipe here intentionally - the flag will take care
- # of properly terminating the loop
+ # ignoring the dummy pipe here intentionally - the
+ # flag will take care of properly terminating the loop
+
+
+def _is_skip_aarch64_set():
+ return os.getenv('SKIP_AARCH64', 'n').lower() in ('yes', 'y', '1')
+
+is_skip_aarch64_set = _is_skip_aarch64_set()
-def running_extended_tests():
+def _is_platform_aarch64():
+ return platform.machine() == 'aarch64'
+
+is_platform_aarch64 = _is_platform_aarch64()
+
+
+def _running_extended_tests():
s = os.getenv("EXTENDED_TESTS", "n")
return True if s.lower() in ("y", "yes", "1") else False
+running_extended_tests = _running_extended_tests()
-def running_on_centos():
+
+def _running_on_centos():
os_id = os.getenv("OS_ID", "")
return True if "centos" in os_id.lower() else False
+running_on_centos = _running_on_centos
+
class KeepAliveReporter(object):
"""
def __init__(self):
self.__dict__ = self._shared_state
+ self._pipe = None
@property
def pipe(self):
@pipe.setter
def pipe(self, pipe):
- if hasattr(self, '_pipe'):
+ if self._pipe is not None:
raise Exception("Internal error - pipe should only be set once.")
self._pipe = pipe
classes. It provides methods to create and run test case.
"""
+ extra_vpp_punt_config = []
+ extra_vpp_plugin_config = []
+
@property
def packet_infos(self):
"""List of packet infos"""
else:
raise Exception("Unrecognized DEBUG option: '%s'" % d)
- @classmethod
- def get_least_used_cpu(self):
+ @staticmethod
+ def get_least_used_cpu():
cpu_usage_list = [set(range(psutil.cpu_count()))]
vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
if 'vpp_main' == p.info['name']]
return random.choice(tuple(min_usage_set))
- @staticmethod
+ @classmethod
def print_header(cls):
if not hasattr(cls, '_header_printed'):
print(double_line_delim)
c = os.getenv("CACHE_OUTPUT", "1")
cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
cls.set_debug_flags(d)
- cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
- cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
+ cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
+ cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
plugin_path = None
if cls.plugin_path is not None:
"{", "socket-name", cls.stats_sock, "}", "plugins",
"{", "plugin", "dpdk_plugin.so", "{", "disable",
"}", "plugin", "unittest_plugin.so", "{", "enable",
- "}", "}", ]
+ "}"] + cls.extra_vpp_plugin_config + ["}", ]
+ if cls.extra_vpp_punt_config is not None:
+ cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
if plugin_path is not None:
cls.vpp_cmdline.extend(["plugin_path", plugin_path])
cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
print("Now is the time to attach a gdb by running the above "
"command and set up breakpoints etc.")
print(single_line_delim)
- raw_input("Press ENTER to continue running the testcase...")
+ input("Press ENTER to continue running the testcase...")
@classmethod
def run_vpp(cls):
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=1)
- except Exception as e:
+ except subprocess.CalledProcessError as e:
cls.logger.critical("Couldn't start vpp: %s" % e)
raise
@classmethod
def wait_for_stats_socket(cls):
deadline = time.time() + 3
- while time.time() < deadline or cls.debug_gdb or cls.debug_gdbserver:
+ ok = False
+ while time.time() < deadline or \
+ cls.debug_gdb or cls.debug_gdbserver:
if os.path.exists(cls.stats_sock):
+ ok = True
break
+ time.sleep(0.8)
+ if not ok:
+ cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
@classmethod
def setUpClass(cls):
"""
gc.collect() # run garbage collection first
random.seed()
- cls.print_header(cls)
- if not hasattr(cls, 'logger'):
- cls.logger = getLogger(cls.__name__)
- else:
- cls.logger.name = cls.__name__
+ cls.print_header()
+ cls.logger = get_logger(cls.__name__)
+ if hasattr(cls, 'parallel_handler'):
+ cls.logger.addHandler(cls.parallel_handler)
+ cls.logger.propagate = False
cls.tempdir = tempfile.mkdtemp(
prefix='vpp-unittest-%s-' % cls.__name__)
cls.stats_sock = "%s/stats.sock" % cls.tempdir
print(double_line_delim)
print("VPP or GDB server is still running")
print(single_line_delim)
- raw_input("When done debugging, press ENTER to kill the "
- "process and finish running the testcase...")
+ input("When done debugging, press ENTER to kill the "
+ "process and finish running the testcase...")
# first signal that we want to stop the pump thread, then wake it up
if hasattr(cls, 'pump_thread_stop_flag'):
cls.pump_thread_stop_flag.set()
if hasattr(cls, 'pump_thread_wakeup_pipe'):
- os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
+ os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
if hasattr(cls, 'pump_thread'):
cls.logger.debug("Waiting for pump thread to stop")
cls.pump_thread.join()
stderr_log(single_line_delim)
stderr_log('VPP output to stderr while running %s:', cls.__name__)
stderr_log(single_line_delim)
- vpp_output = "".join(cls.vpp_stderr_deque)
+ vpp_output = "".join(str(cls.vpp_stderr_deque))
with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
f.write(vpp_output)
stderr_log('\n%s', vpp_output)
for cf in checksum_fields:
if hasattr(layer, cf):
if ignore_zero_udp_checksums and \
- 0 == getattr(layer, cf) and \
- layer.name in udp_layers:
+ 0 == getattr(layer, cf) and \
+ layer.name in udp_layers:
continue
delattr(layer, cf)
checksums.append((counter, cf))
self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
def assert_packet_counter_equal(self, counter, expected_value):
- counters = self.vapi.cli("sh errors").split('\n')
- counter_value = -1
- for i in range(1, len(counters)-1):
- results = counters[i].split()
- if results[1] == counter:
- counter_value = int(results[0])
- break
- self.assert_equal(counter_value, expected_value,
- "packet counter `%s'" % counter)
+ if counter.startswith("/"):
+ counter_value = self.statistics.get_counter(counter)
+ self.assert_equal(counter_value, expected_value,
+ "packet counter `%s'" % counter)
+ else:
+ counters = self.vapi.cli("sh errors").split('\n')
+ counter_value = -1
+ for i in range(1, len(counters) - 1):
+ results = counters[i].split()
+ if results[1] == counter:
+ counter_value = int(results[0])
+ break
@classmethod
def sleep(cls, timeout, remark=None):
if hasattr(cls, 'logger'):
- cls.logger.debug("Starting sleep for %ss (%s)" % (timeout, remark))
+ cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
before = time.time()
time.sleep(timeout)
after = time.time()
- if after - before > 2 * timeout:
+ if hasattr(cls, 'logger') and after - before > 2 * timeout:
cls.logger.error("unexpected time.sleep() result - "
- "slept for %ss instead of ~%ss!" % (
- after - before, timeout))
+ "slept for %es instead of ~%es!",
+ after - before, timeout)
if hasattr(cls, 'logger'):
cls.logger.debug(
- "Finished sleep (%s) - slept %ss (wanted %ss)" % (
- remark, after - before, timeout))
+ "Finished sleep (%s) - slept %es (wanted %es)",
+ remark, after - before, timeout)
def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
self.vapi.cli("clear trace")
core_crash_test_cases_info = set()
current_test_case_info = None
- def __init__(self, stream, descriptions, verbosity):
+ def __init__(self, stream, descriptions, verbosity, runner):
"""
:param stream File descriptor to store where to report test results.
Set to the standard error stream by default.
self.descriptions = descriptions
self.verbosity = verbosity
self.result_string = None
+ self.runner = runner
def addSuccess(self, test):
"""
def symlink_failed(self):
if self.current_test_case_info:
try:
- failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
+ failed_dir = os.getenv('FAILED_DIR')
link_path = os.path.join(
failed_dir,
'%s-FAILED' %
:param test:
"""
- test.print_header(test.__class__)
+ test.print_header()
unittest.TestResult.startTest(self, test)
if self.verbosity > 0:
"""
Print errors from running the test case
"""
- self.stream.writeln()
- self.printErrorList('ERROR', self.errors)
- self.printErrorList('FAIL', self.failures)
+ if len(self.errors) > 0 or len(self.failures) > 0:
+ self.stream.writeln()
+ self.printErrorList('ERROR', self.errors)
+ self.printErrorList('FAIL', self.failures)
+
+ # ^^ that is the last output from unittest before summary
+ if not self.runner.print_summary:
+ devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
+ self.stream = devnull
+ self.runner.stream = devnull
def printErrorList(self, flavour, errors):
"""
"""
A basic test runner implementation which prints results to standard error.
"""
+
@property
def resultclass(self):
"""Class maintaining the results of the tests"""
def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
result_pipe=None, failfast=False, buffer=False,
- resultclass=None):
-
+ resultclass=None, print_summary=True):
# ignore stream setting here, use hard-coded stdout to be in sync
# with prints from VppTestCase methods ...
super(VppTestRunner, self).__init__(sys.stdout, descriptions,
verbosity, failfast, buffer,
resultclass)
- reporter = KeepAliveReporter()
- reporter.pipe = keep_alive_pipe
+ KeepAliveReporter.pipe = keep_alive_pipe
+
+ self.orig_stream = self.stream
+ self.resultclass.test_framework_result_pipe = result_pipe
+
+ self.print_summary = print_summary
- VppTestResult.test_framework_result_pipe = result_pipe
+ def _makeResult(self):
+ return self.resultclass(self.stream,
+ self.descriptions,
+ self.verbosity,
+ self)
def run(self, test):
"""
faulthandler.enable() # emit stack trace to stderr if killed by signal
result = super(VppTestRunner, self).run(test)
+ if not self.print_summary:
+ self.stream = self.orig_stream
+ result.stream = self.orig_stream
return result