8 from collections import deque
9 from threading import Thread
10 from inspect import getdoc
11 from hook import StepHook, PollHook
12 from vpp_pg_interface import VppPGInterface
13 from vpp_lo_interface import VppLoInterface
14 from vpp_papi_provider import VppPapiProvider
15 from scapy.packet import Raw
16 from logging import FileHandler, DEBUG
20 Test framework module.
22 The module provides a set of tools for constructing and running tests and
23 representing the results.
27 class _PacketInfo(object):
28 """Private class to create packet info object.
30 Help process information about the next packet.
31 Set variables to default values.
33 #: Store the index of the packet.
35 #: Store the index of the source packet generator interface of the packet.
37 #: Store the index of the destination packet generator interface
40 #: Store the copy of the former packet.
def __eq__(self, other):
    """
    Compare two packet infos field by field

    :param other: object to compare with
    :returns: result of comparing index, src, dst and data (false as soon
        as one of them differs); NotImplemented if other is not a packet
        info object
    """
    if not isinstance(other, type(self)):
        # let Python fall back to the reflected/default comparison instead
        # of failing with AttributeError on objects lacking these fields
        return NotImplemented
    return (self.index == other.index and
            self.src == other.src and
            self.dst == other.dst and
            self.data == other.data)
51 def pump_output(out, deque):
52 for line in iter(out.readline, b''):
56 class VppTestCase(unittest.TestCase):
57 """This subclass is a base class for VPP test cases that are implemented as
58 classes. It provides methods to create and run test cases.
def packet_infos(self):
    """
    Packet infos currently stored on this object

    :returns: the stored packet info collection
    """
    stored = self._packet_infos
    return stored
def packet_infos(self, value):
    """
    Replace the stored packet infos

    :param value: new packet info collection
    """
    self._packet_infos = value
