from traceback import format_exception
from logging import FileHandler, DEBUG, Formatter
from enum import Enum
+from abc import ABC, abstractmethod
+from struct import pack, unpack
import scapy.compat
from scapy.packet import Raw
from vpp_lo_interface import VppLoInterface
from vpp_bvi_interface import VppBviInterface
from vpp_papi_provider import VppPapiProvider
+from vpp_papi import VppEnum
import vpp_papi
from vpp_papi.vpp_stats import VPPStats
from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
from scapy.layers.inet6 import ICMPv6EchoReply
+from cpu_config import available_cpus, num_cpus, max_vpp_cpus
+
logger = logging.getLogger(__name__)
# Set up an empty logger for the testcase that can be overridden as necessary
ERROR = 2
SKIP = 3
TEST_RUN = 4
+SKIP_CPU_SHORTAGE = 5
class BoolEnvironmentVariable(object):
running_gcov_tests = _running_gcov_tests()
+def get_environ_vpp_worker_count():
+ worker_config = os.getenv("VPP_WORKER_CONFIG", None)
+ if worker_config:
+ elems = worker_config.split(" ")
+ if elems[0] != "workers" or len(elems) != 2:
+ raise ValueError("Wrong VPP_WORKER_CONFIG == '%s' value." %
+ worker_config)
+ return int(elems[1])
+ else:
+ return 0
+
+
+environ_vpp_worker_count = get_environ_vpp_worker_count()
+
+
class KeepAliveReporter(object):
"""
Singleton object which reports test start to parent process
pass
-class VppTestCase(unittest.TestCase):
+class CPUInterface(ABC):
+ cpus = []
+ skipped_due_to_cpu_lack = False
+
+ @classmethod
+ @abstractmethod
+ def get_cpus_required(cls):
+ pass
+
+ @classmethod
+ def assign_cpus(cls, cpus):
+ cls.cpus = cpus
+
+
+class VppTestCase(CPUInterface, unittest.TestCase):
"""This subclass is a base class for VPP test cases that are implemented as
classes. It provides methods to create and run test case.
"""
+ extra_vpp_statseg_config = ""
extra_vpp_punt_config = []
extra_vpp_plugin_config = []
logger = null_logger
if dl == "gdb-all" or dl == "gdbserver-all":
cls.debug_all = True
- @staticmethod
- def get_least_used_cpu():
- cpu_usage_list = [set(range(psutil.cpu_count()))]
- vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
- if 'vpp_main' == p.info['name']]
- for vpp_process in vpp_processes:
- for cpu_usage_set in cpu_usage_list:
- try:
- cpu_num = vpp_process.cpu_num()
- if cpu_num in cpu_usage_set:
- cpu_usage_set_index = cpu_usage_list.index(
- cpu_usage_set)
- if cpu_usage_set_index == len(cpu_usage_list) - 1:
- cpu_usage_list.append({cpu_num})
- else:
- cpu_usage_list[cpu_usage_set_index + 1].add(
- cpu_num)
- cpu_usage_set.remove(cpu_num)
- break
- except psutil.NoSuchProcess:
- pass
-
- for cpu_usage_set in cpu_usage_list:
- if len(cpu_usage_set) > 0:
- min_usage_set = cpu_usage_set
- break
+ @classmethod
+ def get_vpp_worker_count(cls):
+ if not hasattr(cls, "vpp_worker_count"):
+ if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
+ cls.vpp_worker_count = 0
+ else:
+ cls.vpp_worker_count = environ_vpp_worker_count
+ return cls.vpp_worker_count
- return random.choice(tuple(min_usage_set))
+ @classmethod
+ def get_cpus_required(cls):
+ return 1 + cls.get_vpp_worker_count()
@classmethod
def setUpConstants(cls):
c = os.getenv("CACHE_OUTPUT", "1")
cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
- cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
- cls.test_plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
- cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
- plugin_path = None
- if cls.plugin_path is not None:
- if cls.extern_plugin_path is not None:
- plugin_path = "%s:%s" % (
- cls.plugin_path, cls.extern_plugin_path)
- else:
- plugin_path = cls.plugin_path
- elif cls.extern_plugin_path is not None:
- plugin_path = cls.extern_plugin_path
+ extern_plugin_path = os.getenv('EXTERN_PLUGINS')
debug_cli = ""
if cls.step or cls.debug_gdb or cls.debug_gdbserver:
debug_cli = "cli-listen localhost:5002"
if coredump_size is None:
coredump_size = "coredump-size unlimited"
- cpu_core_number = cls.get_least_used_cpu()
- if not hasattr(cls, "vpp_worker_count"):
- cls.vpp_worker_count = 0
- worker_config = os.getenv("VPP_WORKER_CONFIG", "")
- if worker_config:
- elems = worker_config.split(" ")
- if elems[0] != "workers" or len(elems) != 2:
- raise ValueError("Wrong VPP_WORKER_CONFIG == '%s' value." %
- worker_config)
- cls.vpp_worker_count = int(elems[1])
- if cls.vpp_worker_count > 0 and\
- cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
- cls.vpp_worker_count = 0
-
default_variant = os.getenv("VARIANT")
if default_variant is not None:
default_variant = "defaults { %s 100 }" % default_variant
coredump_size, "runtime-dir", cls.tempdir, "}",
"api-trace", "{", "on", "}",
"api-segment", "{", "prefix", cls.get_api_segment_prefix(), "}",
- "cpu", "{", "main-core", str(cpu_core_number), ]
- if cls.vpp_worker_count:
- cls.vpp_cmdline.extend(["workers", str(cls.vpp_worker_count)])
+ "cpu", "{", "main-core", str(cls.cpus[0]), ]
+ if extern_plugin_path is not None:
+ cls.extra_vpp_plugin_config.append(
+ "add-path %s" % extern_plugin_path)
+ if cls.get_vpp_worker_count():
+ cls.vpp_cmdline.extend([
+ "corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])])
cls.vpp_cmdline.extend([
"}",
"physmem", "{", "max-size", "32m", "}",
- "statseg", "{", "socket-name", cls.get_stats_sock_path(), "}",
+ "statseg", "{", "socket-name", cls.get_stats_sock_path(),
+ cls.extra_vpp_statseg_config, "}",
"socksvr", "{", "socket-name", cls.get_api_sock_path(), "}",
"node { ", default_variant, "}",
"api-fuzz {", api_fuzzing, "}",
if cls.extra_vpp_punt_config is not None:
cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
- if plugin_path is not None:
- cls.vpp_cmdline.extend(["plugin_path", plugin_path])
- if cls.test_plugin_path is not None:
- cls.vpp_cmdline.extend(["test_plugin_path", cls.test_plugin_path])
if not cls.debug_attach:
cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
@classmethod
def run_vpp(cls):
+ cls.logger.debug(f"Assigned cpus: {cls.cpus}")
cmdline = cls.vpp_cmdline
if cls.debug_gdbserver:
gdbserver = '/usr/bin/gdbserver'
- if not os.path.isfile(gdbserver) or \
+ if not os.path.isfile(gdbserver) or\
not os.access(gdbserver, os.X_OK):
raise Exception("gdbserver binary '%s' does not exist or is "
"not executable" % gdbserver)
self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
vpp_api_trace_log))
os.rename(tmp_api_trace, vpp_api_trace_log)
- self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
- vpp_api_trace_log))
except VppTransportSocketIOError:
self.logger.debug("VppTransportSocketIOError: Vpp dead. "
"Cannot log show commands.")
cls._pcaps = []
@classmethod
- def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
+ def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0,
+ mode=None):
"""
Create packet-generator interfaces.
"""
result = []
for i in interfaces:
- intf = VppPGInterface(cls, i, gso, gso_size)
+ intf = VppPGInterface(cls, i, gso, gso_size, mode)
setattr(cls, intf.name, intf)
result.append(intf)
cls.pg_interfaces = result
return result
+ @classmethod
+ def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
+ pgmode = VppEnum.vl_api_pg_interface_mode_t
+ return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
+ pgmode.PG_API_MODE_IP4)
+
+ @classmethod
+ def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
+ pgmode = VppEnum.vl_api_pg_interface_mode_t
+ return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
+ pgmode.PG_API_MODE_IP6)
+
+ @classmethod
+ def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
+ pgmode = VppEnum.vl_api_pg_interface_mode_t
+ return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
+ pgmode.PG_API_MODE_ETHERNET)
+
+ @classmethod
+ def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
+ pgmode = VppEnum.vl_api_pg_interface_mode_t
+ return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
+ pgmode.PG_API_MODE_ETHERNET)
+
@classmethod
def create_loopback_interfaces(cls, count):
"""
:returns: string containing serialized data from packet info
"""
- return "%d %d %d %d %d" % (info.index, info.src, info.dst,
- info.ip, info.proto)
+
+ # retrieve payload, currently 18 bytes (4 x ints + 1 short)
+ return pack('iiiih', info.index, info.src,
+ info.dst, info.ip, info.proto)
@staticmethod
def payload_to_info(payload, payload_field='load'):
:returns: _PacketInfo object containing de-serialized data from payload
"""
- numbers = getattr(payload, payload_field).split()
+
+ # retrieve payload, currently 18 bytes (4 x ints + 1 short)
+ payload_b = getattr(payload, payload_field)[:18]
+
info = _PacketInfo()
- info.index = int(numbers[0])
- info.src = int(numbers[1])
- info.dst = int(numbers[2])
- info.ip = int(numbers[3])
- info.proto = int(numbers[4])
+ info.index, info.src, info.dst, info.ip, info.proto \
+ = unpack('iiiih', payload_b)
+
+ # some SRv6 TCs depend on get an exception if bad values are detected
+ if info.index > 0x4000:
+ raise ValueError('Index value is invalid')
+
return info
def get_next_packet_info(self, info):
self.verbosity = verbosity
self.result_string = None
self.runner = runner
+ self.printed = []
def addSuccess(self, test):
"""
unittest.TestResult.addSkip(self, test, reason)
self.result_string = colorize("SKIP", YELLOW)
- self.send_result_through_pipe(test, SKIP)
+ if reason == "not enough cpus":
+ self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE)
+ else:
+ self.send_result_through_pipe(test, SKIP)
def symlink_failed(self):
if self.current_test_case_info:
"""
def print_header(test):
+ if test.__class__ in self.printed:
+ return
+
test_doc = getdoc(test)
if not test_doc:
raise Exception("No doc string for test '%s'" % test.id())
- test_title = test_doc.splitlines()[0]
- test_title_colored = colorize(test_title, GREEN)
+
+ test_title = test_doc.splitlines()[0].rstrip()
+ test_title = colorize(test_title, GREEN)
if test.is_tagged_run_solo():
- # long live PEP-8 and 80 char width limitation...
- c = YELLOW
- test_title_colored = colorize("SOLO RUN: " + test_title, c)
+ test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)
# This block may overwrite the colorized title above,
# but we want this to stand out and be fixed
if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
- c = RED
- w = "FIXME with VPP workers: "
- test_title_colored = colorize(w + test_title, c)
+ test_title = colorize(
+ f"FIXME with VPP workers: {test_title}", RED)
+
+ if hasattr(test, 'vpp_worker_count'):
+ if test.vpp_worker_count == 0:
+ test_title += " [main thread only]"
+ elif test.vpp_worker_count == 1:
+ test_title += " [1 worker thread]"
+ else:
+ test_title += f" [{test.vpp_worker_count} worker threads]"
+
+ if test.__class__.skipped_due_to_cpu_lack:
+ test_title = colorize(
+ f"{test_title} [skipped - not enough cpus, "
+ f"required={test.__class__.get_cpus_required()}, "
+ f"available={max_vpp_cpus}]", YELLOW)
- if not hasattr(test.__class__, '_header_printed'):
- print(double_line_delim)
- print(test_title_colored)
- print(double_line_delim)
- test.__class__._header_printed = True
+ print(double_line_delim)
+ print(test_title)
+ print(double_line_delim)
+ self.printed.append(test.__class__)
print_header(test)
self.start_test = time.time()
self.result = os.EX_OSFILE
raise EnvironmentError(
"executable '%s' is not found or executable." % executable)
- self.logger.debug("Running executable: '{app}'"
- .format(app=' '.join(self.args)))
+ self.logger.debug("Running executable '{app}': '{cmd}'"
+ .format(app=self.app_name,
+ cmd=' '.join(self.args)))
env = os.environ.copy()
env.update(self.env)
env["CK_LOG_FILE_NAME"] = "-"
self.process = subprocess.Popen(
- self.args, shell=False, env=env, preexec_fn=os.setpgrp,
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ ['stdbuf', '-o0', '-e0'] + self.args, shell=False, env=env,
+ preexec_fn=os.setpgrp, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
self.wait_for_enter()
out, err = self.process.communicate()
self.logger.debug("Finished running `{app}'".format(app=self.app_name))