from logging import FileHandler, DEBUG, Formatter
from scapy.packet import Raw
from hook import StepHook, PollHook, VppDiedError
-from vpp_config import VppTestCaseVppConfig
-from vpp_interface import VppInterface
-from vpp_sub_interface import VppSubInterface
from vpp_pg_interface import VppPGInterface
+from vpp_sub_interface import VppSubInterface
from vpp_lo_interface import VppLoInterface
from vpp_papi_provider import VppPapiProvider
from vpp_papi.vpp_stats import VPPStats
classes. It provides methods to create and run test case.
"""
- CLI_LISTEN_DEFAULT = 'localhost:5002'
- config = VppTestCaseVppConfig()
+ extra_vpp_punt_config = []
+ extra_vpp_plugin_config = []
@property
def packet_infos(self):
return random.choice(tuple(min_usage_set))
- @classmethod
- def print_header(cls):
- if not hasattr(cls, '_header_printed'):
- print(double_line_delim)
- print(colorize(getdoc(cls).splitlines()[0], GREEN))
- print(double_line_delim)
- cls._header_printed = True
-
@classmethod
def setUpConstants(cls):
""" Set-up the test case class based on environment variables """
plugin_path = cls.plugin_path
elif cls.extern_plugin_path is not None:
plugin_path = cls.extern_plugin_path
-
+ debug_cli = ""
if cls.step or cls.debug_gdb or cls.debug_gdbserver:
- cls.config.add('unix', 'cli-listen', cls.CLI_LISTEN_DEFAULT)
-
+ debug_cli = "cli-listen localhost:5002"
coredump_size = None
size = os.getenv("COREDUMP_SIZE")
- cls.config.add('unix', 'coredump-size',
- size if size is not None else 'unlimited')
-
- cls.config.add('unix', 'runtime-dir', cls.tempdir)
- cls.config.add('api-segment', 'prefix', cls.shm_prefix)
- cls.config.add('cpu', 'main-core', str(cls.get_least_used_cpu()))
- cls.config.add('statseg', 'socket-name', cls.stats_sock)
-
+ if size is not None:
+ coredump_size = "coredump-size %s" % size
+ if coredump_size is None:
+ coredump_size = "coredump-size unlimited"
+
+ cpu_core_number = cls.get_least_used_cpu()
+
+ cls.vpp_cmdline = [cls.vpp_bin, "unix",
+ "{", "nodaemon", debug_cli, "full-coredump",
+ coredump_size, "runtime-dir", cls.tempdir, "}",
+ "api-trace", "{", "on", "}", "api-segment", "{",
+ "prefix", cls.shm_prefix, "}", "cpu", "{",
+ "main-core", str(cpu_core_number), "}", "statseg",
+ "{", "socket-name", cls.stats_sock, "}", "plugins",
+ "{", "plugin", "dpdk_plugin.so", "{", "disable",
+ "}", "plugin", "unittest_plugin.so", "{", "enable",
+ "}"] + cls.extra_vpp_plugin_config + ["}", ]
+ if cls.extra_vpp_punt_config is not None:
+ cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
if plugin_path is not None:
- cls.config.add('plugins', 'path', plugin_path)
- cls.config.add_plugin('dpdk_plugin.so', 'disable')
- cls.config.add_plugin('unittest_plugin.so', 'enable')
-
- cls.vpp_cmdline = [cls.vpp_bin] + cls.config.shlex()
+ cls.vpp_cmdline.extend(["plugin_path", plugin_path])
cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
stderr=subprocess.PIPE,
bufsize=1)
except subprocess.CalledProcessError as e:
- cls.logger.critical("Couldn't start vpp: %s" % e)
+ cls.logger.critical("Subprocess returned with non-0 return code: ("
+ "%s)", e.returncode)
+ raise
+ except OSError as e:
+ cls.logger.critical("Subprocess returned with OS error: "
+ "(%s) %s", e.errno, e.strerror)
+ raise
+ except Exception as e:
+ cls.logger.exception("Subprocess returned unexpected from "
+ "%s:", cmdline)
raise
cls.wait_for_enter()
Perform class setup before running the testcase
Remove shared memory files, start vpp and connect the vpp-api
"""
+ super(VppTestCase, cls).setUpClass()
gc.collect() # run garbage collection first
random.seed()
- cls.print_header()
cls.logger = get_logger(cls.__name__)
if hasattr(cls, 'parallel_handler'):
cls.logger.addHandler(cls.parallel_handler)
stderr_log(single_line_delim)
stderr_log('VPP output to stderr while running %s:', cls.__name__)
stderr_log(single_line_delim)
- vpp_output = "".join(str(cls.vpp_stderr_deque))
+ vpp_output = "".join(cls.vpp_stderr_deque)
with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
f.write(vpp_output)
stderr_log('\n%s', vpp_output)
def tearDown(self):
""" Show various debug prints after each test """
+ super(VppTestCase, self).tearDown()
self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
(self.__class__.__name__, self._testMethodName,
self._testMethodDoc))
if not self.vpp_dead:
- self.logger.debug(self.vapi.cli("show trace"))
+ self.logger.debug(self.vapi.cli("show trace max 1000"))
self.logger.info(self.vapi.ppcli("show interface"))
self.logger.info(self.vapi.ppcli("show hardware"))
self.logger.info(self.statistics.set_errors_str())
def setUp(self):
""" Clear trace before running each test"""
+ super(VppTestCase, self).setUp()
self.reporter.send_keep_alive(self)
self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
(self.__class__.__name__, self._testMethodName,
cls.logger.debug("Removing zombie capture %s" % cap_name)
cls.vapi.cli('packet-generator delete %s' % cap_name)
- cls.vapi.cli("trace add pg-input 50") # 50 is maximum
+ cls.vapi.cli("trace add pg-input 1000")
cls.vapi.cli('packet-generator enable')
cls._zombie_captures = cls._captures
cls._captures = []
for cf in checksum_fields:
if hasattr(layer, cf):
if ignore_zero_udp_checksums and \
- 0 == getattr(layer, cf) and \
- layer.name in udp_layers:
+ 0 == getattr(layer, cf) and \
+ layer.name in udp_layers:
continue
delattr(layer, cf)
checksums.append((counter, cf))
"Finished sleep (%s) - slept %es (wanted %es)",
remark, after - before, timeout)
- def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
+ def pg_send(self, intf, pkts):
self.vapi.cli("clear trace")
intf.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
+
+ def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
+ self.pg_send(intf, pkts)
if not timeout:
timeout = 1
for i in self.pg_interfaces:
i.assert_nothing_captured(remark=remark)
timeout = 0.1
- def send_and_expect(self, input, pkts, output):
- self.vapi.cli("clear trace")
- input.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- if isinstance(object, (list,)):
- rx = []
- for o in output:
- rx.append(output.get_capture(len(pkts)))
- else:
- rx = output.get_capture(len(pkts))
+ def send_and_expect(self, intf, pkts, output):
+ self.pg_send(intf, pkts)
+ rx = output.get_capture(len(pkts))
return rx
- def send_and_expect_only(self, input, pkts, output, timeout=None):
- self.vapi.cli("clear trace")
- input.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- if isinstance(object, (list,)):
- outputs = output
- rx = []
- for o in outputs:
- rx.append(output.get_capture(len(pkts)))
- else:
- rx = output.get_capture(len(pkts))
- outputs = [output]
+ def send_and_expect_only(self, intf, pkts, output, timeout=None):
+ self.pg_send(intf, pkts)
+ rx = output.get_capture(len(pkts))
+ outputs = [output]
if not timeout:
timeout = 1
for i in self.pg_interfaces:
core_crash_test_cases_info = set()
current_test_case_info = None
- def __init__(self, stream, descriptions, verbosity, runner):
+ def __init__(self, stream=None, descriptions=None, verbosity=None,
+ runner=None):
"""
:param stream File descriptor to store where to report test results.
Set to the standard error stream by default.
test case descriptions.
:param verbosity Integer variable to store required verbosity level.
"""
- unittest.TestResult.__init__(self, stream, descriptions, verbosity)
+ super(VppTestResult, self).__init__(stream, descriptions, verbosity)
self.stream = stream
self.descriptions = descriptions
self.verbosity = verbosity
:param test:
"""
- test.print_header()
+
+ def print_header(test):
+ if not hasattr(test.__class__, '_header_printed'):
+ print(double_line_delim)
+ print(colorize(getdoc(test).splitlines()[0], GREEN))
+ print(double_line_delim)
+ test.__class__._header_printed = True
+
+ print_header(test)
unittest.TestResult.startTest(self, test)
if self.verbosity > 0:
def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
result_pipe=None, failfast=False, buffer=False,
- resultclass=None, print_summary=True):
+ resultclass=None, print_summary=True, **kwargs):
# ignore stream setting here, use hard-coded stdout to be in sync
# with prints from VppTestCase methods ...
super(VppTestRunner, self).__init__(sys.stdout, descriptions,
verbosity, failfast, buffer,
- resultclass)
+ resultclass, **kwargs)
KeepAliveReporter.pipe = keep_alive_pipe
self.orig_stream = self.stream
self.logger.info(err)
self.logger.info(single_line_delim)
self.result = self.process.returncode
+
+if __name__ == '__main__':
+ pass