stderr=subprocess.PIPE,
bufsize=1)
except subprocess.CalledProcessError as e:
- cls.logger.critical("Couldn't start vpp: %s" % e)
+ cls.logger.critical("Subprocess returned with non-0 return code: ("
+ "%s)", e.returncode)
+ raise
+ except OSError as e:
+ cls.logger.critical("Subprocess returned with OS error: "
+ "(%s) %s", e.errno, e.strerror)
+ raise
+ except Exception as e:
+ cls.logger.exception("Subprocess returned unexpected from "
+ "%s:", cmdline)
raise
cls.wait_for_enter()
info.ip, info.proto)
@staticmethod
- def payload_to_info(payload):
+ def payload_to_info(payload, payload_field='load'):
"""
Convert packet payload to _PacketInfo object
:param payload: packet payload
-
+ :type: <class 'scapy.packet.Raw'>
+ :param: payload_field: packet fieldname of payload "load" for
+ <class 'scapy.packet.Raw'>
:returns: _PacketInfo object containing de-serialized data from payload
"""
- numbers = payload.split()
+ numbers = getattr(payload, payload_field).split()
info = _PacketInfo()
info.index = int(numbers[0])
info.src = int(numbers[1])
"Finished sleep (%s) - slept %es (wanted %es)",
remark, after - before, timeout)
- def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
+ def pg_send(self, intf, pkts):
self.vapi.cli("clear trace")
intf.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
+
+ def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
+ self.pg_send(intf, pkts)
if not timeout:
timeout = 1
for i in self.pg_interfaces:
i.assert_nothing_captured(remark=remark)
timeout = 0.1
- def send_and_expect(self, input, pkts, output):
- self.vapi.cli("clear trace")
- input.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
+ def send_and_expect(self, intf, pkts, output):
+ self.pg_send(intf, pkts)
rx = output.get_capture(len(pkts))
return rx
- def send_and_expect_only(self, input, pkts, output, timeout=None):
- self.vapi.cli("clear trace")
- input.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
+ def send_and_expect_only(self, intf, pkts, output, timeout=None):
+ self.pg_send(intf, pkts)
rx = output.get_capture(len(pkts))
outputs = [output]
if not timeout:
if isinstance(test, unittest.suite._ErrorHolder):
test_name = str(test)
else:
- test_name = "'{}' ({})".format(
+ test_name = "'{!s}' ({!s})".format(
get_testcase_doc_name(test), test.id())
self.current_test_case_info.core_crash_test = test_name
self.core_crash_test_cases_info.add(