#!/usr/bin/env python
from __future__ import print_function
+
+import copy
import gc
-import sys
import os
+import random
import select
-import unittest
+import six
+import sys
import tempfile
import time
-import faulthandler
-import random
-import copy
-import psutil
+import unittest
from collections import deque
-from threading import Thread, Event
from inspect import getdoc, isclass
-from traceback import format_exception
from logging import FileHandler, DEBUG, Formatter
+from threading import Thread, Event
+from traceback import format_exception
+
+import faulthandler
+import psutil
+from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
+from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
+from scapy.layers.inet6 import ICMPv6EchoReply
from scapy.packet import Raw
-from hook import StepHook, PollHook, VppDiedError
-from vpp_pg_interface import VppPGInterface
-from vpp_sub_interface import VppSubInterface
-from vpp_lo_interface import VppLoInterface
-from vpp_papi_provider import VppPapiProvider
from vpp_papi.vpp_stats import VPPStats
+
+from hook import StepHook, PollHook, VppDiedError
from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
- getLogger, colorize
-from vpp_object import VppObjectRegistry
+ get_logger, colorize
from util import ppp, is_core_present
-from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
-from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
-from scapy.layers.inet6 import ICMPv6EchoReply
+from vpp_lo_interface import VppLoInterface
+from vpp_object import VppObjectRegistry
+from vpp_papi_provider import VppPapiProvider
+from vpp_pg_interface import VppPGInterface
+from vpp_sub_interface import VppSubInterface
+
if os.name == 'posix' and sys.version_info[0] < 3:
# using subprocess32 is recommended by python official documentation
# @ https://docs.python.org/2/library/subprocess.html
def __init__(self):
self.__dict__ = self._shared_state
+ self._pipe = None
@property
def pipe(self):
@pipe.setter
def pipe(self, pipe):
- if hasattr(self, '_pipe'):
+ if self._pipe is not None:
raise Exception("Internal error - pipe should only be set once.")
self._pipe = pipe
"}", "}", ]
if plugin_path is not None:
cls.vpp_cmdline.extend(["plugin_path", plugin_path])
- cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
+ cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
+ cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
@classmethod
def wait_for_enter(cls):
print("Now is the time to attach a gdb by running the above "
"command and set up breakpoints etc.")
print(single_line_delim)
- raw_input("Press ENTER to continue running the testcase...")
+ six.input("Press ENTER to continue running the testcase...")
@classmethod
def run_vpp(cls):
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=1)
- except Exception as e:
+ except subprocess.CalledProcessError as e:
cls.logger.critical("Couldn't start vpp: %s" % e)
raise
@classmethod
def wait_for_stats_socket(cls):
deadline = time.time() + 3
- while time.time() < deadline or cls.debug_gdb or cls.debug_gdbserver:
+ ok = False
+ while time.time() < deadline or \
+ cls.debug_gdb or cls.debug_gdbserver:
if os.path.exists(cls.stats_sock):
+ ok = True
break
+ time.sleep(0.8)
+ if not ok:
+ cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
@classmethod
def setUpClass(cls):
gc.collect() # run garbage collection first
random.seed()
cls.print_header(cls)
- if not hasattr(cls, 'logger'):
- cls.logger = getLogger(cls.__name__)
- else:
- cls.logger.name = cls.__name__
+ cls.logger = get_logger(cls.__name__)
+ if hasattr(cls, 'parallel_handler'):
+ cls.logger.addHandler(cls.parallel_handler)
+ cls.logger.propagate = False
cls.tempdir = tempfile.mkdtemp(
prefix='vpp-unittest-%s-' % cls.__name__)
cls.stats_sock = "%s/stats.sock" % cls.tempdir
print(double_line_delim)
print("VPP or GDB server is still running")
print(single_line_delim)
- raw_input("When done debugging, press ENTER to kill the "
+ six.input("When done debugging, press ENTER to kill the "
"process and finish running the testcase...")
# first signal that we want to stop the pump thread, then wake it up
if pkt.haslayer(ICMPv6EchoReply):
self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
+ def assert_packet_counter_equal(self, counter, expected_value):
+ counters = self.vapi.cli("sh errors").split('\n')
+ counter_value = -1
+ for i in range(1, len(counters)-1):
+ results = counters[i].split()
+ if results[1] == counter:
+ counter_value = int(results[0])
+ break
+ self.assert_equal(counter_value, expected_value,
+ "packet counter `%s'" % counter)
+
@classmethod
def sleep(cls, timeout, remark=None):
if hasattr(cls, 'logger'):
super(VppTestRunner, self).__init__(sys.stdout, descriptions,
verbosity, failfast, buffer,
resultclass)
- reporter = KeepAliveReporter()
- reporter.pipe = keep_alive_pipe
+ KeepAliveReporter.pipe = keep_alive_pipe
VppTestResult.test_framework_result_pipe = result_pipe