from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
from scapy.layers.inet6 import ICMPv6EchoReply
from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
from scapy.layers.inet6 import ICMPv6EchoReply
if os.name == 'posix' and sys.version_info[0] < 3:
# using subprocess32 is recommended by python official documentation
# @ https://docs.python.org/2/library/subprocess.html
if os.name == 'posix' and sys.version_info[0] < 3:
# using subprocess32 is recommended by python official documentation
# @ https://docs.python.org/2/library/subprocess.html
split = read.splitlines(True)
if len(stderr_fragment) > 0:
split[0] = "%s%s" % (stderr_fragment, split[0])
split = read.splitlines(True)
if len(stderr_fragment) > 0:
split[0] = "%s%s" % (stderr_fragment, split[0])
- # ignoring the dummy pipe here intentionally - the flag will take care
- # of properly terminating the loop
+ # ignoring the dummy pipe here intentionally - the
+ # flag will take care of properly terminating the loop
"{", "plugin", "dpdk_plugin.so", "{", "disable",
"}", "plugin", "unittest_plugin.so", "{", "enable",
"}", "}", ]
"{", "plugin", "dpdk_plugin.so", "{", "disable",
"}", "plugin", "unittest_plugin.so", "{", "enable",
"}", "}", ]
if plugin_path is not None:
cls.vpp_cmdline.extend(["plugin_path", plugin_path])
cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
if plugin_path is not None:
cls.vpp_cmdline.extend(["plugin_path", plugin_path])
cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
if hasattr(cls, 'pump_thread_stop_flag'):
cls.pump_thread_stop_flag.set()
if hasattr(cls, 'pump_thread_wakeup_pipe'):
if hasattr(cls, 'pump_thread_stop_flag'):
cls.pump_thread_stop_flag.set()
if hasattr(cls, 'pump_thread_wakeup_pipe'):
if hasattr(cls, 'pump_thread'):
cls.logger.debug("Waiting for pump thread to stop")
cls.pump_thread.join()
if hasattr(cls, 'pump_thread'):
cls.logger.debug("Waiting for pump thread to stop")
cls.pump_thread.join()
stderr_log(single_line_delim)
stderr_log('VPP output to stderr while running %s:', cls.__name__)
stderr_log(single_line_delim)
stderr_log(single_line_delim)
stderr_log('VPP output to stderr while running %s:', cls.__name__)
stderr_log(single_line_delim)
with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
f.write(vpp_output)
stderr_log('\n%s', vpp_output)
with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
f.write(vpp_output)
stderr_log('\n%s', vpp_output)
- 0 == getattr(layer, cf) and \
- layer.name in udp_layers:
+ 0 == getattr(layer, cf) and \
+ layer.name in udp_layers:
def assert_packet_counter_equal(self, counter, expected_value):
counters = self.vapi.cli("sh errors").split('\n')
counter_value = -1
def assert_packet_counter_equal(self, counter, expected_value):
counters = self.vapi.cli("sh errors").split('\n')
counter_value = -1
results = counters[i].split()
if results[1] == counter:
counter_value = int(results[0])
results = counters[i].split()
if results[1] == counter:
counter_value = int(results[0])
@classmethod
def sleep(cls, timeout, remark=None):
if hasattr(cls, 'logger'):
@classmethod
def sleep(cls, timeout, remark=None):
if hasattr(cls, 'logger'):
- "slept for %ss instead of ~%ss!" % (
- after - before, timeout))
+ "slept for %es instead of ~%es!",
+ after - before, timeout)
- "Finished sleep (%s) - slept %ss (wanted %ss)" % (
- remark, after - before, timeout))
+ "Finished sleep (%s) - slept %es (wanted %es)",
+ remark, after - before, timeout)
def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
self.vapi.cli("clear trace")
def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
self.vapi.cli("clear trace")
def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
result_pipe=None, failfast=False, buffer=False,
resultclass=None, print_summary=True):
def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
result_pipe=None, failfast=False, buffer=False,
resultclass=None, print_summary=True):
# ignore stream setting here, use hard-coded stdout to be in sync
# with prints from VppTestCase methods ...
super(VppTestRunner, self).__init__(sys.stdout, descriptions,
# ignore stream setting here, use hard-coded stdout to be in sync
# with prints from VppTestCase methods ...
super(VppTestRunner, self).__init__(sys.stdout, descriptions,