from inspect import getdoc, isclass
from traceback import format_exception
from logging import FileHandler, DEBUG, Formatter
+
+import scapy.compat
from scapy.packet import Raw
from hook import StepHook, PollHook, VppDiedError
from vpp_pg_interface import VppPGInterface
stderr=subprocess.PIPE,
bufsize=1)
except subprocess.CalledProcessError as e:
- cls.logger.critical("Couldn't start vpp: %s" % e)
+ cls.logger.critical("Subprocess returned with non-0 return code: ("
+ "%s)", e.returncode)
+ raise
+ except OSError as e:
+ cls.logger.critical("Subprocess returned with OS error: "
+ "(%s) %s", e.errno, e.strerror)
+ raise
+ except Exception as e:
+ cls.logger.exception("Subprocess returned unexpected from "
+ "%s:", cmdline)
raise
cls.wait_for_enter()
if os.path.exists(cls.stats_sock):
ok = True
break
- time.sleep(0.8)
+ cls.sleep(0.8)
if not ok:
cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
Perform class setup before running the testcase
Remove shared memory files, start vpp and connect the vpp-api
"""
+ super(VppTestCase, cls).setUpClass()
gc.collect() # run garbage collection first
random.seed()
cls.logger = get_logger(cls.__name__)
if hasattr(cls, 'vpp'):
if hasattr(cls, 'vapi'):
+ cls.logger.debug("Disconnecting class vapi client on %s",
+ cls.__name__)
cls.vapi.disconnect()
+ cls.logger.debug("Deleting class vapi attribute on %s",
+ cls.__name__)
del cls.vapi
cls.vpp.poll()
if cls.vpp.returncode is None:
cls.vpp.kill()
cls.logger.debug("Waiting for vpp to die")
cls.vpp.communicate()
+ cls.logger.debug("Deleting class vpp attribute on %s",
+ cls.__name__)
del cls.vpp
if cls.vpp_startup_failed:
stderr_log(single_line_delim)
stderr_log('VPP output to stderr while running %s:', cls.__name__)
stderr_log(single_line_delim)
- vpp_output = "".join(str(cls.vpp_stderr_deque))
+ vpp_output = "".join(cls.vpp_stderr_deque)
with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
f.write(vpp_output)
stderr_log('\n%s', vpp_output)
def setUp(self):
""" Clear trace before running each test"""
+ super(VppTestCase, self).setUp()
self.reporter.send_keep_alive(self)
self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
(self.__class__.__name__, self._testMethodName,
info.ip, info.proto)
@staticmethod
- def payload_to_info(payload):
+ def payload_to_info(payload, payload_field='load'):
"""
Convert packet payload to _PacketInfo object
:param payload: packet payload
-
+ :type: <class 'scapy.packet.Raw'>
+ :param: payload_field: packet fieldname of payload "load" for
+ <class 'scapy.packet.Raw'>
:returns: _PacketInfo object containing de-serialized data from payload
"""
- numbers = payload.split()
+ numbers = getattr(payload, payload_field).split()
info = _PacketInfo()
info.index = int(numbers[0])
info.src = int(numbers[1])
def assert_packet_checksums_valid(self, packet,
ignore_zero_udp_checksums=True):
- received = packet.__class__(str(packet))
+ received = packet.__class__(scapy.compat.raw(packet))
self.logger.debug(
ppp("Verifying packet checksums for packet:", received))
udp_layers = ['UDP', 'UDPerror']
checksum_fields = ['cksum', 'chksum']
checksums = []
counter = 0
- temp = received.__class__(str(received))
+ temp = received.__class__(scapy.compat.raw(received))
while True:
layer = temp.getlayer(counter)
if layer:
counter = counter + 1
if 0 == len(checksums):
return
- temp = temp.__class__(str(temp))
+ temp = temp.__class__(scapy.compat.raw(temp))
for layer, cf in checksums:
calc_sum = getattr(temp[layer], cf)
self.assert_equal(
received_packet_checksum = getattr(received_packet[layer], field_name)
if ignore_zero_checksum and 0 == received_packet_checksum:
return
- recalculated = received_packet.__class__(str(received_packet))
+ recalculated = received_packet.__class__(
+ scapy.compat.raw(received_packet))
delattr(recalculated[layer], field_name)
- recalculated = recalculated.__class__(str(recalculated))
+ recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
self.assert_equal(received_packet_checksum,
getattr(recalculated[layer], field_name),
"packet checksum on layer: %s" % layer)
@classmethod
def sleep(cls, timeout, remark=None):
+
+ # /* Allow sleep(0) to maintain win32 semantics, and as decreed
+ # * by Guido, only the main thread can be interrupted.
+ # */
+ # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
+ if timeout == 0:
+ # yield quantum
+ if hasattr(os, 'sched_yield'):
+ os.sched_yield()
+ else:
+ time.sleep(0)
+ return
+
if hasattr(cls, 'logger'):
cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
before = time.time()
time.sleep(timeout)
after = time.time()
if hasattr(cls, 'logger') and after - before > 2 * timeout:
- cls.logger.error("unexpected time.sleep() result - "
+ cls.logger.error("unexpected self.sleep() result - "
"slept for %es instead of ~%es!",
after - before, timeout)
if hasattr(cls, 'logger'):
"Finished sleep (%s) - slept %es (wanted %es)",
remark, after - before, timeout)
- def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
+ def pg_send(self, intf, pkts):
self.vapi.cli("clear trace")
intf.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
+
+ def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
+ self.pg_send(intf, pkts)
if not timeout:
timeout = 1
for i in self.pg_interfaces:
i.assert_nothing_captured(remark=remark)
timeout = 0.1
- def send_and_expect(self, input, pkts, output):
- self.vapi.cli("clear trace")
- input.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
+ def send_and_expect(self, intf, pkts, output):
+ self.pg_send(intf, pkts)
rx = output.get_capture(len(pkts))
return rx
- def send_and_expect_only(self, input, pkts, output, timeout=None):
- self.vapi.cli("clear trace")
- input.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
+ def send_and_expect_only(self, intf, pkts, output, timeout=None):
+ self.pg_send(intf, pkts)
rx = output.get_capture(len(pkts))
outputs = [output]
if not timeout:
core_crash_test_cases_info = set()
current_test_case_info = None
- def __init__(self, stream, descriptions, verbosity, runner):
+ def __init__(self, stream=None, descriptions=None, verbosity=None,
+ runner=None):
"""
:param stream File descriptor to store where to report test results.
Set to the standard error stream by default.
if isinstance(test, unittest.suite._ErrorHolder):
test_name = str(test)
else:
- test_name = "'{}' ({})".format(
+ test_name = "'{!s}' ({!s})".format(
get_testcase_doc_name(test), test.id())
self.current_test_case_info.core_crash_test = test_name
self.core_crash_test_cases_info.add(
def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
result_pipe=None, failfast=False, buffer=False,
- resultclass=None, print_summary=True):
+ resultclass=None, print_summary=True, **kwargs):
# ignore stream setting here, use hard-coded stdout to be in sync
# with prints from VppTestCase methods ...
super(VppTestRunner, self).__init__(sys.stdout, descriptions,
verbosity, failfast, buffer,
- resultclass)
+ resultclass, **kwargs)
KeepAliveReporter.pipe = keep_alive_pipe
self.orig_stream = self.stream