#!/usr/bin/env python
from __future__ import print_function
-
-import copy
import gc
+import sys
import os
-import random
import select
-import six
-import sys
+import unittest
import tempfile
import time
-import unittest
+import faulthandler
+import random
+import copy
+import psutil
from collections import deque
-from inspect import getdoc, isclass
-from logging import FileHandler, DEBUG, Formatter
from threading import Thread, Event
+from inspect import getdoc, isclass
from traceback import format_exception
-
-import faulthandler
-import psutil
-from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
-from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
-from scapy.layers.inet6 import ICMPv6EchoReply
+from logging import FileHandler, DEBUG, Formatter
from scapy.packet import Raw
-from vpp_papi.vpp_stats import VPPStats
-
from hook import StepHook, PollHook, VppDiedError
+from vpp_pg_interface import VppPGInterface
+from vpp_sub_interface import VppSubInterface
+from vpp_lo_interface import VppLoInterface
+from vpp_papi_provider import VppPapiProvider
+from vpp_papi.vpp_stats import VPPStats
from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
get_logger, colorize
-from util import ppp, is_core_present
-from vpp_lo_interface import VppLoInterface
from vpp_object import VppObjectRegistry
-from vpp_papi_provider import VppPapiProvider
-from vpp_pg_interface import VppPGInterface
-from vpp_sub_interface import VppSubInterface
-
+from util import ppp, is_core_present
+from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
+from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
+from scapy.layers.inet6 import ICMPv6EchoReply
if os.name == 'posix' and sys.version_info[0] < 3:
# using subprocess32 is recommended by python official documentation
# @ https://docs.python.org/2/library/subprocess.html
c = os.getenv("CACHE_OUTPUT", "1")
cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
cls.set_debug_flags(d)
- cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
- cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
+ cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
+ cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
plugin_path = None
if cls.plugin_path is not None:
print("Now is the time to attach a gdb by running the above "
"command and set up breakpoints etc.")
print(single_line_delim)
- six.input("Press ENTER to continue running the testcase...")
+ raw_input("Press ENTER to continue running the testcase...")
@classmethod
def run_vpp(cls):
print(double_line_delim)
print("VPP or GDB server is still running")
print(single_line_delim)
- six.input("When done debugging, press ENTER to kill the "
+ raw_input("When done debugging, press ENTER to kill the "
"process and finish running the testcase...")
# first signal that we want to stop the pump thread, then wake it up
core_crash_test_cases_info = set()
current_test_case_info = None
- def __init__(self, stream, descriptions, verbosity):
+ def __init__(self, stream, descriptions, verbosity, runner):
"""
:param stream File descriptor to store where to report test results.
Set to the standard error stream by default.
self.descriptions = descriptions
self.verbosity = verbosity
self.result_string = None
+ self.runner = runner
def addSuccess(self, test):
"""
def symlink_failed(self):
if self.current_test_case_info:
try:
- failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
+ failed_dir = os.getenv('FAILED_DIR')
link_path = os.path.join(
failed_dir,
'%s-FAILED' %
"""
Print errors from running the test case
"""
- self.stream.writeln()
- self.printErrorList('ERROR', self.errors)
- self.printErrorList('FAIL', self.failures)
+ if len(self.errors) > 0 or len(self.failures) > 0:
+ self.stream.writeln()
+ self.printErrorList('ERROR', self.errors)
+ self.printErrorList('FAIL', self.failures)
+
+ # ^^ that is the last output from unittest before summary
+ if not self.runner.print_summary:
+ devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
+ self.stream = devnull
+ self.runner.stream = devnull
def printErrorList(self, flavour, errors):
"""
def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
result_pipe=None, failfast=False, buffer=False,
- resultclass=None):
+ resultclass=None, print_summary=True):
# ignore stream setting here, use hard-coded stdout to be in sync
# with prints from VppTestCase methods ...
resultclass)
KeepAliveReporter.pipe = keep_alive_pipe
- VppTestResult.test_framework_result_pipe = result_pipe
+ self.orig_stream = self.stream
+ self.resultclass.test_framework_result_pipe = result_pipe
+
+ self.print_summary = print_summary
+
+ def _makeResult(self):
+ return self.resultclass(self.stream,
+ self.descriptions,
+ self.verbosity,
+ self)
def run(self, test):
"""
faulthandler.enable() # emit stack trace to stderr if killed by signal
result = super(VppTestRunner, self).run(test)
+ if not self.print_summary:
+ self.stream = self.orig_stream
+ result.stream = self.orig_stream
return result