import unittest
import tempfile
import time
-import resource
import faulthandler
+import random
+import copy
from collections import deque
from threading import Thread, Event
from inspect import getdoc, isclass
from vpp_sub_interface import VppSubInterface
from vpp_lo_interface import VppLoInterface
from vpp_papi_provider import VppPapiProvider
-from log import *
+from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
+ getLogger, colorize
from vpp_object import VppObjectRegistry
if os.name == 'posix' and sys.version_info[0] < 3:
# using subprocess32 is recommended by python official documentation
else:
import subprocess
+debug_framework = False
+if os.getenv('TEST_DEBUG', "0") == "1":
+ debug_framework = True
+ import debug_internal
+
+
"""
Test framework module.
def pump_output(testclass):
""" pump output from vpp stdout/stderr to proper queues """
+ stdout_fragment = ""
+ stderr_fragment = ""
while not testclass.pump_thread_stop_flag.wait(0):
readable = select.select([testclass.vpp.stdout.fileno(),
testclass.vpp.stderr.fileno(),
testclass.pump_thread_wakeup_pipe[0]],
[], [])[0]
if testclass.vpp.stdout.fileno() in readable:
- read = os.read(testclass.vpp.stdout.fileno(), 1024)
- testclass.vpp_stdout_deque.append(read)
- if not testclass.cache_vpp_output:
- for line in read.splitlines():
- testclass.logger.debug("VPP STDOUT: %s" % line)
+ read = os.read(testclass.vpp.stdout.fileno(), 102400)
+ if len(read) > 0:
+ split = read.splitlines(True)
+ if len(stdout_fragment) > 0:
+ split[0] = "%s%s" % (stdout_fragment, split[0])
+ if len(split) > 0 and split[-1].endswith("\n"):
+ limit = None
+ else:
+ limit = -1
+ stdout_fragment = split[-1]
+ testclass.vpp_stdout_deque.extend(split[:limit])
+ if not testclass.cache_vpp_output:
+ for line in split[:limit]:
+ testclass.logger.debug(
+ "VPP STDOUT: %s" % line.rstrip("\n"))
if testclass.vpp.stderr.fileno() in readable:
- read = os.read(testclass.vpp.stderr.fileno(), 1024)
- testclass.vpp_stderr_deque.append(read)
- if not testclass.cache_vpp_output:
- for line in read.splitlines():
- testclass.logger.debug("VPP STDERR: %s" % line)
+ read = os.read(testclass.vpp.stderr.fileno(), 102400)
+ if len(read) > 0:
+ split = read.splitlines(True)
+ if len(stderr_fragment) > 0:
+ split[0] = "%s%s" % (stderr_fragment, split[0])
+ if len(split) > 0 and split[-1].endswith("\n"):
+ limit = None
+ else:
+ limit = -1
+ stderr_fragment = split[-1]
+ testclass.vpp_stderr_deque.extend(split[:limit])
+ if not testclass.cache_vpp_output:
+ for line in split[:limit]:
+ testclass.logger.debug(
+ "VPP STDERR: %s" % line.rstrip("\n"))
# ignoring the dummy pipe here intentionally - the flag will take care
# of properly terminating the loop
try:
c = os.getenv("CACHE_OUTPUT", "1")
cls.cache_vpp_output = \
- True if c.lower() in ("y", "yes", "1") else False
+ False if c.lower() in ("n", "no", "0") else True
except:
cls.cache_vpp_output = True
cls.set_debug_flags(d)
coredump_size, "}", "api-trace", "{", "on", "}",
"api-segment", "{", "prefix", cls.shm_prefix, "}",
"plugins", "{", "plugin", "dpdk_plugin.so", "{",
- "disable", "}", "}"]
+ "disable", "}", "}", ]
if plugin_path is not None:
cls.vpp_cmdline.extend(["plugin_path", plugin_path])
cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
Remove shared memory files, start vpp and connect the vpp-api
"""
gc.collect() # run garbage collection first
+ random.seed()
cls.logger = getLogger(cls.__name__)
cls.tempdir = tempfile.mkdtemp(
prefix='vpp-unittest-%s-' % cls.__name__)
try:
cls.vapi.connect()
except:
+ try:
+ cls.vapi.disconnect()
+ except:
+ pass
if cls.debug_gdbserver:
print(colorize("You're running VPP inside gdbserver but "
"VPP-API connection failed, did you forget "
cls.quit()
except:
pass
- raise t, v, tb
+ raise (t, v, tb)
@classmethod
def quit(cls):
""" Perform final cleanup after running all tests in this test-case """
cls.quit()
cls.file_handler.close()
+ cls.reset_packet_infos()
+ if debug_framework:
+ debug_internal.on_tear_down_class(cls)
def tearDown(self):
""" Show various debug prints after each test """
self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
vpp_api_trace_log))
os.rename(tmp_api_trace, vpp_api_trace_log)
- self.logger.info(self.vapi.ppcli("api trace dump %s" %
+ self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
vpp_api_trace_log))
else:
self.registry.unregister_all(self.logger)
type(self).test_instance = self
@classmethod
- def pg_enable_capture(cls, interfaces):
+ def pg_enable_capture(cls, interfaces=None):
"""
Enable capture on packet-generator interfaces
- :param interfaces: iterable interface indexes
+ :param interfaces: iterable interface indexes (if None,
+ use self.pg_interfaces)
"""
+ if interfaces is None:
+ interfaces = cls.pg_interfaces
for i in interfaces:
i.enable_capture()
return result
@staticmethod
- def extend_packet(packet, size):
+ def extend_packet(packet, size, padding=' '):
"""
- Extend packet to given size by padding with spaces
+ Extend packet to given size by padding with spaces or custom padding
NOTE: Currently works only when Raw layer is present.
:param packet: packet
:param size: target size
+ :param padding: padding used to extend the payload
"""
packet_len = len(packet) + 4
extend = size - packet_len
if extend > 0:
- packet[Raw].load += ' ' * extend
+ num = (extend / len(padding)) + 1
+ packet[Raw].load += (padding * num)[:extend]
@classmethod
def reset_packet_infos(cls):
"Finished sleep (%s) - slept %ss (wanted %ss)" % (
remark, after - before, timeout))
+ def send_and_assert_no_replies(self, intf, pkts, remark=""):
+ self.vapi.cli("clear trace")
+ intf.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ timeout = 1
+ for i in self.pg_interfaces:
+ i.get_capture(0, timeout=timeout)
+ i.assert_nothing_captured(remark=remark)
+ timeout = 0.1
+
+ def send_and_expect(self, input, pkts, output):
+ self.vapi.cli("clear trace")
+ input.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ rx = output.get_capture(len(pkts))
+ return rx
+
class TestCasePrinter(object):
_shared_state = {}
for t in tests:
if isinstance(t, unittest.suite.TestSuite):
# this is a bunch of tests, recursively filter...
- x = filter_tests(t, filter_cb)
+ x = VppTestRunner.filter_tests(t, filter_cb)
if x.countTestCases() > 0:
result.addTest(x)
elif isinstance(t, unittest.TestCase):
class Worker(Thread):
- def __init__(self, args, logger):
+ def __init__(self, args, logger, env={}):
self.logger = logger
self.args = args
self.result = None
+ self.env = copy.deepcopy(env)
super(Worker, self).__init__()
def run(self):
executable = self.args[0]
self.logger.debug("Running executable w/args `%s'" % self.args)
env = os.environ.copy()
+ env.update(self.env)
env["CK_LOG_FILE_NAME"] = "-"
self.process = subprocess.Popen(
self.args, shell=False, env=env, preexec_fn=os.setpgrp,
self.logger.info(single_line_delim)
self.logger.info("Executable `%s' wrote to stderr:" % executable)
self.logger.info(single_line_delim)
- self.logger.error(err)
+ self.logger.info(err)
self.logger.info(single_line_delim)
self.result = self.process.returncode