import time
import resource
from time import sleep
-from Queue import Queue
+from collections import deque
from threading import Thread
from inspect import getdoc
from hook import StepHook, PollHook
Help process information about the next packet.
Set variables to default values.
- @property index
- Integer variable to store the index of the packet.
- @property src
- Integer variable to store the index of the source packet generator
- interface of the packet.
- @property dst
- Integer variable to store the index of the destination packet generator
- interface of the packet.
- @property data
- Object variable to store the copy of the former packet.
-
-
"""
+ #: Store the index of the packet.
index = -1
+ #: Store the index of the source packet generator interface of the packet.
src = -1
+ #: Store the index of the destination packet generator interface
+ #: of the packet.
dst = -1
+ #: Store the copy of the former packet.
data = None
+ def __eq__(self, other):
+ index = self.index == other.index
+ src = self.src == other.src
+ dst = self.dst == other.dst
+ data = self.data == other.data
+ return index and src and dst and data
+
-def pump_output(out, queue):
+def pump_output(out, deque):
for line in iter(out.readline, b''):
- queue.put(line)
+ deque.append(line)
class VppTestCase(unittest.TestCase):
- """
- Subclass of the python unittest.TestCase class.
-
- This subclass is a base class for test cases that are implemented as classes
- It provides methods to create and run test case.
-
+ """This subclass is a base class for VPP test cases that are implemented as
+ classes. It provides methods to create and run test case.
"""
@property
cls.pg_streams = []
cls.packet_infos = {}
cls.verbose = 0
+ cls.vpp_dead = False
print(double_line_delim)
- print(colorize(getdoc(cls), YELLOW))
+ print(colorize(getdoc(cls).splitlines()[0], YELLOW))
print(double_line_delim)
# need to catch exceptions here because if we raise, then the cleanup
# doesn't get called and we might end with a zombie vpp
try:
cls.run_vpp()
- cls.vpp_dead = False
- cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix)
+ cls.vpp_stdout_deque = deque()
+ cls.vpp_stdout_reader_thread = Thread(target=pump_output, args=(
+ cls.vpp.stdout, cls.vpp_stdout_deque))
+ cls.vpp_stdout_reader_thread.start()
+ cls.vpp_stderr_deque = deque()
+ cls.vpp_stderr_reader_thread = Thread(target=pump_output, args=(
+ cls.vpp.stderr, cls.vpp_stderr_deque))
+ cls.vpp_stderr_reader_thread.start()
+ cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
if cls.step:
- cls.vapi.register_hook(StepHook(cls))
+ hook = StepHook(cls)
else:
- cls.vapi.register_hook(PollHook(cls))
+ hook = PollHook(cls)
+ cls.vapi.register_hook(hook)
time.sleep(0.1)
+ hook.poll_vpp()
try:
cls.vapi.connect()
except:
"VPP-API connection failed, did you forget "
"to 'continue' VPP from within gdb?", RED))
raise
- cls.vpp_stdout_queue = Queue()
- cls.vpp_stdout_reader_thread = Thread(
- target=pump_output, args=(cls.vpp.stdout, cls.vpp_stdout_queue))
- cls.vpp_stdout_reader_thread.start()
- cls.vpp_stderr_queue = Queue()
- cls.vpp_stderr_reader_thread = Thread(
- target=pump_output, args=(cls.vpp.stderr, cls.vpp_stderr_queue))
- cls.vpp_stderr_reader_thread.start()
except:
- if hasattr(cls, 'vpp'):
- cls.vpp.terminate()
- del cls.vpp
- raise
+ t, v, tb = sys.exc_info()
+ try:
+ cls.quit()
+ except:
+ pass
+ raise t, v, tb
@classmethod
def quit(cls):
" and finish running the testcase...")
if hasattr(cls, 'vpp'):
- cls.vapi.disconnect()
+ if hasattr(cls, 'vapi'):
+ cls.vapi.disconnect()
cls.vpp.poll()
if cls.vpp.returncode is None:
cls.vpp.terminate()
del cls.vpp
- if hasattr(cls, 'vpp_stdout_queue'):
+ if hasattr(cls, 'vpp_stdout_deque'):
cls.logger.info(single_line_delim)
cls.logger.info('VPP output to stdout while running %s:',
cls.__name__)
cls.logger.info(single_line_delim)
f = open(cls.tempdir + '/vpp_stdout.txt', 'w')
- while not cls.vpp_stdout_queue.empty():
- line = cls.vpp_stdout_queue.get_nowait()
- f.write(line)
- cls.logger.info('VPP stdout: %s' % line.rstrip('\n'))
+ vpp_output = "".join(cls.vpp_stdout_deque)
+ f.write(vpp_output)
+ cls.logger.info('\n%s', vpp_output)
+ cls.logger.info(single_line_delim)
- if hasattr(cls, 'vpp_stderr_queue'):
+ if hasattr(cls, 'vpp_stderr_deque'):
cls.logger.info(single_line_delim)
cls.logger.info('VPP output to stderr while running %s:',
cls.__name__)
cls.logger.info(single_line_delim)
f = open(cls.tempdir + '/vpp_stderr.txt', 'w')
- while not cls.vpp_stderr_queue.empty():
- line = cls.vpp_stderr_queue.get_nowait()
- f.write(line)
- cls.logger.info('VPP stderr: %s' % line.rstrip('\n'))
+ vpp_output = "".join(cls.vpp_stderr_deque)
+ f.write(vpp_output)
+ cls.logger.info('\n%s', vpp_output)
cls.logger.info(single_line_delim)
@classmethod
def tearDown(self):
""" Show various debug prints after each test """
if not self.vpp_dead:
- self.logger.info(self.vapi.ppcli("show int"))
self.logger.debug(self.vapi.cli("show trace"))
+ self.logger.info(self.vapi.ppcli("show int"))
self.logger.info(self.vapi.ppcli("show hardware"))
self.logger.info(self.vapi.ppcli("show error"))
self.logger.info(self.vapi.ppcli("show run"))
def setUp(self):
""" Clear trace before running each test"""
+ if self.vpp_dead:
+ raise Exception("VPP is dead when setting up the test")
+ time.sleep(.1)
+ self.vpp_stdout_deque.append(
+ "--- test setUp() for %s.%s(%s) starts here ---\n" %
+ (self.__class__.__name__, self._testMethodName,
+ self._testMethodDoc))
+ self.vpp_stderr_deque.append(
+ "--- test setUp() for %s.%s(%s) starts here ---\n" %
+ (self.__class__.__name__, self._testMethodName,
+ self._testMethodDoc))
self.vapi.cli("clear trace")
# store the test instance inside the test class - so that objects
# holding the class can access instance methods (like assertEqual)
i.enable_capture()
@classmethod
- def pg_start(cls):
+ def pg_start(cls, sleep_time=1):
"""
Enable the packet-generator and send all prepared packet streams
Remove the packet streams afterwards
"""
cls.vapi.cli("trace add pg-input 50") # 50 is maximum
cls.vapi.cli('packet-generator enable')
- sleep(1) # give VPP some time to process the packets
+ sleep(sleep_time) # give VPP some time to process the packets
for stream in cls.pg_streams:
cls.vapi.cli('packet-generator delete %s' % stream)
cls.pg_streams = []
if info.dst == dst_index:
return info
+ def assert_equal(self, real_value, expected_value, name_or_class=None):
+ if name_or_class is None:
+ self.assertEqual(real_value, expected_value, msg)
+ return
+ try:
+ msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
+ msg = msg % (getdoc(name_or_class).strip(),
+ real_value, str(name_or_class(real_value)),
+ expected_value, str(name_or_class(expected_value)))
+ except:
+ msg = "Invalid %s: %s does not match expected value %s" % (
+ name_or_class, real_value, expected_value)
+
+ self.assertEqual(real_value, expected_value, msg)
+
+ def assert_in_range(
+ self,
+ real_value,
+ expected_min,
+ expected_max,
+ name=None):
+ if name is None:
+ msg = None
+ else:
+ msg = "Invalid %s: %s out of range <%s,%s>" % (
+ name, real_value, expected_min, expected_max)
+ self.assertTrue(expected_min <= real_value <= expected_max, msg)
+
class VppTestResult(unittest.TestResult):
"""