#!/usr/bin/env python
-from abc import *
-import os
-import sys
import subprocess
import unittest
import tempfile
+import time
import resource
from time import sleep
+from collections import deque
+from threading import Thread
from inspect import getdoc
-from hook import PollHook
+from hook import StepHook, PollHook
from vpp_pg_interface import VppPGInterface
+from vpp_lo_interface import VppLoInterface
from vpp_papi_provider import VppPapiProvider
-
from scapy.packet import Raw
-
-from logging import *
+from log import *
"""
Test framework module.
representing the results.
"""
-handler = StreamHandler(sys.stdout)
-getLogger().addHandler(handler)
-try:
- verbose = int(os.getenv("V", 0))
-except:
- verbose = 0
-# 40 = ERROR, 30 = WARNING, 20 = INFO, 10 = DEBUG, 0 = NOTSET (all messages)
-getLogger().setLevel(40 - 10 * verbose)
-getLogger("scapy.runtime").addHandler(handler)
-getLogger("scapy.runtime").setLevel(ERROR)
-
-# Static variables to store color formatting strings.
-#
-# These variables (RED, GREEN, YELLOW and LPURPLE) are used to configure
-# the color of the text to be printed in the terminal. Variable COLOR_RESET
-# is used to revert the text color to the default one.
-if hasattr(sys.stdout, 'isatty') and sys.stdout.isatty():
- RED = '\033[91m'
- GREEN = '\033[92m'
- YELLOW = '\033[93m'
- LPURPLE = '\033[94m'
- COLOR_RESET = '\033[0m'
-else:
- RED = ''
- GREEN = ''
- YELLOW = ''
- LPURPLE = ''
- COLOR_RESET = ''
-
-
-""" @var formatting delimiter consisting of '=' characters """
-double_line_delim = '=' * 70
-""" @var formatting delimiter consisting of '-' characters """
-single_line_delim = '-' * 70
-
class _PacketInfo(object):
"""Private class to create packet info object.
Help process information about the next packet.
Set variables to default values.
- @property index
- Integer variable to store the index of the packet.
- @property src
- Integer variable to store the index of the source packet generator
- interface of the packet.
- @property dst
- Integer variable to store the index of the destination packet generator
- interface of the packet.
- @property data
- Object variable to store the copy of the former packet.
-
-
"""
+ #: Store the index of the packet.
index = -1
+ #: Store the index of the source packet generator interface of the packet.
src = -1
+ #: Store the index of the destination packet generator interface
+ #: of the packet.
dst = -1
+ #: Store the copy of the former packet.
data = None
+ def __eq__(self, other):
+ index = self.index == other.index
+ src = self.src == other.src
+ dst = self.dst == other.dst
+ data = self.data == other.data
+ return index and src and dst and data
-class VppTestCase(unittest.TestCase):
- """
- Subclass of the python unittest.TestCase class.
- This subclass is a base class for test cases that are implemented as classes
- It provides methods to create and run test case.
+def pump_output(out, deque):
+ for line in iter(out.readline, b''):
+ deque.append(line)
+
+class VppTestCase(unittest.TestCase):
+ """This subclass is a base class for VPP test cases that are implemented as
+ classes. It provides methods to create and run test case.
"""
@property
"""Return the instance of this testcase"""
return cls.test_instance
+ @classmethod
+ def set_debug_flags(cls, d):
+ cls.debug_core = False
+ cls.debug_gdb = False
+ cls.debug_gdbserver = False
+ if d is None:
+ return
+ dl = d.lower()
+ if dl == "core":
+ if resource.getrlimit(resource.RLIMIT_CORE)[0] <= 0:
+ # give a heads up if this is actually useless
+ cls.logger.critical("WARNING: core size limit is set 0, core "
+ "files will NOT be created")
+ cls.debug_core = True
+ elif dl == "gdb":
+ cls.debug_gdb = True
+ elif dl == "gdbserver":
+ cls.debug_gdbserver = True
+ else:
+ raise Exception("Unrecognized DEBUG option: '%s'" % d)
+
@classmethod
def setUpConstants(cls):
""" Set-up the test case class based on environment variables """
try:
- cls.interactive = True if int(os.getenv("I")) > 0 else False
+ s = os.getenv("STEP")
+ cls.step = True if s.lower() in ("y", "yes", "1") else False
except:
- cls.interactive = False
- if cls.interactive and resource.getrlimit(resource.RLIMIT_CORE)[0] <= 0:
- # give a heads up if this is actually useless
- critical("WARNING: core size limit is set 0, core files will NOT "
- "be created")
+ cls.step = False
+ try:
+ d = os.getenv("DEBUG")
+ except:
+ d = None
+ cls.set_debug_flags(d)
cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
- cls.vpp_cmdline = [cls.vpp_bin, "unix", "nodaemon",
+ debug_cli = ""
+ if cls.step or cls.debug_gdb or cls.debug_gdbserver:
+ debug_cli = "cli-listen localhost:5002"
+ cls.vpp_cmdline = [cls.vpp_bin, "unix", "{", "nodaemon", debug_cli, "}",
"api-segment", "{", "prefix", cls.shm_prefix, "}"]
if cls.plugin_path is not None:
cls.vpp_cmdline.extend(["plugin_path", cls.plugin_path])
- info("vpp_cmdline: %s" % cls.vpp_cmdline)
+ cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
+
+ @classmethod
+ def wait_for_enter(cls):
+ if cls.debug_gdbserver:
+ print(double_line_delim)
+ print("Spawned GDB server with PID: %d" % cls.vpp.pid)
+ elif cls.debug_gdb:
+ print(double_line_delim)
+ print("Spawned VPP with PID: %d" % cls.vpp.pid)
+ else:
+ cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
+ return
+ print(single_line_delim)
+ print("You can debug the VPP using e.g.:")
+ if cls.debug_gdbserver:
+ print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
+ print("Now is the time to attach a gdb by running the above "
+ "command, set up breakpoints etc. and then resume VPP from "
+ "within gdb by issuing the 'continue' command")
+ elif cls.debug_gdb:
+ print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
+ print("Now is the time to attach a gdb by running the above "
+ "command and set up breakpoints etc.")
+ print(single_line_delim)
+ raw_input("Press ENTER to continue running the testcase...")
+
+ @classmethod
+ def run_vpp(cls):
+ cmdline = cls.vpp_cmdline
+
+ if cls.debug_gdbserver:
+ gdbserver = '/usr/bin/gdbserver'
+ if not os.path.isfile(gdbserver) or \
+ not os.access(gdbserver, os.X_OK):
+ raise Exception("gdbserver binary '%s' does not exist or is "
+ "not executable" % gdbserver)
+
+ cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
+ cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
+
+ try:
+ cls.vpp = subprocess.Popen(cmdline,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ bufsize=1)
+ except Exception as e:
+ cls.logger.critical("Couldn't start vpp: %s" % e)
+ raise
+
+ cls.wait_for_enter()
@classmethod
def setUpClass(cls):
Perform class setup before running the testcase
Remove shared memory files, start vpp and connect the vpp-api
"""
+ cls.logger = getLogger(cls.__name__)
cls.tempdir = tempfile.mkdtemp(
prefix='vpp-unittest-' + cls.__name__ + '-')
cls.shm_prefix = cls.tempdir.split("/")[-1]
os.chdir(cls.tempdir)
- info("Temporary dir is %s, shm prefix is %s",
- cls.tempdir, cls.shm_prefix)
+ cls.logger.info("Temporary dir is %s, shm prefix is %s",
+ cls.tempdir, cls.shm_prefix)
cls.setUpConstants()
cls.pg_streams = []
cls.packet_infos = {}
cls.verbose = 0
+ cls.vpp_dead = False
print(double_line_delim)
- print(YELLOW + getdoc(cls) + COLOR_RESET)
+ print(colorize(getdoc(cls).splitlines()[0], YELLOW))
print(double_line_delim)
# need to catch exceptions here because if we raise, then the cleanup
# doesn't get called and we might end with a zombie vpp
try:
- cls.vpp = subprocess.Popen(cls.vpp_cmdline, stderr=subprocess.PIPE)
- debug("Spawned VPP with PID: %d" % cls.vpp.pid)
- cls.vpp_dead = False
- cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix)
- cls.vapi.register_hook(PollHook(cls))
- cls.vapi.connect()
+ cls.run_vpp()
+ cls.vpp_stdout_deque = deque()
+ cls.vpp_stdout_reader_thread = Thread(target=pump_output, args=(
+ cls.vpp.stdout, cls.vpp_stdout_deque))
+ cls.vpp_stdout_reader_thread.start()
+ cls.vpp_stderr_deque = deque()
+ cls.vpp_stderr_reader_thread = Thread(target=pump_output, args=(
+ cls.vpp.stderr, cls.vpp_stderr_deque))
+ cls.vpp_stderr_reader_thread.start()
+ cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
+ if cls.step:
+ hook = StepHook(cls)
+ else:
+ hook = PollHook(cls)
+ cls.vapi.register_hook(hook)
+ time.sleep(0.1)
+ hook.poll_vpp()
+ try:
+ cls.vapi.connect()
+ except:
+ if cls.debug_gdbserver:
+ print(colorize("You're running VPP inside gdbserver but "
+ "VPP-API connection failed, did you forget "
+ "to 'continue' VPP from within gdb?", RED))
+ raise
except:
- cls.vpp.terminate()
- del cls.vpp
+ t, v, tb = sys.exc_info()
+ try:
+ cls.quit()
+ except:
+ pass
+ raise t, v, tb
@classmethod
def quit(cls):
"""
Disconnect vpp-api, kill vpp and cleanup shared memory files
"""
+ if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
+ cls.vpp.poll()
+ if cls.vpp.returncode is None:
+ print(double_line_delim)
+ print("VPP or GDB server is still running")
+ print(single_line_delim)
+ raw_input("When done debugging, press ENTER to kill the process"
+ " and finish running the testcase...")
+
if hasattr(cls, 'vpp'):
- cls.vapi.disconnect()
+ if hasattr(cls, 'vapi'):
+ cls.vapi.disconnect()
cls.vpp.poll()
if cls.vpp.returncode is None:
cls.vpp.terminate()
del cls.vpp
+ if hasattr(cls, 'vpp_stdout_deque'):
+ cls.logger.info(single_line_delim)
+ cls.logger.info('VPP output to stdout while running %s:',
+ cls.__name__)
+ cls.logger.info(single_line_delim)
+ f = open(cls.tempdir + '/vpp_stdout.txt', 'w')
+ vpp_output = "".join(cls.vpp_stdout_deque)
+ f.write(vpp_output)
+ cls.logger.info('\n%s', vpp_output)
+ cls.logger.info(single_line_delim)
+
+ if hasattr(cls, 'vpp_stderr_deque'):
+ cls.logger.info(single_line_delim)
+ cls.logger.info('VPP output to stderr while running %s:',
+ cls.__name__)
+ cls.logger.info(single_line_delim)
+ f = open(cls.tempdir + '/vpp_stderr.txt', 'w')
+ vpp_output = "".join(cls.vpp_stderr_deque)
+ f.write(vpp_output)
+ cls.logger.info('\n%s', vpp_output)
+ cls.logger.info(single_line_delim)
+
@classmethod
def tearDownClass(cls):
""" Perform final cleanup after running all tests in this test-case """
def tearDown(self):
""" Show various debug prints after each test """
if not self.vpp_dead:
- info(self.vapi.cli("show int"))
- info(self.vapi.cli("show trace"))
- info(self.vapi.cli("show hardware"))
- info(self.vapi.cli("show error"))
- info(self.vapi.cli("show run"))
+ self.logger.debug(self.vapi.cli("show trace"))
+ self.logger.info(self.vapi.ppcli("show int"))
+ self.logger.info(self.vapi.ppcli("show hardware"))
+ self.logger.info(self.vapi.ppcli("show error"))
+ self.logger.info(self.vapi.ppcli("show run"))
def setUp(self):
""" Clear trace before running each test"""
+ if self.vpp_dead:
+ raise Exception("VPP is dead when setting up the test")
+ time.sleep(.1)
+ self.vpp_stdout_deque.append(
+ "--- test setUp() for %s.%s(%s) starts here ---\n" %
+ (self.__class__.__name__, self._testMethodName,
+ self._testMethodDoc))
+ self.vpp_stderr_deque.append(
+ "--- test setUp() for %s.%s(%s) starts here ---\n" %
+ (self.__class__.__name__, self._testMethodName,
+ self._testMethodDoc))
self.vapi.cli("clear trace")
# store the test instance inside the test class - so that objects
# holding the class can access instance methods (like assertEqual)
i.enable_capture()
@classmethod
- def pg_start(cls):
+ def pg_start(cls, sleep_time=1):
"""
Enable the packet-generator and send all prepared packet streams
Remove the packet streams afterwards
"""
cls.vapi.cli("trace add pg-input 50") # 50 is maximum
cls.vapi.cli('packet-generator enable')
- sleep(1) # give VPP some time to process the packets
+ sleep(sleep_time) # give VPP some time to process the packets
for stream in cls.pg_streams:
cls.vapi.cli('packet-generator delete %s' % stream)
cls.pg_streams = []
cls.pg_interfaces = result
return result
+ @classmethod
+ def create_loopback_interfaces(cls, interfaces):
+ """
+ Create loopback interfaces
+
+ :param interfaces: iterable indexes of the interfaces
+
+ """
+ result = []
+ for i in interfaces:
+ intf = VppLoInterface(cls, i)
+ setattr(cls, intf.name, intf)
+ result.append(intf)
+ cls.lo_interfaces = result
+ return result
+
@staticmethod
def extend_packet(packet, size):
"""
if info.dst == dst_index:
return info
+ def assert_equal(self, real_value, expected_value, name_or_class=None):
+ if name_or_class is None:
+ self.assertEqual(real_value, expected_value, msg)
+ return
+ try:
+ msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
+ msg = msg % (getdoc(name_or_class).strip(),
+ real_value, str(name_or_class(real_value)),
+ expected_value, str(name_or_class(expected_value)))
+ except:
+ msg = "Invalid %s: %s does not match expected value %s" % (
+ name_or_class, real_value, expected_value)
+
+ self.assertEqual(real_value, expected_value, msg)
+
+ def assert_in_range(
+ self,
+ real_value,
+ expected_min,
+ expected_max,
+ name=None):
+ if name is None:
+ msg = None
+ else:
+ msg = "Invalid %s: %s out of range <%s,%s>" % (
+ name, real_value, expected_min, expected_max)
+ self.assertTrue(expected_min <= real_value <= expected_max, msg)
+
class VppTestResult(unittest.TestResult):
"""
"""
unittest.TestResult.addSuccess(self, test)
- self.result_string = GREEN + "OK" + COLOR_RESET
+ self.result_string = colorize("OK", GREEN)
def addSkip(self, test, reason):
"""
"""
unittest.TestResult.addSkip(self, test, reason)
- self.result_string = YELLOW + "SKIP" + COLOR_RESET
+ self.result_string = colorize("SKIP", YELLOW)
def addFailure(self, test, err):
"""
"""
unittest.TestResult.addFailure(self, test, err)
if hasattr(test, 'tempdir'):
- self.result_string = RED + "FAIL" + COLOR_RESET + \
+ self.result_string = colorize("FAIL", RED) + \
' [ temp dir used by test case: ' + test.tempdir + ' ]'
else:
- self.result_string = RED + "FAIL" + COLOR_RESET + ' [no temp dir]'
+ self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
def addError(self, test, err):
"""
"""
unittest.TestResult.addError(self, test, err)
if hasattr(test, 'tempdir'):
- self.result_string = RED + "ERROR" + COLOR_RESET + \
+ self.result_string = colorize("ERROR", RED) + \
' [ temp dir used by test case: ' + test.tempdir + ' ]'
else:
- self.result_string = RED + "ERROR" + COLOR_RESET + ' [no temp dir]'
+ self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
def getDescription(self, test):
"""