from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
from scapy.layers.inet6 import ICMPv6EchoReply
+from vpp_running import use_running
logger = logging.getLogger(__name__)
def pump_output(testclass):
"""pump output from vpp stdout/stderr to proper queues"""
+ if not hasattr(testclass, "vpp"):
+ return
stdout_fragment = ""
stderr_fragment = ""
while not testclass.pump_thread_stop_flag.is_set():
is_distro_ubuntu2204 = _is_distro_ubuntu2204()
+def _is_distro_debian11():
+ with open("/etc/os-release") as f:
+ for line in f.readlines():
+ if "bullseye" in line:
+ return True
+ return False
+
+
+is_distro_debian11 = _is_distro_debian11()
+
+
class KeepAliveReporter(object):
"""
Singleton object which reports test start to parent process
"""
Write current test tmpdir & desc to keep-alive pipe to signal liveness
"""
- if self.pipe is None:
+ if not hasattr(test, "vpp") or self.pipe is None:
# if not running forked..
return
FIXME_ASAN = 3
# marks suites broken on Ubuntu-22.04
FIXME_UBUNTU2204 = 4
+ # marks suites broken on Debian-11
+ FIXME_DEBIAN11 = 5
+ # marks suites broken on debug vpp image
+ FIXME_VPP_DEBUG = 6
def create_tag_decorator(e):
tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
tag_fixme_ubuntu2204 = create_tag_decorator(TestCaseTag.FIXME_UBUNTU2204)
+tag_fixme_debian11 = create_tag_decorator(TestCaseTag.FIXME_DEBIAN11)
+tag_fixme_vpp_debug = create_tag_decorator(TestCaseTag.FIXME_VPP_DEBUG)
class DummyVpp:
cls.cpus = cpus
+@use_running
class VppTestCase(CPUInterface, unittest.TestCase):
"""This subclass is a base class for VPP test cases that are implemented as
classes. It provides methods to create and run test case.
"""
extra_vpp_statseg_config = ""
- extra_vpp_punt_config = []
+ extra_vpp_config = []
extra_vpp_plugin_config = []
logger = null_logger
vapi_response_timeout = 5
if cls.has_tag(TestCaseTag.FIXME_UBUNTU2204):
cls = unittest.skip("Skipping @tag_fixme_ubuntu2204 tests")(cls)
+ @classmethod
+ def skip_fixme_debian11(cls):
+ """if distro is Debian-11 and @tag_fixme_debian11 mark for skip"""
+ if cls.has_tag(TestCaseTag.FIXME_DEBIAN11):
+ cls = unittest.skip("Skipping @tag_fixme_debian11 tests")(cls)
+
+ @classmethod
+ def skip_fixme_vpp_debug(cls):
+ cls = unittest.skip("Skipping @tag_fixme_vpp_debug tests")(cls)
+
@classmethod
def instance(cls):
"""Return the instance of this testcase"""
]
)
- if cls.extra_vpp_punt_config is not None:
- cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
+ if cls.extra_vpp_config is not None:
+ cls.vpp_cmdline.extend(cls.extra_vpp_config)
if not cls.debug_attach:
cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
@classmethod
def run_vpp(cls):
+ if (
+ is_distro_ubuntu2204 == True and cls.has_tag(TestCaseTag.FIXME_UBUNTU2204)
+ ) or (is_distro_debian11 == True and cls.has_tag(TestCaseTag.FIXME_DEBIAN11)):
+ return
cls.logger.debug(f"Assigned cpus: {cls.cpus}")
cmdline = cls.vpp_cmdline
cls.attach_vpp()
else:
cls.run_vpp()
+ if not hasattr(cls, "vpp"):
+ return
cls.reporter.send_keep_alive(cls, "setUpClass")
VppTestResult.current_test_case_info = TestCaseInfo(
cls.logger, cls.tempdir, cls.vpp.pid, config.vpp
)
cls.vpp_stdout_deque = deque()
cls.vpp_stderr_deque = deque()
- if not cls.debug_attach:
+ # Pump thread in a non-debug-attached & not running-vpp
+ if not cls.debug_attach and not hasattr(cls, "running_vpp"):
cls.pump_thread_stop_flag = Event()
cls.pump_thread_wakeup_pipe = os.pipe()
cls.pump_thread = Thread(target=pump_output, args=(cls,))
Disconnect vpp-api, kill vpp and cleanup shared memory files
"""
cls._debug_quit()
+ if hasattr(cls, "running_vpp"):
+ cls.vpp.quit_vpp()
# first signal that we want to stop the pump thread, then wake it up
if hasattr(cls, "pump_thread_stop_flag"):
cls.vpp.kill()
outs, errs = cls.vpp.communicate()
cls.logger.debug("Deleting class vpp attribute on %s", cls.__name__)
- if not cls.debug_attach:
+ if not cls.debug_attach and not hasattr(cls, "running_vpp"):
cls.vpp.stdout.close()
cls.vpp.stderr.close()
- del cls.vpp
+ # If vpp is a dynamic attribute set by the func use_running,
+ # deletion will result in an AttributeError that we can
+ # safetly pass.
+ try:
+ del cls.vpp
+ except AttributeError:
+ pass
if cls.vpp_startup_failed:
stdout_log = cls.logger.info
def tearDownClass(cls):
"""Perform final cleanup after running all tests in this test-case"""
cls.logger.debug("--- tearDownClass() for %s called ---" % cls.__name__)
+ if not hasattr(cls, "vpp"):
+ return
cls.reporter.send_keep_alive(cls, "tearDownClass")
cls.quit()
cls.file_handler.close()
"--- tearDown() for %s.%s(%s) called ---"
% (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
)
+ if not hasattr(self, "vpp"):
+ return
try:
if not self.vpp_dead:
vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
self.logger.info("Moving %s to %s\n" % (tmp_api_trace, vpp_api_trace_log))
- os.rename(tmp_api_trace, vpp_api_trace_log)
+ shutil.move(tmp_api_trace, vpp_api_trace_log)
except VppTransportSocketIOError:
self.logger.debug(
"VppTransportSocketIOError: Vpp dead. Cannot log show commands."
def setUp(self):
"""Clear trace before running each test"""
super(VppTestCase, self).setUp()
+ if not hasattr(self, "vpp"):
+ return
self.reporter.send_keep_alive(self)
if self.vpp_dead:
raise VppDiedError(
@classmethod
def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
+ if not hasattr(cls, "vpp"):
+ cls.pg_interfaces = []
+ return cls.pg_interfaces
pgmode = VppEnum.vl_api_pg_interface_mode_t
return cls.create_pg_interfaces_internal(
interfaces, gso, gso_size, pgmode.PG_API_MODE_IP4
@classmethod
def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
+ if not hasattr(cls, "vpp"):
+ cls.pg_interfaces = []
+ return cls.pg_interfaces
pgmode = VppEnum.vl_api_pg_interface_mode_t
return cls.create_pg_interfaces_internal(
interfaces, gso, gso_size, pgmode.PG_API_MODE_IP6
@classmethod
def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
+ if not hasattr(cls, "vpp"):
+ cls.pg_interfaces = []
+ return cls.pg_interfaces
pgmode = VppEnum.vl_api_pg_interface_mode_t
return cls.create_pg_interfaces_internal(
interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
@classmethod
def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
+ if not hasattr(cls, "vpp"):
+ cls.pg_interfaces = []
+ return cls.pg_interfaces
pgmode = VppEnum.vl_api_pg_interface_mode_t
return cls.create_pg_interfaces_internal(
interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
:param count: number of interfaces created.
:returns: List of created interfaces.
"""
+ if not hasattr(cls, "vpp"):
+ cls.lo_interfaces = []
+ return cls.lo_interfaces
result = [VppLoInterface(cls) for i in range(count)]
for intf in result:
setattr(cls, intf.name, intf)
:param count: number of interfaces created.
:returns: List of created interfaces.
"""
+ if not hasattr(cls, "vpp"):
+ cls.bvi_interfaces = []
+ return cls.bvi_interfaces
result = [VppBviInterface(cls) for i in range(count)]
for intf in result:
setattr(cls, intf.name, intf)
if 0 == len(checksums):
return
temp = temp.__class__(scapy.compat.raw(temp))
- for layer, cf in checksums:
+ for layer, cf in reversed(checksums):
calc_sum = getattr(temp[layer], cf)
self.assert_equal(
getattr(received[layer], cf),
)
def assert_checksum_valid(
- self, received_packet, layer, field_name="chksum", ignore_zero_checksum=False
+ self,
+ received_packet,
+ layer,
+ checksum_field_names=["chksum", "cksum"],
+ ignore_zero_checksum=False,
):
"""Check checksum of received packet on given layer"""
+ layer_copy = received_packet[layer].copy()
+ layer_copy.remove_payload()
+ field_name = None
+ for f in checksum_field_names:
+ if hasattr(layer_copy, f):
+ field_name = f
+ break
+ if field_name is None:
+ raise Exception(
+ f"Layer `{layer}` has none of checksum fields: `{checksum_field_names}`."
+ )
received_packet_checksum = getattr(received_packet[layer], field_name)
if ignore_zero_checksum and 0 == received_packet_checksum:
return
self.assert_equal(
received_packet_checksum,
getattr(recalculated[layer], field_name),
- "packet checksum on layer: %s" % layer,
+ f"packet checksum (field: {field_name}) on layer: %s" % layer,
)
def assert_ip_checksum_valid(self, received_packet, ignore_zero_checksum=False):
def assert_icmpv6_checksum_valid(self, pkt):
if pkt.haslayer(ICMPv6DestUnreach):
- self.assert_checksum_valid(pkt, "ICMPv6DestUnreach", "cksum")
+ self.assert_checksum_valid(pkt, "ICMPv6DestUnreach")
self.assert_embedded_icmp_checksum_valid(pkt)
if pkt.haslayer(ICMPv6EchoRequest):
- self.assert_checksum_valid(pkt, "ICMPv6EchoRequest", "cksum")
+ self.assert_checksum_valid(pkt, "ICMPv6EchoRequest")
if pkt.haslayer(ICMPv6EchoReply):
- self.assert_checksum_valid(pkt, "ICMPv6EchoReply", "cksum")
+ self.assert_checksum_valid(pkt, "ICMPv6EchoReply")
def get_counter(self, counter):
if counter.startswith("/"):
test_title = colorize(f"FIXME on Ubuntu-22.04: {test_title}", RED)
test.skip_fixme_ubuntu2204()
+ if is_distro_debian11 == True and test.has_tag(TestCaseTag.FIXME_DEBIAN11):
+ test_title = colorize(f"FIXME on Debian-11: {test_title}", RED)
+ test.skip_fixme_debian11()
+
+ if "debug" in config.vpp_tag and test.has_tag(TestCaseTag.FIXME_VPP_DEBUG):
+ test_title = colorize(f"FIXME on VPP Debug: {test_title}", RED)
+ test.skip_fixme_vpp_debug()
+
if hasattr(test, "vpp_worker_count"):
if test.vpp_worker_count == 0:
test_title += " [main thread only]"