from __future__ import print_function
import gc
+import logging
import sys
import os
import select
except NameError:
pass
+logger = logging.getLogger(__name__)
+
+# Set up an empty logger for the testcase that can be overridden as necessary
+null_logger = logging.getLogger('VppTestCase')
+null_logger.addHandler(logging.NullHandler())
+
PASS = 0
FAIL = 1
ERROR = 2
running_extended_tests = _running_extended_tests()
+def _running_gcov_tests():
+ return BoolEnvironmentVariable("GCOV_TESTS")
+
+
+running_gcov_tests = _running_gcov_tests()
+
+
def _running_on_centos():
os_id = os.getenv("OS_ID", "")
return True if "centos" in os_id.lower() else False
extra_vpp_punt_config = []
extra_vpp_plugin_config = []
+ logger = null_logger
vapi_response_timeout = 5
@property
else:
return 0
+ @classmethod
+ def force_solo(cls):
+ """ if the test case class is timing-sensitive - return true """
+ return False
+
@classmethod
def instance(cls):
"""Return the instance of this testcase"""
if not hasattr(cls, "worker_config"):
cls.worker_config = ""
+ default_variant = os.getenv("VARIANT")
+ if default_variant is not None:
+ default_variant = "defaults { %s 100 }" % default_variant
+ else:
+ default_variant = ""
+
+ api_fuzzing = os.getenv("API_FUZZ")
+ if api_fuzzing is None:
+ api_fuzzing = 'off'
+
cls.vpp_cmdline = [cls.vpp_bin, "unix",
"{", "nodaemon", debug_cli, "full-coredump",
coredump_size, "runtime-dir", cls.tempdir, "}",
"prefix", cls.shm_prefix, "}", "cpu", "{",
"main-core", str(cpu_core_number),
cls.worker_config, "}",
+ "physmem", "{", "max-size", "32m", "}",
"statseg", "{", "socket-name", cls.stats_sock, "}",
"socksvr", "{", "socket-name", cls.api_sock, "}",
+ "node { ", default_variant, "}",
+ "api-fuzz {", api_fuzzing, "}",
"plugins",
"{", "plugin", "dpdk_plugin.so", "{", "disable",
"}", "plugin", "rdma_plugin.so", "{", "disable",
"}", "plugin", "unittest_plugin.so", "{", "enable",
"}"] + cls.extra_vpp_plugin_config + ["}", ]
+
if cls.extra_vpp_punt_config is not None:
cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
if plugin_path is not None:
try:
cls.vpp = subprocess.Popen(cmdline,
stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- bufsize=1)
+ stderr=subprocess.PIPE)
except subprocess.CalledProcessError as e:
cls.logger.critical("Subprocess returned with non-0 return code: ("
"%s)", e.returncode)
"VPP-API connection failed, did you forget "
"to 'continue' VPP from within gdb?", RED))
raise
+ except vpp_papi.VPPRuntimeError as e:
+ cls.logger.debug("%s" % e)
+ cls.quit()
+ raise
except Exception as e:
cls.logger.debug("Exception connecting to VPP: %s" % e)
-
cls.quit()
raise
@classmethod
def get_vpp_time(cls):
- return float(cls.vapi.cli('show clock').replace("Time now ", ""))
+ # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
+ # returns float("2.190522")
+ timestr = cls.vapi.cli('show clock')
+ head, sep, tail = timestr.partition(',')
+ head, sep, tail = head.partition('Time now')
+ return float(tail)
@classmethod
def sleep_on_vpp_time(cls, sec):
time.sleep(0)
return
- if hasattr(cls, 'logger'):
- cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
+ cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
before = time.time()
time.sleep(timeout)
after = time.time()
- if hasattr(cls, 'logger') and after - before > 2 * timeout:
+ if after - before > 2 * timeout:
cls.logger.error("unexpected self.sleep() result - "
"slept for %es instead of ~%es!",
after - before, timeout)
- if hasattr(cls, 'logger'):
- cls.logger.debug(
+
+ cls.logger.debug(
"Finished sleep (%s) - slept %es (wanted %es)",
remark, after - before, timeout)
failed_dir,
'%s-FAILED' %
os.path.basename(self.current_test_case_info.tempdir))
- if self.current_test_case_info.logger:
- self.current_test_case_info.logger.debug(
+
+ self.current_test_case_info.logger.debug(
"creating a link to the failed test")
- self.current_test_case_info.logger.debug(
+ self.current_test_case_info.logger.debug(
"os.symlink(%s, %s)" %
(self.current_test_case_info.tempdir, link_path))
if os.path.exists(link_path):
- if self.current_test_case_info.logger:
- self.current_test_case_info.logger.debug(
+ self.current_test_case_info.logger.debug(
'symlink already exists')
else:
os.symlink(self.current_test_case_info.tempdir, link_path)
except Exception as e:
- if self.current_test_case_info.logger:
- self.current_test_case_info.logger.error(e)
+ self.current_test_case_info.logger.error(e)
def send_result_through_pipe(self, test, result):
if hasattr(self, 'test_framework_result_pipe'):
"""
def print_header(test):
+ test_doc = getdoc(test)
+ if not test_doc:
+ raise Exception("No doc string for test '%s'" % test.id())
+ test_title = test_doc.splitlines()[0]
+ test_title_colored = colorize(test_title, GREEN)
+ if test.force_solo():
+ # long live PEP-8 and 80 char width limitation...
+ c = YELLOW
+ test_title_colored = colorize("SOLO RUN: " + test_title, c)
+
if not hasattr(test.__class__, '_header_printed'):
print(double_line_delim)
- print(colorize(getdoc(test).splitlines()[0], GREEN))
+ print(test_title_colored)
print(double_line_delim)
test.__class__._header_printed = True
class Worker(Thread):
- def __init__(self, args, logger, env=None):
+ def __init__(self, executable_args, logger, env=None, *args, **kwargs):
+ super(Worker, self).__init__(*args, **kwargs)
self.logger = logger
- self.args = args
+ self.args = executable_args
if hasattr(self, 'testcase') and self.testcase.debug_all:
if self.testcase.debug_gdbserver:
self.args = ['/usr/bin/gdbserver', 'localhost:{port}'
.format(port=self.testcase.gdbserver_port)] + args
elif self.testcase.debug_gdb and hasattr(self, 'wait_for_gdb'):
self.args.append(self.wait_for_gdb)
- self.app_bin = args[0]
+ self.app_bin = executable_args[0]
self.app_name = os.path.basename(self.app_bin)
if hasattr(self, 'role'):
self.app_name += ' {role}'.format(role=self.role)
self.result = None
env = {} if env is None else env
self.env = copy.deepcopy(env)
- super(Worker, self).__init__()
def wait_for_enter(self):
if not hasattr(self, 'testcase'):