#!/usr/bin/env python3
from __future__ import print_function
-import gc
import logging
import sys
import os
import signal
import subprocess
import unittest
-import tempfile
+import re
import time
import faulthandler
import random
import copy
-import psutil
import platform
+import shutil
from collections import deque
from threading import Thread, Event
from inspect import getdoc, isclass
import scapy.compat
from scapy.packet import Raw, Packet
+from config import config, available_cpus, num_cpus, max_vpp_cpus
import hook as hookmodule
from vpp_pg_interface import VppPGInterface
from vpp_sub_interface import VppSubInterface
from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
from scapy.layers.inet6 import ICMPv6EchoReply
-from cpu_config import available_cpus, num_cpus, max_vpp_cpus
logger = logging.getLogger(__name__)
SKIP_CPU_SHORTAGE = 5
-class BoolEnvironmentVariable(object):
-
- def __init__(self, env_var_name, default='n', true_values=None):
- self.name = env_var_name
- self.default = default
- self.true_values = true_values if true_values is not None else \
- ("y", "yes", "1")
-
- def __bool__(self):
- return os.getenv(self.name, self.default).lower() in self.true_values
-
- if sys.version_info[0] == 2:
- __nonzero__ = __bool__
-
- def __repr__(self):
- return 'BoolEnvironmentVariable(%r, default=%r, true_values=%r)' % \
- (self.name, self.default, self.true_values)
-
-
-debug_framework = BoolEnvironmentVariable('TEST_DEBUG')
-if debug_framework:
+if config.debug_framework:
import debug_internal
"""
limit = -1
stdout_fragment = split[-1]
testclass.vpp_stdout_deque.extend(split[:limit])
- if not testclass.cache_vpp_output:
+ if not config.cache_vpp_output:
for line in split[:limit]:
testclass.logger.info(
"VPP STDOUT: %s" % line.rstrip("\n"))
stderr_fragment = split[-1]
testclass.vpp_stderr_deque.extend(split[:limit])
- if not testclass.cache_vpp_output:
+ if not config.cache_vpp_output:
for line in split[:limit]:
testclass.logger.error(
"VPP STDERR: %s" % line.rstrip("\n"))
# flag will take care of properly terminating the loop
-def _is_skip_aarch64_set():
- return BoolEnvironmentVariable('SKIP_AARCH64')
-
-
-is_skip_aarch64_set = _is_skip_aarch64_set()
-
-
def _is_platform_aarch64():
return platform.machine() == 'aarch64'
is_platform_aarch64 = _is_platform_aarch64()
-def _running_extended_tests():
- return BoolEnvironmentVariable("EXTENDED_TESTS")
-
-
-running_extended_tests = _running_extended_tests()
-
-
-def _running_gcov_tests():
- return BoolEnvironmentVariable("GCOV_TESTS")
-
-
-running_gcov_tests = _running_gcov_tests()
-
-
-def get_environ_vpp_worker_count():
- worker_config = os.getenv("VPP_WORKER_CONFIG", None)
- if worker_config:
- elems = worker_config.split(" ")
- if elems[0] != "workers" or len(elems) != 2:
- raise ValueError("Wrong VPP_WORKER_CONFIG == '%s' value." %
- worker_config)
- return int(elems[1])
- else:
- return 0
-
-
-environ_vpp_worker_count = get_environ_vpp_worker_count()
-
-
class KeepAliveReporter(object):
"""
Singleton object which reports test start to parent process
else:
desc = test.id()
- self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
+ self.pipe.send((desc, config.vpp, test.tempdir, test.vpp.pid))
class TestCaseTag(Enum):
extra_vpp_plugin_config = []
logger = null_logger
vapi_response_timeout = 5
+ remove_configured_vpp_objects_on_tear_down = True
@property
def packet_infos(self):
if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
cls.vpp_worker_count = 0
else:
- cls.vpp_worker_count = environ_vpp_worker_count
+ cls.vpp_worker_count = config.vpp_worker_count
return cls.vpp_worker_count
@classmethod
@classmethod
def setUpConstants(cls):
""" Set-up the test case class based on environment variables """
- cls.step = BoolEnvironmentVariable('STEP')
- # inverted case to handle '' == True
- c = os.getenv("CACHE_OUTPUT", "1")
- cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
- cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
- extern_plugin_path = os.getenv('EXTERN_PLUGINS')
+ cls.step = config.step
+ cls.plugin_path = ":".join(config.vpp_plugin_dir)
+ cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir)
+ cls.extern_plugin_path = ":".join(config.extern_plugin_dir)
debug_cli = ""
if cls.step or cls.debug_gdb or cls.debug_gdbserver:
debug_cli = "cli-listen localhost:5002"
- coredump_size = None
- size = os.getenv("COREDUMP_SIZE")
- if size is not None:
- coredump_size = "coredump-size %s" % size
- if coredump_size is None:
+ size = re.search(r"\d+[gG]", config.coredump_size)
+ if size:
+ coredump_size = f"coredump-size {config.coredump_size}".lower()
+ else:
coredump_size = "coredump-size unlimited"
-
- default_variant = os.getenv("VARIANT")
+ default_variant = config.variant
if default_variant is not None:
default_variant = "defaults { %s 100 }" % default_variant
else:
default_variant = ""
- api_fuzzing = os.getenv("API_FUZZ")
+ api_fuzzing = config.api_fuzz
if api_fuzzing is None:
api_fuzzing = 'off'
cls.vpp_cmdline = [
- cls.vpp_bin,
+ config.vpp,
"unix", "{", "nodaemon", debug_cli, "full-coredump",
coredump_size, "runtime-dir", cls.tempdir, "}",
"api-trace", "{", "on", "}",
"api-segment", "{", "prefix", cls.get_api_segment_prefix(), "}",
"cpu", "{", "main-core", str(cls.cpus[0]), ]
- if extern_plugin_path is not None:
+ if cls.extern_plugin_path not in (None, ""):
cls.extra_vpp_plugin_config.append(
- "add-path %s" % extern_plugin_path)
+ "add-path %s" % cls.extern_plugin_path)
if cls.get_vpp_worker_count():
cls.vpp_cmdline.extend([
"corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])])
print(single_line_delim)
print("You can debug VPP using:")
if cls.debug_gdbserver:
- print("sudo gdb " + cls.vpp_bin +
- " -ex 'target remote localhost:{port}'"
- .format(port=cls.gdbserver_port))
+ print(f"sudo gdb {config.vpp} "
+ f"-ex 'target remote localhost:{cls.gdbserver_port}'")
print("Now is the time to attach gdb by running the above "
"command, set up breakpoints etc., then resume VPP from "
"within gdb by issuing the 'continue' command")
cls.gdbserver_port += 1
elif cls.debug_gdb:
- print("sudo gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
+ print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'")
print("Now is the time to attach gdb by running the above "
"command and set up breakpoints etc., then resume VPP from"
" within gdb by issuing the 'continue' command")
@classmethod
def get_tempdir(cls):
if cls.debug_attach:
- return os.getenv("VPP_IN_GDB_TMP_DIR",
- "/tmp/unittest-attach-gdb")
+ tmpdir = f"{config.tmp_dir}/unittest-attach-gdb"
else:
- return tempfile.mkdtemp(prefix='vpp-unittest-%s-' % cls.__name__)
+ tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}"
+ if config.wipe_tmp_dir:
+ shutil.rmtree(tmpdir, ignore_errors=True)
+ os.mkdir(tmpdir)
+ return tmpdir
+
+ @classmethod
+ def create_file_handler(cls):
+ if config.log_dir is None:
+ cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
+ return
+
+ logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}"
+ if config.wipe_tmp_dir:
+ shutil.rmtree(logdir, ignore_errors=True)
+ os.mkdir(logdir)
+ cls.file_handler = FileHandler(f"{logdir}/log.txt")
@classmethod
def setUpClass(cls):
"""
super(VppTestCase, cls).setUpClass()
cls.logger = get_logger(cls.__name__)
- seed = os.environ["RND_SEED"]
- random.seed(seed)
+ random.seed(config.rnd_seed)
if hasattr(cls, 'parallel_handler'):
cls.logger.addHandler(cls.parallel_handler)
cls.logger.propagate = False
- d = os.getenv("DEBUG", None)
- cls.set_debug_flags(d)
+ cls.set_debug_flags(config.debug)
cls.tempdir = cls.get_tempdir()
- cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
+ cls.create_file_handler()
cls.file_handler.setFormatter(
Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
datefmt="%H:%M:%S"))
os.chdir(cls.tempdir)
cls.logger.info("Temporary dir is %s, api socket is %s",
cls.tempdir, cls.get_api_sock_path())
- cls.logger.debug("Random seed is %s", seed)
+ cls.logger.debug("Random seed is %s", config.rnd_seed)
cls.setUpConstants()
cls.reset_packet_infos()
cls._pcaps = []
cls.run_vpp()
cls.reporter.send_keep_alive(cls, 'setUpClass')
VppTestResult.current_test_case_info = TestCaseInfo(
- cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
+ cls.logger, cls.tempdir, cls.vpp.pid, config.vpp)
cls.vpp_stdout_deque = deque()
cls.vpp_stderr_deque = deque()
if not cls.debug_attach:
cls.quit()
cls.file_handler.close()
cls.reset_packet_infos()
- if debug_framework:
+ if config.debug_framework:
debug_internal.on_tear_down_class(cls)
def show_commands_at_teardown(self):
self.logger.info(self.vapi.ppcli("show bihash"))
self.logger.info("Logging testcase specific show commands.")
self.show_commands_at_teardown()
- self.registry.remove_vpp_config(self.logger)
+ if self.remove_configured_vpp_objects_on_tear_down:
+ self.registry.remove_vpp_config(self.logger)
# Save/Dump VPP api trace log
m = self._testMethodName
api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
def pg_start(cls, trace=True):
""" Enable the PG, wait till it is done, then clean up """
for (intf, worker) in cls._old_pcaps:
- intf.rename_old_pcap_file(intf.get_in_path(worker),
+ intf.handle_old_pcap_file(intf.get_in_path(worker),
intf.in_history_counter)
cls._old_pcaps = []
if trace:
if pkt.haslayer(ICMPv6EchoReply):
self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
- def get_packet_counter(self, counter):
+ def get_counter(self, counter):
if counter.startswith("/"):
counter_value = self.statistics.get_counter(counter)
else:
break
return counter_value
+ def assert_counter_equal(self, counter, expected_value,
+ thread=None, index=0):
+ c = self.get_counter(counter)
+ if thread is not None:
+ c = c[thread][index]
+ else:
+ c = sum(x[index] for x in c)
+ self.assert_equal(c, expected_value, "counter `%s'" % counter)
+
def assert_packet_counter_equal(self, counter, expected_value):
- counter_value = self.get_packet_counter(counter)
+ counter_value = self.get_counter(counter)
self.assert_equal(counter_value, expected_value,
"packet counter `%s'" % counter)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start(trace=trace)
+ def snapshot_stats(self, stats_diff):
+ """Return snapshot of interesting stats based on diff dictionary."""
+ stats_snapshot = {}
+ for sw_if_index in stats_diff:
+ for counter in stats_diff[sw_if_index]:
+ stats_snapshot[counter] = self.statistics[counter]
+ self.logger.debug(f"Took statistics stats_snapshot: {stats_snapshot}")
+ return stats_snapshot
+
+ def compare_stats_with_snapshot(self, stats_diff, stats_snapshot):
+ """Assert appropriate difference between current stats and snapshot."""
+ for sw_if_index in stats_diff:
+ for cntr, diff in stats_diff[sw_if_index].items():
+ if sw_if_index == "err":
+ self.assert_equal(
+ self.statistics[cntr].sum(),
+ stats_snapshot[cntr].sum() + diff,
+ f"'{cntr}' counter value (previous value: "
+ f"{stats_snapshot[cntr].sum()}, "
+ f"expected diff: {diff})")
+ else:
+ try:
+ self.assert_equal(
+ self.statistics[cntr][:, sw_if_index].sum(),
+ stats_snapshot[cntr][:, sw_if_index].sum() + diff,
+ f"'{cntr}' counter value (previous value: "
+ f"{stats_snapshot[cntr][:, sw_if_index].sum()}, "
+ f"expected diff: {diff})")
+ except IndexError:
+ # if diff is 0, then this most probably a case where
+ # test declares multiple interfaces but traffic hasn't
+ # passed through this one yet - which means the counter
+ # value is 0 and can be ignored
+ if 0 != diff:
+ raise
+
def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None,
- trace=True):
+ stats_diff=None, trace=True, msg=None):
+ if stats_diff:
+ stats_snapshot = self.snapshot_stats(stats_diff)
+
self.pg_send(intf, pkts)
- if not timeout:
- timeout = 1
- for i in self.pg_interfaces:
- i.get_capture(0, timeout=timeout)
- i.assert_nothing_captured(remark=remark)
- timeout = 0.1
- if trace:
- self.logger.debug(self.vapi.cli("show trace"))
+
+ try:
+ if not timeout:
+ timeout = 1
+ for i in self.pg_interfaces:
+ i.assert_nothing_captured(timeout=timeout, remark=remark)
+ timeout = 0.1
+ finally:
+ if trace:
+ if msg:
+ self.logger.debug(f"send_and_assert_no_replies: {msg}")
+ self.logger.debug(self.vapi.cli("show trace"))
+
+ if stats_diff:
+ self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
def send_and_expect(self, intf, pkts, output, n_rx=None, worker=None,
- trace=True):
+ trace=True, msg=None, stats_diff=None):
+ if stats_diff:
+ stats_snapshot = self.snapshot_stats(stats_diff)
+
if not n_rx:
n_rx = 1 if isinstance(pkts, Packet) else len(pkts)
self.pg_send(intf, pkts, worker=worker, trace=trace)
rx = output.get_capture(n_rx)
+ if trace:
+ if msg:
+ self.logger.debug(f"send_and_expect: {msg}")
+ self.logger.debug(self.vapi.cli("show trace"))
+
+ if stats_diff:
+ self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
+
+ return rx
+
+ def send_and_expect_load_balancing(self, input, pkts, outputs,
+ worker=None, trace=True):
+ self.pg_send(input, pkts, worker=worker, trace=trace)
+ rxs = []
+ for oo in outputs:
+ rx = oo._get_capture(1)
+ self.assertNotEqual(0, len(rx))
+ rxs.append(rx)
if trace:
self.logger.debug(self.vapi.cli("show trace"))
+ return rxs
+
+ def send_and_expect_some(self, intf, pkts, output,
+ worker=None,
+ trace=True):
+ self.pg_send(intf, pkts, worker=worker, trace=trace)
+ rx = output._get_capture(1)
+ if trace:
+ self.logger.debug(self.vapi.cli("show trace"))
+ self.assertTrue(len(rx) > 0)
+ self.assertTrue(len(rx) < len(pkts))
return rx
- def send_and_expect_only(self, intf, pkts, output, timeout=None):
+ def send_and_expect_only(self, intf, pkts, output, timeout=None,
+ stats_diff=None):
+ if stats_diff:
+ stats_snapshot = self.snapshot_stats(stats_diff)
+
self.pg_send(intf, pkts)
rx = output.get_capture(len(pkts))
outputs = [output]
timeout = 1
for i in self.pg_interfaces:
if i not in outputs:
- i.get_capture(0, timeout=timeout)
- i.assert_nothing_captured()
+ i.assert_nothing_captured(timeout=timeout)
timeout = 0.1
+ if stats_diff:
+ self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
+
return rx
def symlink_failed(self):
if self.current_test_case_info:
try:
- failed_dir = os.getenv('FAILED_DIR')
+ failed_dir = config.failed_dir
link_path = os.path.join(
failed_dir,
'%s-FAILED' %