from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
from scapy.layers.inet6 import ICMPv6EchoReply
+from vpp_running import use_running
+from test_result_code import TestResultCode
logger = logging.getLogger(__name__)
null_logger = logging.getLogger("VppTestCase")
null_logger.addHandler(logging.NullHandler())
-PASS = 0
-FAIL = 1
-ERROR = 2
-SKIP = 3
-TEST_RUN = 4
-SKIP_CPU_SHORTAGE = 5
-
if config.debug_framework:
import debug_internal
def pump_output(testclass):
"""pump output from vpp stdout/stderr to proper queues"""
+ if not hasattr(testclass, "vpp"):
+ return
stdout_fragment = ""
stderr_fragment = ""
while not testclass.pump_thread_stop_flag.is_set():
is_platform_aarch64 = _is_platform_aarch64()
+def _is_distro_ubuntu2204():
+ with open("/etc/os-release") as f:
+ for line in f.readlines():
+ if "jammy" in line:
+ return True
+ return False
+
+
+is_distro_ubuntu2204 = _is_distro_ubuntu2204()
+
+
+def _is_distro_debian11():
+ with open("/etc/os-release") as f:
+ for line in f.readlines():
+ if "bullseye" in line:
+ return True
+ return False
+
+
+is_distro_debian11 = _is_distro_debian11()
+
+
class KeepAliveReporter(object):
"""
Singleton object which reports test start to parent process
"""
Write current test tmpdir & desc to keep-alive pipe to signal liveness
"""
- if self.pipe is None:
+ if not hasattr(test, "vpp") or self.pipe is None:
# if not running forked..
return
FIXME_VPP_WORKERS = 2
# marks the suites broken when ASan is enabled
FIXME_ASAN = 3
+ # marks suites broken on Ubuntu-22.04
+ FIXME_UBUNTU2204 = 4
+ # marks suites broken on Debian-11
+ FIXME_DEBIAN11 = 5
+ # marks suites broken on debug vpp image
+ FIXME_VPP_DEBUG = 6
def create_tag_decorator(e):
tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
+tag_fixme_ubuntu2204 = create_tag_decorator(TestCaseTag.FIXME_UBUNTU2204)
+tag_fixme_debian11 = create_tag_decorator(TestCaseTag.FIXME_DEBIAN11)
+tag_fixme_vpp_debug = create_tag_decorator(TestCaseTag.FIXME_VPP_DEBUG)
class DummyVpp:
cls.cpus = cpus
+@use_running
class VppTestCase(CPUInterface, unittest.TestCase):
"""This subclass is a base class for VPP test cases that are implemented as
classes. It provides methods to create and run test case.
"""
extra_vpp_statseg_config = ""
- extra_vpp_punt_config = []
+ extra_vpp_config = []
extra_vpp_plugin_config = []
logger = null_logger
vapi_response_timeout = 5
if "DVPP_ENABLE_SANITIZE_ADDR=ON" in vpp_extra_cmake_args:
cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls)
+ @classmethod
+ def skip_fixme_ubuntu2204(cls):
+ """if distro is ubuntu 22.04 and @tag_fixme_ubuntu2204 mark for skip"""
+ if cls.has_tag(TestCaseTag.FIXME_UBUNTU2204):
+ cls = unittest.skip("Skipping @tag_fixme_ubuntu2204 tests")(cls)
+
+ @classmethod
+ def skip_fixme_debian11(cls):
+ """if distro is Debian-11 and @tag_fixme_debian11 mark for skip"""
+ if cls.has_tag(TestCaseTag.FIXME_DEBIAN11):
+ cls = unittest.skip("Skipping @tag_fixme_debian11 tests")(cls)
+
+ @classmethod
+ def skip_fixme_vpp_debug(cls):
+ cls = unittest.skip("Skipping @tag_fixme_vpp_debug tests")(cls)
+
@classmethod
def instance(cls):
"""Return the instance of this testcase"""
coredump_size = "coredump-size unlimited"
default_variant = config.variant
if default_variant is not None:
- default_variant = "defaults { %s 100 }" % default_variant
+ default_variant = "default { variant %s 100 }" % default_variant
else:
default_variant = ""
]
)
- if cls.extra_vpp_punt_config is not None:
- cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
+ if cls.extra_vpp_config is not None:
+ cls.vpp_cmdline.extend(cls.extra_vpp_config)
if not cls.debug_attach:
cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
@classmethod
def run_vpp(cls):
+ if (
+ is_distro_ubuntu2204 == True and cls.has_tag(TestCaseTag.FIXME_UBUNTU2204)
+ ) or (is_distro_debian11 == True and cls.has_tag(TestCaseTag.FIXME_DEBIAN11)):
+ return
cls.logger.debug(f"Assigned cpus: {cls.cpus}")
cmdline = cls.vpp_cmdline
cls.attach_vpp()
else:
cls.run_vpp()
+ if not hasattr(cls, "vpp"):
+ return
cls.reporter.send_keep_alive(cls, "setUpClass")
VppTestResult.current_test_case_info = TestCaseInfo(
cls.logger, cls.tempdir, cls.vpp.pid, config.vpp
)
cls.vpp_stdout_deque = deque()
cls.vpp_stderr_deque = deque()
- if not cls.debug_attach:
+ # Pump thread in a non-debug-attached & not running-vpp
+ if not cls.debug_attach and not hasattr(cls, "running_vpp"):
cls.pump_thread_stop_flag = Event()
cls.pump_thread_wakeup_pipe = os.pipe()
cls.pump_thread = Thread(target=pump_output, args=(cls,))
Disconnect vpp-api, kill vpp and cleanup shared memory files
"""
cls._debug_quit()
+ if hasattr(cls, "running_vpp"):
+ cls.vpp.quit_vpp()
# first signal that we want to stop the pump thread, then wake it up
if hasattr(cls, "pump_thread_stop_flag"):
cls.vpp.kill()
outs, errs = cls.vpp.communicate()
cls.logger.debug("Deleting class vpp attribute on %s", cls.__name__)
- if not cls.debug_attach:
+ if not cls.debug_attach and not hasattr(cls, "running_vpp"):
cls.vpp.stdout.close()
cls.vpp.stderr.close()
- del cls.vpp
+ # If vpp is a dynamic attribute set by the func use_running,
+ # deletion will result in an AttributeError that we can
+ # safetly pass.
+ try:
+ del cls.vpp
+ except AttributeError:
+ pass
if cls.vpp_startup_failed:
stdout_log = cls.logger.info
def tearDownClass(cls):
"""Perform final cleanup after running all tests in this test-case"""
cls.logger.debug("--- tearDownClass() for %s called ---" % cls.__name__)
+ if not hasattr(cls, "vpp"):
+ return
cls.reporter.send_keep_alive(cls, "tearDownClass")
cls.quit()
cls.file_handler.close()
"--- tearDown() for %s.%s(%s) called ---"
% (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
)
+ if not hasattr(self, "vpp"):
+ return
try:
if not self.vpp_dead:
vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
self.logger.info("Moving %s to %s\n" % (tmp_api_trace, vpp_api_trace_log))
- os.rename(tmp_api_trace, vpp_api_trace_log)
+ shutil.move(tmp_api_trace, vpp_api_trace_log)
except VppTransportSocketIOError:
self.logger.debug(
"VppTransportSocketIOError: Vpp dead. Cannot log show commands."
def setUp(self):
"""Clear trace before running each test"""
super(VppTestCase, self).setUp()
+ if not hasattr(self, "vpp"):
+ return
self.reporter.send_keep_alive(self)
if self.vpp_dead:
raise VppDiedError(
@classmethod
def pg_start(cls, trace=True):
"""Enable the PG, wait till it is done, then clean up"""
- for (intf, worker) in cls._old_pcaps:
+ for intf, worker in cls._old_pcaps:
intf.handle_old_pcap_file(intf.get_in_path(worker), intf.in_history_counter)
cls._old_pcaps = []
if trace:
@classmethod
def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
+ if not hasattr(cls, "vpp"):
+ cls.pg_interfaces = []
+ return cls.pg_interfaces
pgmode = VppEnum.vl_api_pg_interface_mode_t
return cls.create_pg_interfaces_internal(
interfaces, gso, gso_size, pgmode.PG_API_MODE_IP4
@classmethod
def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
+ if not hasattr(cls, "vpp"):
+ cls.pg_interfaces = []
+ return cls.pg_interfaces
pgmode = VppEnum.vl_api_pg_interface_mode_t
return cls.create_pg_interfaces_internal(
interfaces, gso, gso_size, pgmode.PG_API_MODE_IP6
@classmethod
def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
+ if not hasattr(cls, "vpp"):
+ cls.pg_interfaces = []
+ return cls.pg_interfaces
pgmode = VppEnum.vl_api_pg_interface_mode_t
return cls.create_pg_interfaces_internal(
interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
@classmethod
def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
+ if not hasattr(cls, "vpp"):
+ cls.pg_interfaces = []
+ return cls.pg_interfaces
pgmode = VppEnum.vl_api_pg_interface_mode_t
return cls.create_pg_interfaces_internal(
interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
:param count: number of interfaces created.
:returns: List of created interfaces.
"""
+ if not hasattr(cls, "vpp"):
+ cls.lo_interfaces = []
+ return cls.lo_interfaces
result = [VppLoInterface(cls) for i in range(count)]
for intf in result:
setattr(cls, intf.name, intf)
:param count: number of interfaces created.
:returns: List of created interfaces.
"""
+ if not hasattr(cls, "vpp"):
+ cls.bvi_interfaces = []
+ return cls.bvi_interfaces
result = [VppBviInterface(cls) for i in range(count)]
for intf in result:
setattr(cls, intf.name, intf)
if 0 == len(checksums):
return
temp = temp.__class__(scapy.compat.raw(temp))
- for layer, cf in checksums:
+ for layer, cf in reversed(checksums):
calc_sum = getattr(temp[layer], cf)
self.assert_equal(
getattr(received[layer], cf),
)
def assert_checksum_valid(
- self, received_packet, layer, field_name="chksum", ignore_zero_checksum=False
+ self,
+ received_packet,
+ layer,
+ checksum_field_names=["chksum", "cksum"],
+ ignore_zero_checksum=False,
):
"""Check checksum of received packet on given layer"""
+ layer_copy = received_packet[layer].copy()
+ layer_copy.remove_payload()
+ field_name = None
+ for f in checksum_field_names:
+ if hasattr(layer_copy, f):
+ field_name = f
+ break
+ if field_name is None:
+ raise Exception(
+ f"Layer `{layer}` has none of checksum fields: `{checksum_field_names}`."
+ )
received_packet_checksum = getattr(received_packet[layer], field_name)
if ignore_zero_checksum and 0 == received_packet_checksum:
return
self.assert_equal(
received_packet_checksum,
getattr(recalculated[layer], field_name),
- "packet checksum on layer: %s" % layer,
+ f"packet checksum (field: {field_name}) on layer: %s" % layer,
)
def assert_ip_checksum_valid(self, received_packet, ignore_zero_checksum=False):
def assert_icmpv6_checksum_valid(self, pkt):
if pkt.haslayer(ICMPv6DestUnreach):
- self.assert_checksum_valid(pkt, "ICMPv6DestUnreach", "cksum")
+ self.assert_checksum_valid(pkt, "ICMPv6DestUnreach")
self.assert_embedded_icmp_checksum_valid(pkt)
if pkt.haslayer(ICMPv6EchoRequest):
- self.assert_checksum_valid(pkt, "ICMPv6EchoRequest", "cksum")
+ self.assert_checksum_valid(pkt, "ICMPv6EchoRequest")
if pkt.haslayer(ICMPv6EchoReply):
- self.assert_checksum_valid(pkt, "ICMPv6EchoReply", "cksum")
+ self.assert_checksum_valid(pkt, "ICMPv6EchoReply")
def get_counter(self, counter):
if counter.startswith("/"):
@classmethod
def sleep(cls, timeout, remark=None):
-
# /* Allow sleep(0) to maintain win32 semantics, and as decreed
# * by Guido, only the main thread can be interrupted.
# */
f"{stats_snapshot[cntr][:, sw_if_index].sum()}, "
f"expected diff: {diff})",
)
- except IndexError:
+ except IndexError as e:
# if diff is 0, then this most probably a case where
# test declares multiple interfaces but traffic hasn't
# passed through this one yet - which means the counter
# value is 0 and can be ignored
if 0 != diff:
- raise
+ raise Exception(
+ f"Couldn't sum counter: {cntr} on sw_if_index: {sw_if_index}"
+ ) from e
def send_and_assert_no_replies(
self, intf, pkts, remark="", timeout=None, stats_diff=None, trace=True, msg=None
self.stream = stream
self.descriptions = descriptions
self.verbosity = verbosity
+ self.result_code = TestResultCode.TEST_RUN
self.result_string = None
self.runner = runner
self.printed = []
:param test:
"""
- if self.current_test_case_info:
- self.current_test_case_info.logger.debug(
- "--- addSuccess() %s.%s(%s) called"
- % (test.__class__.__name__, test._testMethodName, test._testMethodDoc)
- )
+ self.log_result("addSuccess", test)
unittest.TestResult.addSuccess(self, test)
self.result_string = colorize("OK", GREEN)
-
- self.send_result_through_pipe(test, PASS)
+ self.result_code = TestResultCode.PASS
+ self.send_result_through_pipe(test, self.result_code)
+
+ def addExpectedFailure(self, test, err):
+ self.log_result("addExpectedFailure", test, err)
+ super().addExpectedFailure(test, err)
+ self.result_string = colorize("FAIL", GREEN)
+ self.result_code = TestResultCode.EXPECTED_FAIL
+ self.send_result_through_pipe(test, self.result_code)
+
+ def addUnexpectedSuccess(self, test):
+ self.log_result("addUnexpectedSuccess", test)
+ super().addUnexpectedSuccess(test)
+ self.result_string = colorize("OK", RED)
+ self.result_code = TestResultCode.UNEXPECTED_PASS
+ self.send_result_through_pipe(test, self.result_code)
def addSkip(self, test, reason):
"""
:param reason:
"""
- if self.current_test_case_info:
- self.current_test_case_info.logger.debug(
- "--- addSkip() %s.%s(%s) called, reason is %s"
- % (
- test.__class__.__name__,
- test._testMethodName,
- test._testMethodDoc,
- reason,
- )
- )
+ self.log_result("addSkip", test, reason=reason)
unittest.TestResult.addSkip(self, test, reason)
self.result_string = colorize("SKIP", YELLOW)
if reason == "not enough cpus":
- self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE)
+ self.result_code = TestResultCode.SKIP_CPU_SHORTAGE
else:
- self.send_result_through_pipe(test, SKIP)
+ self.result_code = TestResultCode.SKIP
+ self.send_result_through_pipe(test, self.result_code)
def symlink_failed(self):
if self.current_test_case_info:
if pipe:
pipe.send((test.id(), result))
- def log_error(self, test, err, fn_name):
+ def log_result(self, fn, test, err=None, reason=None):
if self.current_test_case_info:
if isinstance(test, unittest.suite._ErrorHolder):
test_name = test.description
test._testMethodName,
test._testMethodDoc,
)
+ extra_msg = ""
+ if err:
+ extra_msg += f", error is {err}"
+ if reason:
+ extra_msg += f", reason is {reason}"
self.current_test_case_info.logger.debug(
- "--- %s() %s called, err is %s" % (fn_name, test_name, err)
- )
- self.current_test_case_info.logger.debug(
- "formatted exception is:\n%s" % "".join(format_exception(*err))
+ f"--- {fn}() {test_name} called{extra_msg}"
)
+ if err:
+ self.current_test_case_info.logger.debug(
+ "formatted exception is:\n%s" % "".join(format_exception(*err))
+ )
- def add_error(self, test, err, unittest_fn, error_type):
- if error_type == FAIL:
- self.log_error(test, err, "addFailure")
+ def add_error(self, test, err, unittest_fn, result_code):
+ self.result_code = result_code
+ if result_code == TestResultCode.FAIL:
+ self.log_result("addFailure", test, err=err)
error_type_str = colorize("FAIL", RED)
- elif error_type == ERROR:
- self.log_error(test, err, "addError")
+ elif result_code == TestResultCode.ERROR:
+ self.log_result("addError", test, err=err)
error_type_str = colorize("ERROR", RED)
else:
- raise Exception(
- "Error type %s cannot be used to record an "
- "error or a failure" % error_type
- )
+ raise Exception(f"Unexpected result code {result_code}")
unittest_fn(self, test, err)
if self.current_test_case_info:
else:
self.result_string = "%s [no temp dir]" % error_type_str
- self.send_result_through_pipe(test, error_type)
+ self.send_result_through_pipe(test, result_code)
def addFailure(self, test, err):
"""
:param err: error message
"""
- self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
+ self.add_error(test, err, unittest.TestResult.addFailure, TestResultCode.FAIL)
def addError(self, test, err):
"""
:param err: error message
"""
- self.add_error(test, err, unittest.TestResult.addError, ERROR)
+ self.add_error(test, err, unittest.TestResult.addError, TestResultCode.ERROR)
def getDescription(self, test):
"""
test_title = colorize(f"FIXME with ASAN: {test_title}", RED)
test.skip_fixme_asan()
+ if is_distro_ubuntu2204 == True and test.has_tag(
+ TestCaseTag.FIXME_UBUNTU2204
+ ):
+ test_title = colorize(f"FIXME on Ubuntu-22.04: {test_title}", RED)
+ test.skip_fixme_ubuntu2204()
+
+ if is_distro_debian11 == True and test.has_tag(TestCaseTag.FIXME_DEBIAN11):
+ test_title = colorize(f"FIXME on Debian-11: {test_title}", RED)
+ test.skip_fixme_debian11()
+
+ if "debug" in config.vpp_tag and test.has_tag(TestCaseTag.FIXME_VPP_DEBUG):
+ test_title = colorize(f"FIXME on VPP Debug: {test_title}", RED)
+ test.skip_fixme_vpp_debug()
+
if hasattr(test, "vpp_worker_count"):
if test.vpp_worker_count == 0:
test_title += " [main thread only]"
"""
unittest.TestResult.stopTest(self, test)
+ result_code_to_suffix = {
+ TestResultCode.PASS: "",
+ TestResultCode.FAIL: "",
+ TestResultCode.ERROR: "",
+ TestResultCode.SKIP: "",
+ TestResultCode.TEST_RUN: "",
+ TestResultCode.SKIP_CPU_SHORTAGE: "",
+ TestResultCode.EXPECTED_FAIL: " [EXPECTED FAIL]",
+ TestResultCode.UNEXPECTED_PASS: " [UNEXPECTED PASS]",
+ }
+
if self.verbosity > 0:
self.stream.writeln(single_line_delim)
self.stream.writeln(
- "%-73s%s" % (self.getDescription(test), self.result_string)
+ "%-72s%s%s"
+ % (
+ self.getDescription(test),
+ self.result_string,
+ result_code_to_suffix[self.result_code],
+ )
)
self.stream.writeln(single_line_delim)
else:
self.stream.writeln(
- "%-68s %4.2f %s"
+ "%-67s %4.2f %s%s"
% (
self.getDescription(test),
time.time() - self.start_test,
self.result_string,
+ result_code_to_suffix[self.result_code],
)
)
- self.send_result_through_pipe(test, TEST_RUN)
+ self.send_result_through_pipe(test, TestResultCode.TEST_RUN)
def printErrors(self):
"""