72 """Return the instance of this testcase"""
73 return cls.test_instance
76 def set_debug_flags(cls, d):
77 cls.debug_core = False
79 cls.debug_gdbserver = False
84 if resource.getrlimit(resource.RLIMIT_CORE)[0] <= 0:
85 # give a heads up if this is actually useless
86 cls.logger.critical("WARNING: core size limit is set 0, core "
87 "files will NOT be created")
91 elif dl == "gdbserver":
92 cls.debug_gdbserver = True
94 raise Exception("Unrecognized DEBUG option: '%s'" % d)
97 def setUpConstants(cls):
98 """ Set-up the test case class based on environment variables """
100 s = os.getenv("STEP")
101 cls.step = True if s.lower() in ("y", "yes", "1") else False
105 d = os.getenv("DEBUG")
108 cls.set_debug_flags(d)
109 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
110 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
112 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
113 debug_cli = "cli-listen localhost:5002"
114 cls.vpp_cmdline = [cls.vpp_bin, "unix", "{", "nodaemon", debug_cli, "}",
115 "api-segment", "{", "prefix", cls.shm_prefix, "}"]
116 if cls.plugin_path is not None:
117 cls.vpp_cmdline.extend(["plugin_path", cls.plugin_path])
118 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
121 def wait_for_enter(cls):
122 if cls.debug_gdbserver:
123 print(double_line_delim)
124 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
126 print(double_line_delim)
127 print("Spawned VPP with PID: %d" % cls.vpp.pid)
129 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
131 print(single_line_delim)
132 print("You can debug the VPP using e.g.:")
133 if cls.debug_gdbserver:
134 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
135 print("Now is the time to attach a gdb by running the above "
136 "command, set up breakpoints etc. and then resume VPP from "
137 "within gdb by issuing the 'continue' command")
139 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
140 print("Now is the time to attach a gdb by running the above "
141 "command and set up breakpoints etc.")
142 print(single_line_delim)
143 raw_input("Press ENTER to continue running the testcase...")
147 cmdline = cls.vpp_cmdline
149 if cls.debug_gdbserver:
150 gdbserver = '/usr/bin/gdbserver'
151 if not os.path.isfile(gdbserver) or \
152 not os.access(gdbserver, os.X_OK):
153 raise Exception("gdbserver binary '%s' does not exist or is "
154 "not executable" % gdbserver)
156 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
157 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
160 cls.vpp = subprocess.Popen(cmdline,
161 stdout=subprocess.PIPE,
162 stderr=subprocess.PIPE,
164 except Exception as e:
165 cls.logger.critical("Couldn't start vpp: %s" % e)
173 Perform class setup before running the testcase
174 Remove shared memory files, start vpp and connect the vpp-api
176 cls.logger = getLogger(cls.__name__)
177 cls.tempdir = tempfile.mkdtemp(
178 prefix='vpp-unittest-' + cls.__name__ + '-')
179 file_handler = FileHandler("%s/log.txt" % cls.tempdir)
180 file_handler.setLevel(DEBUG)
181 cls.logger.addHandler(file_handler)
182 cls.shm_prefix = cls.tempdir.split("/")[-1]
183 os.chdir(cls.tempdir)
184 cls.logger.info("Temporary dir is %s, shm prefix is %s",
185 cls.tempdir, cls.shm_prefix)
188 cls._zombie_captures = []
189 cls.packet_infos = {}
192 print(double_line_delim)
193 print(colorize(getdoc(cls).splitlines()[0], YELLOW))
194 print(double_line_delim)
195 # need to catch exceptions here because if we raise, then the cleanup
196 # doesn't get called and we might end with a zombie vpp
199 cls.vpp_stdout_deque = deque()
200 cls.vpp_stdout_reader_thread = Thread(target=pump_output, args=(
201 cls.vpp.stdout, cls.vpp_stdout_deque))
202 cls.vpp_stdout_reader_thread.start()
203 cls.vpp_stderr_deque = deque()
204 cls.vpp_stderr_reader_thread = Thread(target=pump_output, args=(
205 cls.vpp.stderr, cls.vpp_stderr_deque))
206 cls.vpp_stderr_reader_thread.start()
207 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
212 cls.vapi.register_hook(hook)
218 if cls.debug_gdbserver:
219 print(colorize("You're running VPP inside gdbserver but "
220 "VPP-API connection failed, did you forget "
221 "to 'continue' VPP from within gdb?", RED))
224 t, v, tb = sys.exc_info()
234 Disconnect vpp-api, kill vpp and cleanup shared memory files
236 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
238 if cls.vpp.returncode is None:
239 print(double_line_delim)
240 print("VPP or GDB server is still running")
241 print(single_line_delim)
242 raw_input("When done debugging, press ENTER to kill the process"
243 " and finish running the testcase...")
245 if hasattr(cls, 'vpp'):
246 if hasattr(cls, 'vapi'):
247 cls.vapi.disconnect()
249 if cls.vpp.returncode is None:
253 if hasattr(cls, 'vpp_stdout_deque'):
254 cls.logger.info(single_line_delim)
255 cls.logger.info('VPP output to stdout while running %s:',
257 cls.logger.info(single_line_delim)
258 f = open(cls.tempdir + '/vpp_stdout.txt', 'w')
259 vpp_output = "".join(cls.vpp_stdout_deque)
261 cls.logger.info('\n%s', vpp_output)
262 cls.logger.info(single_line_delim)
264 if hasattr(cls, 'vpp_stderr_deque'):
265 cls.logger.info(single_line_delim)
266 cls.logger.info('VPP output to stderr while running %s:',
268 cls.logger.info(single_line_delim)
269 f = open(cls.tempdir + '/vpp_stderr.txt', 'w')
270 vpp_output = "".join(cls.vpp_stderr_deque)
272 cls.logger.info('\n%s', vpp_output)
273 cls.logger.info(single_line_delim)
276 def tearDownClass(cls):
277 """ Perform final cleanup after running all tests in this test-case """
281 """ Show various debug prints after each test """
282 if not self.vpp_dead:
283 self.logger.debug(self.vapi.cli("show trace"))
284 self.logger.info(self.vapi.ppcli("show int"))
285 self.logger.info(self.vapi.ppcli("show hardware"))
286 self.logger.info(self.vapi.ppcli("show error"))
287 self.logger.info(self.vapi.ppcli("show run"))
290 """ Clear trace before running each test"""
292 raise Exception("VPP is dead when setting up the test")
294 self.vpp_stdout_deque.append(
295 "--- test setUp() for %s.%s(%s) starts here ---\n" %
296 (self.__class__.__name__, self._testMethodName,
297 self._testMethodDoc))
298 self.vpp_stderr_deque.append(
299 "--- test setUp() for %s.%s(%s) starts here ---\n" %
300 (self.__class__.__name__, self._testMethodName,
301 self._testMethodDoc))
302 self.vapi.cli("clear trace")
303 # store the test instance inside the test class - so that objects
304 # holding the class can access instance methods (like assertEqual)
305 type(self).test_instance = self
308 def pg_enable_capture(cls, interfaces):
310 Enable capture on packet-generator interfaces
312 :param interfaces: iterable interface indexes
319 def register_capture(cls, cap_name):
320 """ Register a capture in the testclass """
321 # add to the list of captures with current timestamp
322 cls._captures.append((time.time(), cap_name))
323 # filter out from zombies
324 cls._zombie_captures = [(stamp, name)
325 for (stamp, name) in cls._zombie_captures
330 """ Remove any zombie captures and enable the packet generator """
331 # how long before capture is allowed to be deleted - otherwise vpp
332 # crashes - 100ms seems enough (this shouldn't be needed at all)
335 for stamp, cap_name in cls._zombie_captures:
336 wait = stamp + capture_ttl - now
338 cls.logger.debug("Waiting for %ss before deleting capture %s",
342 cls.logger.debug("Removing zombie capture %s" % cap_name)
343 cls.vapi.cli('packet-generator delete %s' % cap_name)
345 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
346 cls.vapi.cli('packet-generator enable')
347 cls._zombie_captures = cls._captures
351 def create_pg_interfaces(cls, interfaces):
353 Create packet-generator interfaces
355 :param interfaces: iterable indexes of the interfaces
360 intf = VppPGInterface(cls, i)
361 setattr(cls, intf.name, intf)
363 cls.pg_interfaces = result
367 def create_loopback_interfaces(cls, interfaces):
369 Create loopback interfaces
371 :param interfaces: iterable indexes of the interfaces
376 intf = VppLoInterface(cls, i)
377 setattr(cls, intf.name, intf)
379 cls.lo_interfaces = result
383 def extend_packet(packet, size):
385 Extend packet to given size by padding with spaces
386 NOTE: Currently works only when Raw layer is present.
388 :param packet: packet
389 :param size: target size
392 packet_len = len(packet) + 4
393 extend = size - packet_len
395 packet[Raw].load += ' ' * extend
def add_packet_info_to_list(self, info):
    """
    Store a packet info in the testcase's packet infos

    The info's index is set to the number of infos stored before the
    insertion and the info is then stored under that index.

    :param info: packet info to store
    """
    registry = self.packet_infos
    position = len(registry)
    info.index = position
    registry[position] = info
407 def create_packet_info(self, src_pg_index, dst_pg_index):
409 Create packet info object containing the source and destination indexes
410 and add it to the testcase's packet info list
412 :param src_pg_index: source packet-generator index
413 :param dst_pg_index: destination packet-generator index
415 :returns: _PacketInfo object
419 self.add_packet_info_to_list(info)
420 info.src = src_pg_index
421 info.dst = dst_pg_index
def info_to_payload(info):
    """
    Serialize a _PacketInfo object into a packet payload string

    :param info: _PacketInfo object

    :returns: string with the index, src and dst of the packet info as
        space-separated integers
    """
    fields = (info.index, info.src, info.dst)
    return "%d %d %d" % fields
436 def payload_to_info(payload):
438 Convert packet payload to _PacketInfo object
440 :param payload: packet payload
442 :returns: _PacketInfo object containing de-serialized data from payload
445 numbers = payload.split()
447 info.index = int(numbers[0])
448 info.src = int(numbers[1])
449 info.dst = int(numbers[2])
452 def get_next_packet_info(self, info):
454 Iterate over the packet info list stored in the testcase
455 Start iteration with first element if info is None
456 Continue based on index in info if info is specified
458 :param info: info or None
459 :returns: next info in list or None if no more infos
464 next_index = info.index + 1
465 if next_index == len(self.packet_infos):
468 return self.packet_infos[next_index]
470 def get_next_packet_info_for_interface(self, src_index, info):
472 Search the packet info list for the next packet info with same source
475 :param src_index: source interface index to search for
476 :param info: packet info - where to start the search
477 :returns: packet info or None
481 info = self.get_next_packet_info(info)
484 if info.src == src_index:
487 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
489 Search the packet info list for the next packet info with same source
490 and destination interface indexes
492 :param src_index: source interface index to search for
493 :param dst_index: destination interface index to search for
494 :param info: packet info - where to start the search
495 :returns: packet info or None
499 info = self.get_next_packet_info_for_interface(src_index, info)
502 if info.dst == dst_index:
def assert_equal(self, real_value, expected_value, name_or_class=None):
    """
    Assert that two values are equal, with a descriptive failure message

    :param real_value: value obtained by the test
    :param expected_value: value the test expects
    :param name_or_class: optional; either a plain name used to label the
        value in the failure message, or a documented callable which is
        applied to both values to render them in the message
    :raises AssertionError: raised by assertEqual if the values differ
    """
    if name_or_class is None:
        # no label available - use assertEqual's default message (passing
        # a not-yet-defined message here used to raise NameError)
        self.assertEqual(real_value, expected_value)
        return
    try:
        msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
        msg = msg % (getdoc(name_or_class).strip(),
                     real_value, str(name_or_class(real_value)),
                     expected_value, str(name_or_class(expected_value)))
    except Exception:
        # name_or_class is not a documented callable taking the values
        # (e.g. it is a plain string) - fall back to a generic message
        msg = "Invalid %s: %s does not match expected value %s" % (
            name_or_class, real_value, expected_value)

    self.assertEqual(real_value, expected_value, msg)
529 msg = "Invalid %s: %s out of range <%s,%s>" % (
530 name, real_value, expected_min, expected_max)
531 self.assertTrue(expected_min <= real_value <= expected_max, msg)
534 class VppTestResult(unittest.TestResult):
536 @property result_string
537 String variable to store the test case result string.
539 List variable containing 2-tuples of TestCase instances and strings
540 holding formatted tracebacks. Each tuple represents a test which
541 raised an unexpected exception.
543 List variable containing 2-tuples of TestCase instances and strings
544 holding formatted tracebacks. Each tuple represents a test where
545 a failure was explicitly signalled using the TestCase.assert*()
549 def __init__(self, stream, descriptions, verbosity):
551 :param stream File descriptor to store where to report test results. Set
552 to the standard error stream by default.
553 :param descriptions Boolean variable to store information if to use test
555 :param verbosity Integer variable to store required verbosity level.
557 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
559 self.descriptions = descriptions
560 self.verbosity = verbosity
561 self.result_string = None
def addSuccess(self, test):
    """
    Register a passing test and set the result string to a green "OK"

    :param test: test which succeeded
    """
    unittest.TestResult.addSuccess(self, test)
    verdict = colorize("OK", GREEN)
    self.result_string = verdict
def addSkip(self, test, reason):
    """
    Register a skipped test and set the result string to a yellow "SKIP"

    :param test: test which was skipped
    :param reason: reason handed on to unittest.TestResult.addSkip
    """
    unittest.TestResult.addSkip(self, test, reason)
    verdict = colorize("SKIP", YELLOW)
    self.result_string = verdict
def addFailure(self, test, err):
    """
    Register a failed test and build its result string

    The result string is a red "FAIL" followed by the temp dir of the
    test case, if the test has one.

    :param test: test which failed
    :param err: error information handed on to
        unittest.TestResult.addFailure
    """
    unittest.TestResult.addFailure(self, test, err)
    has_tempdir = hasattr(test, 'tempdir')
    verdict = colorize("FAIL", RED)
    if has_tempdir:
        location = ' [ temp dir used by test case: ' + test.tempdir + ' ]'
    else:
        location = ' [no temp dir]'
    self.result_string = verdict + location
def addError(self, test, err):
    """
    Register a test which raised an error and build its result string

    The result string is a red "ERROR" followed by the temp dir of the
    test case, if the test has one.

    :param test: test which raised the error
    :param err: error information handed on to
        unittest.TestResult.addError
    """
    unittest.TestResult.addError(self, test, err)
    has_tempdir = hasattr(test, 'tempdir')
    verdict = colorize("ERROR", RED)
    if has_tempdir:
        location = ' [ temp dir used by test case: ' + test.tempdir + ' ]'
    else:
        location = ' [no temp dir]'
    self.result_string = verdict + location
614 def getDescription(self, test):
619 :returns: test description
622 # TODO: if none print warning not raise exception
623 short_description = test.shortDescription()
624 if self.descriptions and short_description:
625 return short_description
629 def startTest(self, test):
636 unittest.TestResult.startTest(self, test)
637 if self.verbosity > 0:
639 "Starting " + self.getDescription(test) + " ...")
640 self.stream.writeln(single_line_delim)
def stopTest(self, test):
    """
    Finish a test and write its description and result to the stream

    With verbosity above zero the result line is framed by single-line
    delimiters, otherwise only the result line itself is written.

    :param test: test which has just finished
    """
    unittest.TestResult.stopTest(self, test)
    framed = self.verbosity > 0
    if framed:
        self.stream.writeln(single_line_delim)
    summary = "%-60s%s" % (self.getDescription(test), self.result_string)
    self.stream.writeln(summary)
    if framed:
        self.stream.writeln(single_line_delim)
def printErrors(self):
    """
    Write an empty line, then the recorded errors followed by the recorded
    failures, to the output stream
    """
    self.stream.writeln()
    for flavour, attribute in (('ERROR', 'errors'), ('FAIL', 'failures')):
        self.printErrorList(flavour, getattr(self, attribute))
def printErrorList(self, flavour, errors):
    """
    Write every entry of an error list to the output stream

    Each entry is written as a double-line delimiter, a "<flavour>:
    <test description>" line, a single-line delimiter and the error text.

    :param flavour: error type used to label each entry
    :param errors: iterable of (test, error) pairs
    """
    for case, trace in errors:
        out = self.stream
        out.writeln(double_line_delim)
        title = "%s: %s" % (flavour, self.getDescription(case))
        out.writeln(title)
        out.writeln(single_line_delim)
        out.writeln("%s" % trace)
684 class VppTestRunner(unittest.TextTestRunner):
686 A basic test runner implementation which prints results on standard error.
689 def resultclass(self):
690 """Class maintaining the results of the tests"""
700 print("Running tests using custom test runner") # debug message
701 return super(VppTestRunner, self).run(test)