import copy
import platform
import shutil
+from pathlib import Path
from collections import deque
from threading import Thread, Event
from inspect import getdoc, isclass
from logging import FileHandler, DEBUG, Formatter
from enum import Enum
from abc import ABC, abstractmethod
-from struct import pack, unpack
-from config import config, available_cpus, num_cpus, max_vpp_cpus
+from config import config, max_vpp_cpus
import hook as hookmodule
-from vpp_pg_interface import VppPGInterface
-from vpp_sub_interface import VppSubInterface
from vpp_lo_interface import VppLoInterface
-from vpp_bvi_interface import VppBviInterface
from vpp_papi_provider import VppPapiProvider
-from vpp_papi import VppEnum
import vpp_papi
from vpp_papi.vpp_stats import VPPStats
from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
colorize,
)
from vpp_object import VppObjectRegistry
-from util import ppp, is_core_present
+from util import is_core_present
from test_result_code import TestResultCode
logger = logging.getLogger(__name__)
# Set up an empty logger for the testcase that can be overridden as necessary
-null_logger = logging.getLogger("VppTestCase")
+null_logger = logging.getLogger("VppAsfTestCase")
null_logger.addHandler(logging.NullHandler())
super(VppDiedError, self).__init__(msg)
-class _PacketInfo(object):
- """Private class to create packet info object.
-
- Help process information about the next packet.
- Set variables to default values.
- """
-
- #: Store the index of the packet.
- index = -1
- #: Store the index of the source packet generator interface of the packet.
- src = -1
- #: Store the index of the destination packet generator interface
- #: of the packet.
- dst = -1
- #: Store expected ip version
- ip = -1
- #: Store expected upper protocol
- proto = -1
- #: Store the copy of the former packet.
- data = None
-
- def __eq__(self, other):
- index = self.index == other.index
- src = self.src == other.src
- dst = self.dst == other.dst
- data = self.data == other.data
- return index and src and dst and data
-
-
def pump_output(testclass):
"""pump output from vpp stdout/stderr to proper queues"""
stdout_fragment = ""
is_platform_aarch64 = _is_platform_aarch64()
+def _is_distro_ubuntu2204():
+ with open("/etc/os-release") as f:
+ for line in f.readlines():
+ if "jammy" in line:
+ return True
+ return False
+
+
+is_distro_ubuntu2204 = _is_distro_ubuntu2204()
+
+
+def _is_distro_debian11():
+ with open("/etc/os-release") as f:
+ for line in f.readlines():
+ if "bullseye" in line:
+ return True
+ return False
+
+
+is_distro_debian11 = _is_distro_debian11()
+
+
+def _is_distro_ubuntu2204():
+ with open("/etc/os-release") as f:
+ for line in f.readlines():
+ if "jammy" in line:
+ return True
+ return False
+
+
class KeepAliveReporter(object):
"""
Singleton object which reports test start to parent process
FIXME_VPP_WORKERS = 2
# marks the suites broken when ASan is enabled
FIXME_ASAN = 3
+ # marks suites broken on Ubuntu-22.04
+ FIXME_UBUNTU2204 = 4
+ # marks suites broken on Debian-11
+ FIXME_DEBIAN11 = 5
+ # marks suites broken on debug vpp image
+ FIXME_VPP_DEBUG = 6
def create_tag_decorator(e):
tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
+tag_fixme_ubuntu2204 = create_tag_decorator(TestCaseTag.FIXME_UBUNTU2204)
+tag_fixme_debian11 = create_tag_decorator(TestCaseTag.FIXME_DEBIAN11)
+tag_fixme_vpp_debug = create_tag_decorator(TestCaseTag.FIXME_VPP_DEBUG)
class DummyVpp:
cls.cpus = cpus
-class VppTestCase(CPUInterface, unittest.TestCase):
+class VppAsfTestCase(CPUInterface, unittest.TestCase):
"""This subclass is a base class for VPP test cases that are implemented as
classes. It provides methods to create and run test case.
"""
vapi_response_timeout = 5
remove_configured_vpp_objects_on_tear_down = True
- @property
- def packet_infos(self):
- """List of packet infos"""
- return self._packet_infos
-
- @classmethod
- def get_packet_count_for_if_idx(cls, dst_if_index):
- """Get the number of packet info for specified destination if index"""
- if dst_if_index in cls._packet_count_for_dst_if_idx:
- return cls._packet_count_for_dst_if_idx[dst_if_index]
- else:
- return 0
-
@classmethod
def has_tag(cls, tag):
"""if the test case has a given tag - return true"""
if cls.debug_attach:
tmpdir = f"{config.tmp_dir}/unittest-attach-gdb"
else:
- tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}"
+ tmpdir = f"{config.tmp_dir}/{get_testcase_dirname(cls.__name__)}"
if config.wipe_tmp_dir:
shutil.rmtree(tmpdir, ignore_errors=True)
os.mkdir(tmpdir)
cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
return
- logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}"
+ logdir = f"{config.log_dir}/{get_testcase_dirname(cls.__name__)}"
if config.wipe_tmp_dir:
shutil.rmtree(logdir, ignore_errors=True)
os.mkdir(logdir)
Perform class setup before running the testcase
Remove shared memory files, start vpp and connect the vpp-api
"""
- super(VppTestCase, cls).setUpClass()
+ super(VppAsfTestCase, cls).setUpClass()
cls.logger = get_logger(cls.__name__)
+ cls.logger.debug(f"--- START setUpClass() {cls.__name__} ---")
random.seed(config.rnd_seed)
if hasattr(cls, "parallel_handler"):
cls.logger.addHandler(cls.parallel_handler)
)
cls.logger.debug("Random seed is %s", config.rnd_seed)
cls.setUpConstants()
- cls.reset_packet_infos()
- cls._pcaps = []
- cls._old_pcaps = []
cls.verbose = 0
cls.vpp_dead = False
cls.registry = VppObjectRegistry()
try:
hook.poll_vpp()
except VppDiedError:
+ cls.wait_for_coredump()
cls.vpp_startup_failed = True
cls.logger.critical(
"VPP died shortly after startup, check the"
cls.logger.debug("Exception connecting to VPP: %s" % e)
cls.quit()
raise e
+ cls.logger.debug(f"--- END setUpClass() {cls.__name__} ---")
@classmethod
def _debug_quit(cls):
@classmethod
def tearDownClass(cls):
"""Perform final cleanup after running all tests in this test-case"""
- cls.logger.debug("--- tearDownClass() for %s called ---" % cls.__name__)
+ cls.logger.debug(f"--- START tearDownClass() {cls.__name__} ---")
cls.reporter.send_keep_alive(cls, "tearDownClass")
cls.quit()
cls.file_handler.close()
- cls.reset_packet_infos()
if config.debug_framework:
debug_internal.on_tear_down_class(cls)
+ cls.logger.debug(f"--- END tearDownClass() {cls.__name__} ---")
def show_commands_at_teardown(self):
"""Allow subclass specific teardown logging additions."""
self.logger.info("--- No test specific show commands provided. ---")
+ def unlink_testcase_file(self, path):
+ MAX_ATTEMPTS = 9
+ retries = MAX_ATTEMPTS
+ while retries > 0:
+ retries = retries - 1
+ self.logger.debug(f"Unlinking {path}")
+ try:
+ path.unlink()
+ # Loop until unlink() fails with FileNotFoundError to ensure file is removed
+ except FileNotFoundError:
+ break
+ except OSError:
+ self.logger.debug(f"OSError: unlinking {path}")
+ self.sleep(0.25, f"{retries} retries left")
+ if retries == 0 and os.path.isfile(path):
+ self.logger.error(
+ f"Unable to delete testcase file in {MAX_ATTEMPTS} attempts: {path}"
+ )
+ raise OSError
+
def tearDown(self):
"""Show various debug prints after each test"""
self.logger.debug(
- "--- tearDown() for %s.%s(%s) called ---"
- % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
+ f"--- START tearDown() {self.__class__.__name__}.{self._testMethodName}({self._testMethodDoc}) ---"
)
try:
self.vpp_dead = True
else:
self.registry.unregister_all(self.logger)
+ # Remove any leftover pcap files
+ if hasattr(self, "pg_interfaces") and len(self.pg_interfaces) > 0:
+ testcase_dir = os.path.dirname(self.pg_interfaces[0].out_path)
+ for p in Path(testcase_dir).glob("pg*.pcap"):
+ self.unlink_testcase_file(p)
+ self.logger.debug(
+ f"--- END tearDown() {self.__class__.__name__}.{self._testMethodName}('{self._testMethodDoc}') ---"
+ )
def setUp(self):
"""Clear trace before running each test"""
- super(VppTestCase, self).setUp()
+ super(VppAsfTestCase, self).setUp()
+ self.logger.debug(
+ f"--- START setUp() {self.__class__.__name__}.{self._testMethodName}('{self._testMethodDoc}') ---"
+ )
+ # Save testname include in pcap history filenames
+ if hasattr(self, "pg_interfaces"):
+ for i in self.pg_interfaces:
+ i.test_name = self._testMethodName
self.reporter.send_keep_alive(self)
if self.vpp_dead:
+ self.wait_for_coredump()
raise VppDiedError(
rv=None,
testcase=self.__class__.__name__,
# store the test instance inside the test class - so that objects
# holding the class can access instance methods (like assertEqual)
type(self).test_instance = self
-
- @classmethod
- def pg_enable_capture(cls, interfaces=None):
- """
- Enable capture on packet-generator interfaces
-
- :param interfaces: iterable interface indexes (if None,
- use self.pg_interfaces)
-
- """
- if interfaces is None:
- interfaces = cls.pg_interfaces
- for i in interfaces:
- i.enable_capture()
-
- @classmethod
- def register_pcap(cls, intf, worker):
- """Register a pcap in the testclass"""
- # add to the list of captures with current timestamp
- cls._pcaps.append((intf, worker))
+ self.logger.debug(
+ f"--- END setUp() {self.__class__.__name__}.{self._testMethodName}('{self._testMethodDoc}') ---"
+ )
@classmethod
def get_vpp_time(cls):
while cls.get_vpp_time() - start_time < sec:
cls.sleep(0.1)
- @classmethod
- def pg_start(cls, trace=True):
- """Enable the PG, wait till it is done, then clean up"""
- for (intf, worker) in cls._old_pcaps:
- intf.handle_old_pcap_file(intf.get_in_path(worker), intf.in_history_counter)
- cls._old_pcaps = []
- if trace:
- cls.vapi.cli("clear trace")
- cls.vapi.cli("trace add pg-input 1000")
- cls.vapi.cli("packet-generator enable")
- # PG, when starts, runs to completion -
- # so let's avoid a race condition,
- # and wait a little till it's done.
- # Then clean it up - and then be gone.
- deadline = time.time() + 300
- while cls.vapi.cli("show packet-generator").find("Yes") != -1:
- cls.sleep(0.01) # yield
- if time.time() > deadline:
- cls.logger.error("Timeout waiting for pg to stop")
- break
- for intf, worker in cls._pcaps:
- cls.vapi.cli("packet-generator delete %s" % intf.get_cap_name(worker))
- cls._old_pcaps = cls._pcaps
- cls._pcaps = []
-
- @classmethod
- def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0, mode=None):
- """
- Create packet-generator interfaces.
-
- :param interfaces: iterable indexes of the interfaces.
- :returns: List of created interfaces.
-
- """
- result = []
- for i in interfaces:
- intf = VppPGInterface(cls, i, gso, gso_size, mode)
- setattr(cls, intf.name, intf)
- result.append(intf)
- cls.pg_interfaces = result
- return result
-
- @classmethod
- def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
- pgmode = VppEnum.vl_api_pg_interface_mode_t
- return cls.create_pg_interfaces_internal(
- interfaces, gso, gso_size, pgmode.PG_API_MODE_IP4
- )
-
- @classmethod
- def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
- pgmode = VppEnum.vl_api_pg_interface_mode_t
- return cls.create_pg_interfaces_internal(
- interfaces, gso, gso_size, pgmode.PG_API_MODE_IP6
- )
-
- @classmethod
- def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
- pgmode = VppEnum.vl_api_pg_interface_mode_t
- return cls.create_pg_interfaces_internal(
- interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
- )
-
- @classmethod
- def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
- pgmode = VppEnum.vl_api_pg_interface_mode_t
- return cls.create_pg_interfaces_internal(
- interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
- )
-
@classmethod
def create_loopback_interfaces(cls, count):
"""
cls.lo_interfaces = result
return result
- @classmethod
- def create_bvi_interfaces(cls, count):
- """
- Create BVI interfaces.
-
- :param count: number of interfaces created.
- :returns: List of created interfaces.
- """
- result = [VppBviInterface(cls) for i in range(count)]
- for intf in result:
- setattr(cls, intf.name, intf)
- cls.bvi_interfaces = result
- return result
-
- @classmethod
- def reset_packet_infos(cls):
- """Reset the list of packet info objects and packet counts to zero"""
- cls._packet_infos = {}
- cls._packet_count_for_dst_if_idx = {}
-
- @classmethod
- def create_packet_info(cls, src_if, dst_if):
- """
- Create packet info object containing the source and destination indexes
- and add it to the testcase's packet info list
-
- :param VppInterface src_if: source interface
- :param VppInterface dst_if: destination interface
-
- :returns: _PacketInfo object
-
- """
- info = _PacketInfo()
- info.index = len(cls._packet_infos)
- info.src = src_if.sw_if_index
- info.dst = dst_if.sw_if_index
- if isinstance(dst_if, VppSubInterface):
- dst_idx = dst_if.parent.sw_if_index
- else:
- dst_idx = dst_if.sw_if_index
- if dst_idx in cls._packet_count_for_dst_if_idx:
- cls._packet_count_for_dst_if_idx[dst_idx] += 1
- else:
- cls._packet_count_for_dst_if_idx[dst_idx] = 1
- cls._packet_infos[info.index] = info
- return info
-
- @staticmethod
- def info_to_payload(info):
- """
- Convert _PacketInfo object to packet payload
-
- :param info: _PacketInfo object
-
- :returns: string containing serialized data from packet info
- """
-
- # retrieve payload, currently 18 bytes (4 x ints + 1 short)
- return pack("iiiih", info.index, info.src, info.dst, info.ip, info.proto)
-
- def get_next_packet_info(self, info):
- """
- Iterate over the packet info list stored in the testcase
- Start iteration with first element if info is None
- Continue based on index in info if info is specified
-
- :param info: info or None
- :returns: next info in list or None if no more infos
- """
- if info is None:
- next_index = 0
- else:
- next_index = info.index + 1
- if next_index == len(self._packet_infos):
- return None
- else:
- return self._packet_infos[next_index]
-
- def get_next_packet_info_for_interface(self, src_index, info):
- """
- Search the packet info list for the next packet info with same source
- interface index
-
- :param src_index: source interface index to search for
- :param info: packet info - where to start the search
- :returns: packet info or None
-
- """
- while True:
- info = self.get_next_packet_info(info)
- if info is None:
- return None
- if info.src == src_index:
- return info
-
- def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
- """
- Search the packet info list for the next packet info with same source
- and destination interface indexes
-
- :param src_index: source interface index to search for
- :param dst_index: destination interface index to search for
- :param info: packet info - where to start the search
- :returns: packet info or None
-
- """
- while True:
- info = self.get_next_packet_info_for_interface(src_index, info)
- if info is None:
- return None
- if info.dst == dst_index:
- return info
-
def assert_equal(self, real_value, expected_value, name_or_class=None):
if name_or_class is None:
self.assertEqual(real_value, expected_value)
)
self.assertTrue(expected_min <= real_value <= expected_max, msg)
- def assert_ip_checksum_valid(self, received_packet, ignore_zero_checksum=False):
- self.assert_checksum_valid(
- received_packet, "IP", ignore_zero_checksum=ignore_zero_checksum
- )
-
- def assert_tcp_checksum_valid(self, received_packet, ignore_zero_checksum=False):
- self.assert_checksum_valid(
- received_packet, "TCP", ignore_zero_checksum=ignore_zero_checksum
- )
-
- def assert_udp_checksum_valid(self, received_packet, ignore_zero_checksum=True):
- self.assert_checksum_valid(
- received_packet, "UDP", ignore_zero_checksum=ignore_zero_checksum
- )
-
- def assert_icmp_checksum_valid(self, received_packet):
- self.assert_checksum_valid(received_packet, "ICMP")
- self.assert_embedded_icmp_checksum_valid(received_packet)
-
def get_counter(self, counter):
if counter.startswith("/"):
counter_value = self.statistics.get_counter(counter)
c = c[thread][index]
else:
c = sum(x[index] for x in c)
- self.assert_equal(c, expected_value, "counter `%s'" % counter)
-
- def assert_packet_counter_equal(self, counter, expected_value):
- counter_value = self.get_counter(counter)
- self.assert_equal(
- counter_value, expected_value, "packet counter `%s'" % counter
+ self.logger.debug(
+ "validate counter `%s[%s]', expected: %s, real value: %s"
+ % (counter, index, expected_value, c)
)
+ self.assert_equal(c, expected_value, "counter `%s[%s]'" % (counter, index))
def assert_error_counter_equal(self, counter, expected_value):
counter_value = self.statistics[counter].sum()
@classmethod
def sleep(cls, timeout, remark=None):
-
# /* Allow sleep(0) to maintain win32 semantics, and as decreed
# * by Guido, only the main thread can be interrupted.
# */
self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
self.vapi.cli("set clock adjust %s" % timeout)
- def pg_send(self, intf, pkts, worker=None, trace=True):
- intf.add_stream(pkts, worker=worker)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start(trace=trace)
-
def snapshot_stats(self, stats_diff):
"""Return snapshot of interesting stats based on diff dictionary."""
stats_snapshot = {}
f"Couldn't sum counter: {cntr} on sw_if_index: {sw_if_index}"
) from e
- def send_and_assert_no_replies(
- self, intf, pkts, remark="", timeout=None, stats_diff=None, trace=True, msg=None
- ):
- if stats_diff:
- stats_snapshot = self.snapshot_stats(stats_diff)
-
- self.pg_send(intf, pkts)
-
- try:
- if not timeout:
- timeout = 1
- for i in self.pg_interfaces:
- i.assert_nothing_captured(timeout=timeout, remark=remark)
- timeout = 0.1
- finally:
- if trace:
- if msg:
- self.logger.debug(f"send_and_assert_no_replies: {msg}")
- self.logger.debug(self.vapi.cli("show trace"))
-
- if stats_diff:
- self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
-
- def send_and_expect_load_balancing(
- self, input, pkts, outputs, worker=None, trace=True
- ):
- self.pg_send(input, pkts, worker=worker, trace=trace)
- rxs = []
- for oo in outputs:
- rx = oo._get_capture(1)
- self.assertNotEqual(0, len(rx))
- rxs.append(rx)
- if trace:
- self.logger.debug(self.vapi.cli("show trace"))
- return rxs
-
- def send_and_expect_some(self, intf, pkts, output, worker=None, trace=True):
- self.pg_send(intf, pkts, worker=worker, trace=trace)
- rx = output._get_capture(1)
- if trace:
- self.logger.debug(self.vapi.cli("show trace"))
- self.assertTrue(len(rx) > 0)
- self.assertTrue(len(rx) < len(pkts))
- return rx
-
- def send_and_expect_only(self, intf, pkts, output, timeout=None, stats_diff=None):
- if stats_diff:
- stats_snapshot = self.snapshot_stats(stats_diff)
-
- self.pg_send(intf, pkts)
- rx = output.get_capture(len(pkts))
- outputs = [output]
- if not timeout:
- timeout = 1
- for i in self.pg_interfaces:
- if i not in outputs:
- i.assert_nothing_captured(timeout=timeout)
- timeout = 0.1
-
- if stats_diff:
- self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
-
- return rx
-
def get_testcase_doc_name(test):
return getdoc(test.__class__).splitlines()[0]
return str(test)
+def get_failed_testcase_linkname(failed_dir, testcase_dirname):
+ return os.path.join(failed_dir, f"{testcase_dirname}-FAILED")
+
+
+def get_testcase_dirname(testcase_class_name):
+ return f"vpp-unittest-{testcase_class_name}"
+
+
class TestCaseInfo(object):
def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
self.logger = logger
self.runner = runner
self.printed = []
+ def decodePcapFiles(self, test, when_configured=False):
+ if when_configured == False or config.decode_pcaps == True:
+ if hasattr(test, "pg_interfaces") and len(test.pg_interfaces) > 0:
+ testcase_dir = os.path.dirname(test.pg_interfaces[0].out_path)
+ test.pg_interfaces[0].decode_pcap_files(
+ testcase_dir, f"suite{test.__class__.__name__}"
+ )
+ test.pg_interfaces[0].decode_pcap_files(
+ testcase_dir, test._testMethodName
+ )
+
def addSuccess(self, test):
"""
Record a test succeeded result
"""
self.log_result("addSuccess", test)
+ self.decodePcapFiles(test, when_configured=True)
unittest.TestResult.addSuccess(self, test)
self.result_string = colorize("OK", GREEN)
self.result_code = TestResultCode.PASS
def addExpectedFailure(self, test, err):
self.log_result("addExpectedFailure", test, err)
+ self.decodePcapFiles(test)
super().addExpectedFailure(test, err)
self.result_string = colorize("FAIL", GREEN)
self.result_code = TestResultCode.EXPECTED_FAIL
def addUnexpectedSuccess(self, test):
self.log_result("addUnexpectedSuccess", test)
+ self.decodePcapFiles(test, when_configured=True)
super().addUnexpectedSuccess(test)
self.result_string = colorize("OK", RED)
self.result_code = TestResultCode.UNEXPECTED_PASS
if self.current_test_case_info:
try:
failed_dir = config.failed_dir
- link_path = os.path.join(
- failed_dir,
- "%s-FAILED" % os.path.basename(self.current_test_case_info.tempdir),
+ link_path = get_failed_testcase_linkname(
+ failed_dir, os.path.basename(self.current_test_case_info.tempdir)
)
self.current_test_case_info.logger.debug(
error_type_str = colorize("ERROR", RED)
else:
raise Exception(f"Unexpected result code {result_code}")
+ self.decodePcapFiles(test)
unittest_fn(self, test, err)
if self.current_test_case_info:
**kwargs,
):
# ignore stream setting here, use hard-coded stdout to be in sync
- # with prints from VppTestCase methods ...
+ # with prints from VppAsfTestCase methods ...
super(VppTestRunner, self).__init__(
sys.stdout, descriptions, verbosity, failfast, buffer, resultclass, **kwargs
)