import random
import copy
import psutil
+import platform
from collections import deque
from threading import Thread, Event
from inspect import getdoc, isclass
from traceback import format_exception
from logging import FileHandler, DEBUG, Formatter
+
+import scapy.compat
from scapy.packet import Raw
from hook import StepHook, PollHook, VppDiedError
from vpp_pg_interface import VppPGInterface
from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
from scapy.layers.inet6 import ICMPv6EchoReply
+
if os.name == 'posix' and sys.version_info[0] < 3:
# using subprocess32 is recommended by python official documentation
# @ https://docs.python.org/2/library/subprocess.html
else:
import subprocess
+# Python2/3 compatible
+try:
+ input = raw_input
+except NameError:
+ pass
PASS = 0
FAIL = 1
SKIP = 3
TEST_RUN = 4
-
debug_framework = False
if os.getenv('TEST_DEBUG', "0") == "1":
debug_framework = True
import debug_internal
-
"""
Test framework module.
split = read.splitlines(True)
if len(stderr_fragment) > 0:
split[0] = "%s%s" % (stderr_fragment, split[0])
- if len(split) > 0 and split[-1].endswith("\n"):
+ if len(split) > 0 and split[-1].endswith(b"\n"):
limit = None
else:
limit = -1
for line in split[:limit]:
testclass.logger.debug(
"VPP STDERR: %s" % line.rstrip("\n"))
- # ignoring the dummy pipe here intentionally - the flag will take care
- # of properly terminating the loop
+ # ignoring the dummy pipe here intentionally - the
+ # flag will take care of properly terminating the loop
+
+def _is_skip_aarch64_set():
+ return os.getenv('SKIP_AARCH64', 'n').lower() in ('yes', 'y', '1')
-def running_extended_tests():
+is_skip_aarch64_set = _is_skip_aarch64_set()
+
+
+def _is_platform_aarch64():
+ return platform.machine() == 'aarch64'
+
+is_platform_aarch64 = _is_platform_aarch64()
+
+
+def _running_extended_tests():
s = os.getenv("EXTENDED_TESTS", "n")
return True if s.lower() in ("y", "yes", "1") else False
+running_extended_tests = _running_extended_tests()
+
-def running_on_centos():
+def _running_on_centos():
os_id = os.getenv("OS_ID", "")
return True if "centos" in os_id.lower() else False
+running_on_centos = _running_on_centos
+
class KeepAliveReporter(object):
"""
classes. It provides methods to create and run test case.
"""
+ extra_vpp_punt_config = []
+ extra_vpp_plugin_config = []
+
@property
def packet_infos(self):
"""List of packet infos"""
return random.choice(tuple(min_usage_set))
- @classmethod
- def print_header(cls):
- if not hasattr(cls, '_header_printed'):
- print(double_line_delim)
- print(colorize(getdoc(cls).splitlines()[0], GREEN))
- print(double_line_delim)
- cls._header_printed = True
-
@classmethod
def setUpConstants(cls):
""" Set-up the test case class based on environment variables """
c = os.getenv("CACHE_OUTPUT", "1")
cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
cls.set_debug_flags(d)
- cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
- cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
+ cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
+ cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
plugin_path = None
if cls.plugin_path is not None:
"{", "socket-name", cls.stats_sock, "}", "plugins",
"{", "plugin", "dpdk_plugin.so", "{", "disable",
"}", "plugin", "unittest_plugin.so", "{", "enable",
- "}", "}", ]
+ "}"] + cls.extra_vpp_plugin_config + ["}", ]
+ if cls.extra_vpp_punt_config is not None:
+ cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
if plugin_path is not None:
cls.vpp_cmdline.extend(["plugin_path", plugin_path])
cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
print("Now is the time to attach a gdb by running the above "
"command and set up breakpoints etc.")
print(single_line_delim)
- raw_input("Press ENTER to continue running the testcase...")
+ input("Press ENTER to continue running the testcase...")
@classmethod
def run_vpp(cls):
stderr=subprocess.PIPE,
bufsize=1)
except subprocess.CalledProcessError as e:
- cls.logger.critical("Couldn't start vpp: %s" % e)
+ cls.logger.critical("Subprocess returned with non-0 return code: ("
+ "%s)", e.returncode)
+ raise
+ except OSError as e:
+ cls.logger.critical("Subprocess returned with OS error: "
+ "(%s) %s", e.errno, e.strerror)
+ raise
+ except Exception as e:
+ cls.logger.exception("Subprocess returned unexpected from "
+ "%s:", cmdline)
raise
cls.wait_for_enter()
if os.path.exists(cls.stats_sock):
ok = True
break
- time.sleep(0.8)
+ cls.sleep(0.8)
if not ok:
cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
Perform class setup before running the testcase
Remove shared memory files, start vpp and connect the vpp-api
"""
+ super(VppTestCase, cls).setUpClass()
gc.collect() # run garbage collection first
random.seed()
- cls.print_header()
cls.logger = get_logger(cls.__name__)
if hasattr(cls, 'parallel_handler'):
cls.logger.addHandler(cls.parallel_handler)
print(double_line_delim)
print("VPP or GDB server is still running")
print(single_line_delim)
- raw_input("When done debugging, press ENTER to kill the "
- "process and finish running the testcase...")
+ input("When done debugging, press ENTER to kill the "
+ "process and finish running the testcase...")
# first signal that we want to stop the pump thread, then wake it up
if hasattr(cls, 'pump_thread_stop_flag'):
cls.pump_thread_stop_flag.set()
if hasattr(cls, 'pump_thread_wakeup_pipe'):
- os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
+ os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
if hasattr(cls, 'pump_thread'):
cls.logger.debug("Waiting for pump thread to stop")
cls.pump_thread.join()
if hasattr(cls, 'vpp'):
if hasattr(cls, 'vapi'):
+ cls.logger.debug("Disconnecting class vapi client on %s",
+ cls.__name__)
cls.vapi.disconnect()
+ cls.logger.debug("Deleting class vapi attribute on %s",
+ cls.__name__)
del cls.vapi
cls.vpp.poll()
if cls.vpp.returncode is None:
cls.vpp.kill()
cls.logger.debug("Waiting for vpp to die")
cls.vpp.communicate()
+ cls.logger.debug("Deleting class vpp attribute on %s",
+ cls.__name__)
del cls.vpp
if cls.vpp_startup_failed:
(self.__class__.__name__, self._testMethodName,
self._testMethodDoc))
if not self.vpp_dead:
- self.logger.debug(self.vapi.cli("show trace"))
+ self.logger.debug(self.vapi.cli("show trace max 1000"))
self.logger.info(self.vapi.ppcli("show interface"))
self.logger.info(self.vapi.ppcli("show hardware"))
self.logger.info(self.statistics.set_errors_str())
def setUp(self):
""" Clear trace before running each test"""
+ super(VppTestCase, self).setUp()
self.reporter.send_keep_alive(self)
self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
(self.__class__.__name__, self._testMethodName,
cls.logger.debug("Removing zombie capture %s" % cap_name)
cls.vapi.cli('packet-generator delete %s' % cap_name)
- cls.vapi.cli("trace add pg-input 50") # 50 is maximum
+ cls.vapi.cli("trace add pg-input 1000")
cls.vapi.cli('packet-generator enable')
cls._zombie_captures = cls._captures
cls._captures = []
info.ip, info.proto)
@staticmethod
- def payload_to_info(payload):
+ def payload_to_info(payload, payload_field='load'):
"""
Convert packet payload to _PacketInfo object
:param payload: packet payload
-
+ :type: <class 'scapy.packet.Raw'>
+ :param: payload_field: packet fieldname of payload "load" for
+ <class 'scapy.packet.Raw'>
:returns: _PacketInfo object containing de-serialized data from payload
"""
- numbers = payload.split()
+ numbers = getattr(payload, payload_field).split()
info = _PacketInfo()
info.index = int(numbers[0])
info.src = int(numbers[1])
def assert_packet_checksums_valid(self, packet,
ignore_zero_udp_checksums=True):
- received = packet.__class__(str(packet))
+ received = packet.__class__(scapy.compat.raw(packet))
self.logger.debug(
ppp("Verifying packet checksums for packet:", received))
udp_layers = ['UDP', 'UDPerror']
checksum_fields = ['cksum', 'chksum']
checksums = []
counter = 0
- temp = received.__class__(str(received))
+ temp = received.__class__(scapy.compat.raw(received))
while True:
layer = temp.getlayer(counter)
if layer:
for cf in checksum_fields:
if hasattr(layer, cf):
if ignore_zero_udp_checksums and \
- 0 == getattr(layer, cf) and \
- layer.name in udp_layers:
+ 0 == getattr(layer, cf) and \
+ layer.name in udp_layers:
continue
delattr(layer, cf)
checksums.append((counter, cf))
counter = counter + 1
if 0 == len(checksums):
return
- temp = temp.__class__(str(temp))
+ temp = temp.__class__(scapy.compat.raw(temp))
for layer, cf in checksums:
calc_sum = getattr(temp[layer], cf)
self.assert_equal(
received_packet_checksum = getattr(received_packet[layer], field_name)
if ignore_zero_checksum and 0 == received_packet_checksum:
return
- recalculated = received_packet.__class__(str(received_packet))
+ recalculated = received_packet.__class__(
+ scapy.compat.raw(received_packet))
delattr(recalculated[layer], field_name)
- recalculated = recalculated.__class__(str(recalculated))
+ recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
self.assert_equal(received_packet_checksum,
getattr(recalculated[layer], field_name),
"packet checksum on layer: %s" % layer)
self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
def assert_packet_counter_equal(self, counter, expected_value):
- counters = self.vapi.cli("sh errors").split('\n')
- counter_value = -1
- for i in range(1, len(counters)-1):
- results = counters[i].split()
- if results[1] == counter:
- counter_value = int(results[0])
- break
- self.assert_equal(counter_value, expected_value,
- "packet counter `%s'" % counter)
+ if counter.startswith("/"):
+ counter_value = self.statistics.get_counter(counter)
+ self.assert_equal(counter_value, expected_value,
+ "packet counter `%s'" % counter)
+ else:
+ counters = self.vapi.cli("sh errors").split('\n')
+ counter_value = -1
+ for i in range(1, len(counters) - 1):
+ results = counters[i].split()
+ if results[1] == counter:
+ counter_value = int(results[0])
+ break
@classmethod
def sleep(cls, timeout, remark=None):
+
+ # /* Allow sleep(0) to maintain win32 semantics, and as decreed
+ # * by Guido, only the main thread can be interrupted.
+ # */
+ # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
+ if timeout == 0:
+ # yield quantum
+ if hasattr(os, 'sched_yield'):
+ os.sched_yield()
+ else:
+ time.sleep(0)
+ return
+
if hasattr(cls, 'logger'):
- cls.logger.debug("Starting sleep for %ss (%s)" % (timeout, remark))
+ cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
before = time.time()
time.sleep(timeout)
after = time.time()
- if after - before > 2 * timeout:
- cls.logger.error("unexpected time.sleep() result - "
- "slept for %ss instead of ~%ss!" % (
- after - before, timeout))
+ if hasattr(cls, 'logger') and after - before > 2 * timeout:
+ cls.logger.error("unexpected self.sleep() result - "
+ "slept for %es instead of ~%es!",
+ after - before, timeout)
if hasattr(cls, 'logger'):
cls.logger.debug(
- "Finished sleep (%s) - slept %ss (wanted %ss)" % (
- remark, after - before, timeout))
+ "Finished sleep (%s) - slept %es (wanted %es)",
+ remark, after - before, timeout)
- def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
+ def pg_send(self, intf, pkts):
self.vapi.cli("clear trace")
intf.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
+
+ def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
+ self.pg_send(intf, pkts)
if not timeout:
timeout = 1
for i in self.pg_interfaces:
i.assert_nothing_captured(remark=remark)
timeout = 0.1
- def send_and_expect(self, input, pkts, output):
- self.vapi.cli("clear trace")
- input.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- if isinstance(object, (list,)):
- rx = []
- for o in output:
- rx.append(output.get_capture(len(pkts)))
- else:
- rx = output.get_capture(len(pkts))
+ def send_and_expect(self, intf, pkts, output):
+ self.pg_send(intf, pkts)
+ rx = output.get_capture(len(pkts))
return rx
- def send_and_expect_only(self, input, pkts, output, timeout=None):
- self.vapi.cli("clear trace")
- input.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- if isinstance(object, (list,)):
- outputs = output
- rx = []
- for o in outputs:
- rx.append(output.get_capture(len(pkts)))
- else:
- rx = output.get_capture(len(pkts))
- outputs = [output]
+ def send_and_expect_only(self, intf, pkts, output, timeout=None):
+ self.pg_send(intf, pkts)
+ rx = output.get_capture(len(pkts))
+ outputs = [output]
if not timeout:
timeout = 1
for i in self.pg_interfaces:
return rx
+ def runTest(self):
+ """ unittest calls runTest when TestCase is instantiated without a
+ test case. Use case: Writing unittests against VppTestCase"""
+ pass
+
def get_testcase_doc_name(test):
return getdoc(test.__class__).splitlines()[0]
core_crash_test_cases_info = set()
current_test_case_info = None
- def __init__(self, stream, descriptions, verbosity, runner):
+ def __init__(self, stream=None, descriptions=None, verbosity=None,
+ runner=None):
"""
:param stream File descriptor to store where to report test results.
Set to the standard error stream by default.
test case descriptions.
:param verbosity Integer variable to store required verbosity level.
"""
- unittest.TestResult.__init__(self, stream, descriptions, verbosity)
+ super(VppTestResult, self).__init__(stream, descriptions, verbosity)
self.stream = stream
self.descriptions = descriptions
self.verbosity = verbosity
def symlink_failed(self):
if self.current_test_case_info:
try:
- failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
+ failed_dir = os.getenv('FAILED_DIR')
link_path = os.path.join(
failed_dir,
'%s-FAILED' %
if isinstance(test, unittest.suite._ErrorHolder):
test_name = str(test)
else:
- test_name = "'{}' ({})".format(
+ test_name = "'{!s}' ({!s})".format(
get_testcase_doc_name(test), test.id())
self.current_test_case_info.core_crash_test = test_name
self.core_crash_test_cases_info.add(
:param test:
"""
- test.print_header()
+
+ def print_header(test):
+ if not hasattr(test.__class__, '_header_printed'):
+ print(double_line_delim)
+ print(colorize(getdoc(test).splitlines()[0], GREEN))
+ print(double_line_delim)
+ test.__class__._header_printed = True
+
+ print_header(test)
unittest.TestResult.startTest(self, test)
if self.verbosity > 0:
"""
A basic test runner implementation which prints results to standard error.
"""
+
@property
def resultclass(self):
"""Class maintaining the results of the tests"""
def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
result_pipe=None, failfast=False, buffer=False,
- resultclass=None, print_summary=True):
-
+ resultclass=None, print_summary=True, **kwargs):
# ignore stream setting here, use hard-coded stdout to be in sync
# with prints from VppTestCase methods ...
super(VppTestRunner, self).__init__(sys.stdout, descriptions,
verbosity, failfast, buffer,
- resultclass)
+ resultclass, **kwargs)
KeepAliveReporter.pipe = keep_alive_pipe
self.orig_stream = self.stream
self.logger.info(err)
self.logger.info(single_line_delim)
self.result = self.process.returncode
+
+if __name__ == '__main__':
+ pass