from vpp_papi_provider import VppPapiProvider
from vpp_papi.vpp_stats import VPPStats
from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
- getLogger, colorize
+ get_logger, colorize
from vpp_object import VppObjectRegistry
from util import ppp, is_core_present
from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
def __init__(self):
self.__dict__ = self._shared_state
+ self._pipe = None
@property
def pipe(self):
@pipe.setter
def pipe(self, pipe):
- if hasattr(self, '_pipe'):
+ if self._pipe is not None:
raise Exception("Internal error - pipe should only be set once.")
self._pipe = pipe
"}", "}", ]
if plugin_path is not None:
cls.vpp_cmdline.extend(["plugin_path", plugin_path])
- cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
+ cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
+ cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
@classmethod
def wait_for_enter(cls):
@classmethod
def wait_for_stats_socket(cls):
deadline = time.time() + 3
- while time.time() < deadline or cls.debug_gdb or cls.debug_gdbserver:
+ ok = False
+ while time.time() < deadline or \
+ cls.debug_gdb or cls.debug_gdbserver:
if os.path.exists(cls.stats_sock):
+ ok = True
break
+ time.sleep(0.8)
+ if not ok:
+ cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
@classmethod
def setUpClass(cls):
gc.collect() # run garbage collection first
random.seed()
cls.print_header(cls)
- if not hasattr(cls, 'logger'):
- cls.logger = getLogger(cls.__name__)
- else:
- cls.logger.name = cls.__name__
+ cls.logger = get_logger(cls.__name__)
+ if hasattr(cls, 'parallel_handler'):
+ cls.logger.addHandler(cls.parallel_handler)
+ cls.logger.propagate = False
cls.tempdir = tempfile.mkdtemp(
prefix='vpp-unittest-%s-' % cls.__name__)
cls.stats_sock = "%s/stats.sock" % cls.tempdir
if pkt.haslayer(ICMPv6EchoReply):
self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
+ def assert_packet_counter_equal(self, counter, expected_value):
+ counters = self.vapi.cli("sh errors").split('\n')
+ counter_value = -1
+ for i in range(1, len(counters)-1):
+ results = counters[i].split()
+ if results[1] == counter:
+ counter_value = int(results[0])
+ break
+ self.assert_equal(counter_value, expected_value,
+ "packet counter `%s'" % counter)
+
@classmethod
def sleep(cls, timeout, remark=None):
if hasattr(cls, 'logger'):
input.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rx = output.get_capture(len(pkts))
+ if isinstance(object, (list,)):
+ rx = []
+ for o in output:
+ rx.append(output.get_capture(len(pkts)))
+ else:
+ rx = output.get_capture(len(pkts))
+ return rx
+
+ def send_and_expect_only(self, input, pkts, output, timeout=None):
+ self.vapi.cli("clear trace")
+ input.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ if isinstance(object, (list,)):
+ outputs = output
+ rx = []
+ for o in outputs:
+ rx.append(output.get_capture(len(pkts)))
+ else:
+ rx = output.get_capture(len(pkts))
+ outputs = [output]
+ if not timeout:
+ timeout = 1
+ for i in self.pg_interfaces:
+ if i not in outputs:
+ i.get_capture(0, timeout=timeout)
+ i.assert_nothing_captured()
+ timeout = 0.1
+
return rx
super(VppTestRunner, self).__init__(sys.stdout, descriptions,
verbosity, failfast, buffer,
resultclass)
- reporter = KeepAliveReporter()
- reporter.pipe = keep_alive_pipe
+ KeepAliveReporter.pipe = keep_alive_pipe
VppTestResult.test_framework_result_pipe = result_pipe