#!/usr/bin/env python3
from __future__ import print_function
-import gc
import logging
import sys
import os
import signal
import subprocess
import unittest
-import tempfile
+import re
import time
import faulthandler
import random
import copy
-import psutil
import platform
+import shutil
from collections import deque
from threading import Thread, Event
from inspect import getdoc, isclass
from logging import FileHandler, DEBUG, Formatter
from enum import Enum
from abc import ABC, abstractmethod
+from struct import pack, unpack
import scapy.compat
-from scapy.packet import Raw
+from scapy.packet import Raw, Packet
+from config import config, available_cpus, num_cpus, max_vpp_cpus
import hook as hookmodule
from vpp_pg_interface import VppPGInterface
from vpp_sub_interface import VppSubInterface
from vpp_lo_interface import VppLoInterface
from vpp_bvi_interface import VppBviInterface
from vpp_papi_provider import VppPapiProvider
+from vpp_papi import VppEnum
import vpp_papi
from vpp_papi.vpp_stats import VPPStats
from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
from scapy.layers.inet6 import ICMPv6EchoReply
-from cpu_config import available_cpus, num_cpus, max_vpp_cpus
logger = logging.getLogger(__name__)
SKIP_CPU_SHORTAGE = 5
-class BoolEnvironmentVariable(object):
-
- def __init__(self, env_var_name, default='n', true_values=None):
- self.name = env_var_name
- self.default = default
- self.true_values = true_values if true_values is not None else \
- ("y", "yes", "1")
-
- def __bool__(self):
- return os.getenv(self.name, self.default).lower() in self.true_values
-
- if sys.version_info[0] == 2:
- __nonzero__ = __bool__
-
- def __repr__(self):
- return 'BoolEnvironmentVariable(%r, default=%r, true_values=%r)' % \
- (self.name, self.default, self.true_values)
-
-
-debug_framework = BoolEnvironmentVariable('TEST_DEBUG')
-if debug_framework:
+if config.debug_framework:
import debug_internal
"""
limit = -1
stdout_fragment = split[-1]
testclass.vpp_stdout_deque.extend(split[:limit])
- if not testclass.cache_vpp_output:
+ if not config.cache_vpp_output:
for line in split[:limit]:
testclass.logger.info(
"VPP STDOUT: %s" % line.rstrip("\n"))
stderr_fragment = split[-1]
testclass.vpp_stderr_deque.extend(split[:limit])
- if not testclass.cache_vpp_output:
+ if not config.cache_vpp_output:
for line in split[:limit]:
testclass.logger.error(
"VPP STDERR: %s" % line.rstrip("\n"))
# flag will take care of properly terminating the loop
-def _is_skip_aarch64_set():
- return BoolEnvironmentVariable('SKIP_AARCH64')
-
-
-is_skip_aarch64_set = _is_skip_aarch64_set()
-
-
def _is_platform_aarch64():
return platform.machine() == 'aarch64'
is_platform_aarch64 = _is_platform_aarch64()
-def _running_extended_tests():
- return BoolEnvironmentVariable("EXTENDED_TESTS")
-
-
-running_extended_tests = _running_extended_tests()
-
-
-def _running_gcov_tests():
- return BoolEnvironmentVariable("GCOV_TESTS")
-
-
-running_gcov_tests = _running_gcov_tests()
-
-
-def get_environ_vpp_worker_count():
- worker_config = os.getenv("VPP_WORKER_CONFIG", None)
- if worker_config:
- elems = worker_config.split(" ")
- if elems[0] != "workers" or len(elems) != 2:
- raise ValueError("Wrong VPP_WORKER_CONFIG == '%s' value." %
- worker_config)
- return int(elems[1])
- else:
- return 0
-
-
-environ_vpp_worker_count = get_environ_vpp_worker_count()
-
-
class KeepAliveReporter(object):
"""
Singleton object which reports test start to parent process
else:
desc = test.id()
- self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
+ self.pipe.send((desc, config.vpp, test.tempdir, test.vpp.pid))
class TestCaseTag(Enum):
RUN_SOLO = 1
# marks the suites broken on VPP multi-worker
FIXME_VPP_WORKERS = 2
+ # marks the suites broken when ASan is enabled
+ FIXME_ASAN = 3
def create_tag_decorator(e):
tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
+tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
class DummyVpp:
extra_vpp_plugin_config = []
logger = null_logger
vapi_response_timeout = 5
+ remove_configured_vpp_objects_on_tear_down = True
@property
def packet_infos(self):
""" if the test case class is timing-sensitive - return true """
return cls.has_tag(TestCaseTag.RUN_SOLO)
+ @classmethod
+ def skip_fixme_asan(cls):
+ """ if @tag_fixme_asan & ASan is enabled - mark for skip """
+ if cls.has_tag(TestCaseTag.FIXME_ASAN):
+ vpp_extra_cmake_args = os.environ.get('VPP_EXTRA_CMAKE_ARGS', '')
+ if 'DVPP_ENABLE_SANITIZE_ADDR=ON' in vpp_extra_cmake_args:
+ cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls)
+
@classmethod
def instance(cls):
"""Return the instance of this testcase"""
if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
cls.vpp_worker_count = 0
else:
- cls.vpp_worker_count = environ_vpp_worker_count
+ cls.vpp_worker_count = config.vpp_worker_count
return cls.vpp_worker_count
@classmethod
@classmethod
def setUpConstants(cls):
""" Set-up the test case class based on environment variables """
- cls.step = BoolEnvironmentVariable('STEP')
- # inverted case to handle '' == True
- c = os.getenv("CACHE_OUTPUT", "1")
- cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
- cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
- cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
- cls.test_plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
- cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
- plugin_path = None
- if cls.plugin_path is not None:
- if cls.extern_plugin_path is not None:
- plugin_path = "%s:%s" % (
- cls.plugin_path, cls.extern_plugin_path)
- else:
- plugin_path = cls.plugin_path
- elif cls.extern_plugin_path is not None:
- plugin_path = cls.extern_plugin_path
+ cls.step = config.step
+ cls.plugin_path = ":".join(config.vpp_plugin_dir)
+ cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir)
+ cls.extern_plugin_path = ":".join(config.extern_plugin_dir)
debug_cli = ""
if cls.step or cls.debug_gdb or cls.debug_gdbserver:
debug_cli = "cli-listen localhost:5002"
- coredump_size = None
- size = os.getenv("COREDUMP_SIZE")
- if size is not None:
- coredump_size = "coredump-size %s" % size
- if coredump_size is None:
+ size = re.search(r"\d+[gG]", config.coredump_size)
+ if size:
+ coredump_size = f"coredump-size {config.coredump_size}".lower()
+ else:
coredump_size = "coredump-size unlimited"
-
- default_variant = os.getenv("VARIANT")
+ default_variant = config.variant
if default_variant is not None:
default_variant = "defaults { %s 100 }" % default_variant
else:
default_variant = ""
- api_fuzzing = os.getenv("API_FUZZ")
+ api_fuzzing = config.api_fuzz
if api_fuzzing is None:
api_fuzzing = 'off'
cls.vpp_cmdline = [
- cls.vpp_bin,
+ config.vpp,
"unix", "{", "nodaemon", debug_cli, "full-coredump",
coredump_size, "runtime-dir", cls.tempdir, "}",
"api-trace", "{", "on", "}",
"api-segment", "{", "prefix", cls.get_api_segment_prefix(), "}",
"cpu", "{", "main-core", str(cls.cpus[0]), ]
+ if cls.extern_plugin_path not in (None, ""):
+ cls.extra_vpp_plugin_config.append(
+ "add-path %s" % cls.extern_plugin_path)
if cls.get_vpp_worker_count():
cls.vpp_cmdline.extend([
"corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])])
if cls.extra_vpp_punt_config is not None:
cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
- if plugin_path is not None:
- cls.vpp_cmdline.extend(["plugin_path", plugin_path])
- if cls.test_plugin_path is not None:
- cls.vpp_cmdline.extend(["test_plugin_path", cls.test_plugin_path])
if not cls.debug_attach:
cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
print(single_line_delim)
print("You can debug VPP using:")
if cls.debug_gdbserver:
- print("sudo gdb " + cls.vpp_bin +
- " -ex 'target remote localhost:{port}'"
- .format(port=cls.gdbserver_port))
+ print(f"sudo gdb {config.vpp} "
+ f"-ex 'target remote localhost:{cls.gdbserver_port}'")
print("Now is the time to attach gdb by running the above "
"command, set up breakpoints etc., then resume VPP from "
"within gdb by issuing the 'continue' command")
cls.gdbserver_port += 1
elif cls.debug_gdb:
- print("sudo gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
+ print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'")
print("Now is the time to attach gdb by running the above "
"command and set up breakpoints etc., then resume VPP from"
" within gdb by issuing the 'continue' command")
@classmethod
def get_tempdir(cls):
- if cls.debug_attach:
- return os.getenv("VPP_IN_GDB_TMP_DIR",
- "/tmp/unittest-attach-gdb")
- else:
- return tempfile.mkdtemp(prefix='vpp-unittest-%s-' % cls.__name__)
+ tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}"
+ if config.wipe_tmp_dir:
+ shutil.rmtree(tmpdir, ignore_errors=True)
+ os.mkdir(tmpdir)
+ return tmpdir
+
+ @classmethod
+ def create_file_handler(cls):
+ if config.log_dir is None:
+ cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
+ return
+
+ logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}"
+ if config.wipe_tmp_dir:
+ shutil.rmtree(logdir, ignore_errors=True)
+ os.mkdir(logdir)
+ cls.file_handler = FileHandler(f"{logdir}/log.txt")
@classmethod
def setUpClass(cls):
"""
super(VppTestCase, cls).setUpClass()
cls.logger = get_logger(cls.__name__)
- seed = os.environ["RND_SEED"]
- random.seed(seed)
+ random.seed(config.rnd_seed)
if hasattr(cls, 'parallel_handler'):
cls.logger.addHandler(cls.parallel_handler)
cls.logger.propagate = False
- d = os.getenv("DEBUG", None)
- cls.set_debug_flags(d)
+ cls.set_debug_flags(config.debug)
cls.tempdir = cls.get_tempdir()
- cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
+ cls.create_file_handler()
cls.file_handler.setFormatter(
Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
datefmt="%H:%M:%S"))
os.chdir(cls.tempdir)
cls.logger.info("Temporary dir is %s, api socket is %s",
cls.tempdir, cls.get_api_sock_path())
- cls.logger.debug("Random seed is %s", seed)
+ cls.logger.debug("Random seed is %s", config.rnd_seed)
cls.setUpConstants()
cls.reset_packet_infos()
cls._pcaps = []
cls.run_vpp()
cls.reporter.send_keep_alive(cls, 'setUpClass')
VppTestResult.current_test_case_info = TestCaseInfo(
- cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
+ cls.logger, cls.tempdir, cls.vpp.pid, config.vpp)
cls.vpp_stdout_deque = deque()
cls.vpp_stderr_deque = deque()
if not cls.debug_attach:
cls.quit()
cls.file_handler.close()
cls.reset_packet_infos()
- if debug_framework:
+ if config.debug_framework:
debug_internal.on_tear_down_class(cls)
def show_commands_at_teardown(self):
self.logger.info(self.vapi.ppcli("show bihash"))
self.logger.info("Logging testcase specific show commands.")
self.show_commands_at_teardown()
- self.registry.remove_vpp_config(self.logger)
+ if self.remove_configured_vpp_objects_on_tear_down:
+ self.registry.remove_vpp_config(self.logger)
# Save/Dump VPP api trace log
m = self._testMethodName
api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
vpp_api_trace_log))
os.rename(tmp_api_trace, vpp_api_trace_log)
- self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
- vpp_api_trace_log))
except VppTransportSocketIOError:
self.logger.debug("VppTransportSocketIOError: Vpp dead. "
"Cannot log show commands.")
def pg_start(cls, trace=True):
""" Enable the PG, wait till it is done, then clean up """
for (intf, worker) in cls._old_pcaps:
- intf.rename_old_pcap_file(intf.get_in_path(worker),
+ intf.handle_old_pcap_file(intf.get_in_path(worker),
intf.in_history_counter)
cls._old_pcaps = []
if trace:
cls._pcaps = []
@classmethod
- def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
+ def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0,
+ mode=None):
"""
Create packet-generator interfaces.
"""
result = []
for i in interfaces:
- intf = VppPGInterface(cls, i, gso, gso_size)
+ intf = VppPGInterface(cls, i, gso, gso_size, mode)
setattr(cls, intf.name, intf)
result.append(intf)
cls.pg_interfaces = result
return result
+ @classmethod
+ def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
+ pgmode = VppEnum.vl_api_pg_interface_mode_t
+ return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
+ pgmode.PG_API_MODE_IP4)
+
+ @classmethod
+ def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
+ pgmode = VppEnum.vl_api_pg_interface_mode_t
+ return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
+ pgmode.PG_API_MODE_IP6)
+
+ @classmethod
+ def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
+ pgmode = VppEnum.vl_api_pg_interface_mode_t
+ return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
+ pgmode.PG_API_MODE_ETHERNET)
+
+ @classmethod
+ def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
+ pgmode = VppEnum.vl_api_pg_interface_mode_t
+ return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
+ pgmode.PG_API_MODE_ETHERNET)
+
@classmethod
def create_loopback_interfaces(cls, count):
"""
:returns: string containing serialized data from packet info
"""
- return "%d %d %d %d %d" % (info.index, info.src, info.dst,
- info.ip, info.proto)
+
+ # retrieve payload, currently 18 bytes (4 x ints + 1 short)
+ return pack('iiiih', info.index, info.src,
+ info.dst, info.ip, info.proto)
@staticmethod
def payload_to_info(payload, payload_field='load'):
:returns: _PacketInfo object containing de-serialized data from payload
"""
- numbers = getattr(payload, payload_field).split()
+
+ # retrieve payload, currently 18 bytes (4 x ints + 1 short)
+ payload_b = getattr(payload, payload_field)[:18]
+
info = _PacketInfo()
- info.index = int(numbers[0])
- info.src = int(numbers[1])
- info.dst = int(numbers[2])
- info.ip = int(numbers[3])
- info.proto = int(numbers[4])
+ info.index, info.src, info.dst, info.ip, info.proto \
+ = unpack('iiiih', payload_b)
+
+ # some SRv6 TCs depend on get an exception if bad values are detected
+ if info.index > 0x4000:
+ raise ValueError('Index value is invalid')
+
return info
def get_next_packet_info(self, info):
if pkt.haslayer(ICMPv6EchoReply):
self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
- def get_packet_counter(self, counter):
+ def get_counter(self, counter):
if counter.startswith("/"):
counter_value = self.statistics.get_counter(counter)
else:
break
return counter_value
+ def assert_counter_equal(self, counter, expected_value,
+ thread=None, index=0):
+ c = self.get_counter(counter)
+ if thread is not None:
+ c = c[thread][index]
+ else:
+ c = sum(x[index] for x in c)
+ self.assert_equal(c, expected_value, "counter `%s'" % counter)
+
def assert_packet_counter_equal(self, counter, expected_value):
- counter_value = self.get_packet_counter(counter)
+ counter_value = self.get_counter(counter)
self.assert_equal(counter_value, expected_value,
"packet counter `%s'" % counter)
"Finished sleep (%s) - slept %es (wanted %es)",
remark, after - before, timeout)
+ def virtual_sleep(self, timeout, remark=None):
+ self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
+ self.vapi.cli("set clock adjust %s" % timeout)
+
def pg_send(self, intf, pkts, worker=None, trace=True):
intf.add_stream(pkts, worker=worker)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start(trace=trace)
- def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
+ def snapshot_stats(self, stats_diff):
+ """Return snapshot of interesting stats based on diff dictionary."""
+ stats_snapshot = {}
+ for sw_if_index in stats_diff:
+ for counter in stats_diff[sw_if_index]:
+ stats_snapshot[counter] = self.statistics[counter]
+ self.logger.debug(f"Took statistics stats_snapshot: {stats_snapshot}")
+ return stats_snapshot
+
+ def compare_stats_with_snapshot(self, stats_diff, stats_snapshot):
+ """Assert appropriate difference between current stats and snapshot."""
+ for sw_if_index in stats_diff:
+ for cntr, diff in stats_diff[sw_if_index].items():
+ if sw_if_index == "err":
+ self.assert_equal(
+ self.statistics[cntr].sum(),
+ stats_snapshot[cntr].sum() + diff,
+ f"'{cntr}' counter value (previous value: "
+ f"{stats_snapshot[cntr].sum()}, "
+ f"expected diff: {diff})")
+ else:
+ try:
+ self.assert_equal(
+ self.statistics[cntr][:, sw_if_index].sum(),
+ stats_snapshot[cntr][:, sw_if_index].sum() + diff,
+ f"'{cntr}' counter value (previous value: "
+ f"{stats_snapshot[cntr][:, sw_if_index].sum()}, "
+ f"expected diff: {diff})")
+ except IndexError:
+ # if diff is 0, then this most probably a case where
+ # test declares multiple interfaces but traffic hasn't
+ # passed through this one yet - which means the counter
+ # value is 0 and can be ignored
+ if 0 != diff:
+ raise
+
+ def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None,
+ stats_diff=None, trace=True, msg=None):
+ if stats_diff:
+ stats_snapshot = self.snapshot_stats(stats_diff)
+
self.pg_send(intf, pkts)
- if not timeout:
- timeout = 1
- for i in self.pg_interfaces:
- i.get_capture(0, timeout=timeout)
- i.assert_nothing_captured(remark=remark)
- timeout = 0.1
+
+ try:
+ if not timeout:
+ timeout = 1
+ for i in self.pg_interfaces:
+ i.assert_nothing_captured(timeout=timeout, remark=remark)
+ timeout = 0.1
+ finally:
+ if trace:
+ if msg:
+ self.logger.debug(f"send_and_assert_no_replies: {msg}")
+ self.logger.debug(self.vapi.cli("show trace"))
+
+ if stats_diff:
+ self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
def send_and_expect(self, intf, pkts, output, n_rx=None, worker=None,
- trace=True):
+ trace=True, msg=None, stats_diff=None):
+ if stats_diff:
+ stats_snapshot = self.snapshot_stats(stats_diff)
+
if not n_rx:
- n_rx = len(pkts)
+ n_rx = 1 if isinstance(pkts, Packet) else len(pkts)
self.pg_send(intf, pkts, worker=worker, trace=trace)
rx = output.get_capture(n_rx)
+ if trace:
+ if msg:
+ self.logger.debug(f"send_and_expect: {msg}")
+ self.logger.debug(self.vapi.cli("show trace"))
+
+ if stats_diff:
+ self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
+
return rx
- def send_and_expect_only(self, intf, pkts, output, timeout=None):
+ def send_and_expect_load_balancing(self, input, pkts, outputs,
+ worker=None, trace=True):
+ self.pg_send(input, pkts, worker=worker, trace=trace)
+ rxs = []
+ for oo in outputs:
+ rx = oo._get_capture(1)
+ self.assertNotEqual(0, len(rx))
+ rxs.append(rx)
+ if trace:
+ self.logger.debug(self.vapi.cli("show trace"))
+ return rxs
+
+ def send_and_expect_only(self, intf, pkts, output, timeout=None,
+ stats_diff=None):
+ if stats_diff:
+ stats_snapshot = self.snapshot_stats(stats_diff)
+
self.pg_send(intf, pkts)
rx = output.get_capture(len(pkts))
outputs = [output]
timeout = 1
for i in self.pg_interfaces:
if i not in outputs:
- i.get_capture(0, timeout=timeout)
- i.assert_nothing_captured()
+ i.assert_nothing_captured(timeout=timeout)
timeout = 0.1
+ if stats_diff:
+ self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
+
return rx
def symlink_failed(self):
if self.current_test_case_info:
try:
- failed_dir = os.getenv('FAILED_DIR')
+ failed_dir = config.failed_dir
link_path = os.path.join(
failed_dir,
'%s-FAILED' %
test_title = colorize(
f"FIXME with VPP workers: {test_title}", RED)
+ if test.has_tag(TestCaseTag.FIXME_ASAN):
+ test_title = colorize(
+ f"FIXME with ASAN: {test_title}", RED)
+ test.skip_fixme_asan()
+
if hasattr(test, 'vpp_worker_count'):
if test.vpp_worker_count == 0:
test_title += " [main thread only]"
self.result = os.EX_OSFILE
raise EnvironmentError(
"executable '%s' is not found or executable." % executable)
- self.logger.debug("Running executable: '{app}'"
- .format(app=' '.join(self.args)))
+ self.logger.debug("Running executable '{app}': '{cmd}'"
+ .format(app=self.app_name,
+ cmd=' '.join(self.args)))
env = os.environ.copy()
env.update(self.env)
env["CK_LOG_FILE_NAME"] = "-"
self.process = subprocess.Popen(
- self.args, shell=False, env=env, preexec_fn=os.setpgrp,
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ ['stdbuf', '-o0', '-e0'] + self.args, shell=False, env=env,
+ preexec_fn=os.setpgrp, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
self.wait_for_enter()
out, err = self.process.communicate()
self.logger.debug("Finished running `{app}'".format(app=self.app_name))