3 from __future__ import print_function
13 from collections import deque
14 from threading import Thread, Event
15 from inspect import getdoc, isclass
16 from traceback import format_exception
17 from logging import FileHandler, DEBUG, Formatter
18 from scapy.packet import Raw
19 from hook import StepHook, PollHook
20 from vpp_pg_interface import VppPGInterface
21 from vpp_sub_interface import VppSubInterface
22 from vpp_lo_interface import VppLoInterface
23 from vpp_papi_provider import VppPapiProvider
25 from vpp_object import VppObjectRegistry
26 if os.name == 'posix' and sys.version_info[0] < 3:
27 # using subprocess32 is recommended by python official documentation
28 # @ https://docs.python.org/2/library/subprocess.html
29 import subprocess32 as subprocess
34 Test framework module.
36 The module provides a set of tools for constructing and running tests and
37 representing the results.
41 class _PacketInfo(object):
42 """Private class to create packet info object.
44 Help process information about the next packet.
45 Set variables to default values.
47 #: Store the index of the packet.
49 #: Store the index of the source packet generator interface of the packet.
51 #: Store the index of the destination packet generator interface
54 #: Store expected ip version
56 #: Store expected upper protocol
58 #: Store the copy of the former packet.
61 def __eq__(self, other):
62 index = self.index == other.index
63 src = self.src == other.src
64 dst = self.dst == other.dst
65 data = self.data == other.data
66 return index and src and dst and data
69 def pump_output(testclass):
70 """ pump output from vpp stdout/stderr to proper queues """
71 while not testclass.pump_thread_stop_flag.wait(0):
72 readable = select.select([testclass.vpp.stdout.fileno(),
73 testclass.vpp.stderr.fileno(),
74 testclass.pump_thread_wakeup_pipe[0]],
76 if testclass.vpp.stdout.fileno() in readable:
77 read = os.read(testclass.vpp.stdout.fileno(), 1024)
78 testclass.vpp_stdout_deque.append(read)
79 if testclass.vpp.stderr.fileno() in readable:
80 read = os.read(testclass.vpp.stderr.fileno(), 1024)
81 testclass.vpp_stderr_deque.append(read)
82 # ignoring the dummy pipe here intentionally - the flag will take care
83 # of properly terminating the loop
86 def running_extended_tests():
88 s = os.getenv("EXTENDED_TESTS")
89 return True if s.lower() in ("y", "yes", "1") else False
95 class KeepAliveReporter(object):
97 Singleton object which reports test start to parent process
102 self.__dict__ = self._shared_state
109 def pipe(self, pipe):
110 if hasattr(self, '_pipe'):
111 raise Exception("Internal error - pipe should only be set once.")
114 def send_keep_alive(self, test):
116 Write current test tmpdir & desc to keep-alive pipe to signal liveness
118 if self.pipe is None:
119 # if not running forked..
125 desc = test.shortDescription()
129 self.pipe.send((desc, test.vpp_bin, test.tempdir))
132 class VppTestCase(unittest.TestCase):
133 """This subclass is a base class for VPP test cases that are implemented as
134 classes. It provides methods to create and run test case.
138 def packet_infos(self):
139 """List of packet infos"""
140 return self._packet_infos
143 def get_packet_count_for_if_idx(cls, dst_if_index):
144 """Get the number of packet info for specified destination if index"""
145 if dst_if_index in cls._packet_count_for_dst_if_idx:
146 return cls._packet_count_for_dst_if_idx[dst_if_index]
152 """Return the instance of this testcase"""
153 return cls.test_instance
156 def set_debug_flags(cls, d):
157 cls.debug_core = False
158 cls.debug_gdb = False
159 cls.debug_gdbserver = False
164 cls.debug_core = True
167 elif dl == "gdbserver":
168 cls.debug_gdbserver = True
170 raise Exception("Unrecognized DEBUG option: '%s'" % d)
173 def setUpConstants(cls):
174 """ Set-up the test case class based on environment variables """
176 s = os.getenv("STEP")
177 cls.step = True if s.lower() in ("y", "yes", "1") else False
181 d = os.getenv("DEBUG")
184 cls.set_debug_flags(d)
185 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
186 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
187 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
189 if cls.plugin_path is not None:
190 if cls.extern_plugin_path is not None:
191 plugin_path = "%s:%s" % (
192 cls.plugin_path, cls.extern_plugin_path)
194 plugin_path = cls.plugin_path
195 elif cls.extern_plugin_path is not None:
196 plugin_path = cls.extern_plugin_path
198 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
199 debug_cli = "cli-listen localhost:5002"
202 size = os.getenv("COREDUMP_SIZE")
204 coredump_size = "coredump-size %s" % size
207 if coredump_size is None:
208 coredump_size = "coredump-size unlimited"
209 cls.vpp_cmdline = [cls.vpp_bin, "unix",
210 "{", "nodaemon", debug_cli, coredump_size, "}",
211 "api-trace", "{", "on", "}",
212 "api-segment", "{", "prefix", cls.shm_prefix, "}",
213 "plugins", "{", "plugin", "dpdk_plugin.so", "{",
215 if plugin_path is not None:
216 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
217 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
220 def wait_for_enter(cls):
221 if cls.debug_gdbserver:
222 print(double_line_delim)
223 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
225 print(double_line_delim)
226 print("Spawned VPP with PID: %d" % cls.vpp.pid)
228 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
230 print(single_line_delim)
231 print("You can debug the VPP using e.g.:")
232 if cls.debug_gdbserver:
233 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
234 print("Now is the time to attach a gdb by running the above "
235 "command, set up breakpoints etc. and then resume VPP from "
236 "within gdb by issuing the 'continue' command")
238 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
239 print("Now is the time to attach a gdb by running the above "
240 "command and set up breakpoints etc.")
241 print(single_line_delim)
242 raw_input("Press ENTER to continue running the testcase...")
246 cmdline = cls.vpp_cmdline
248 if cls.debug_gdbserver:
249 gdbserver = '/usr/bin/gdbserver'
250 if not os.path.isfile(gdbserver) or \
251 not os.access(gdbserver, os.X_OK):
252 raise Exception("gdbserver binary '%s' does not exist or is "
253 "not executable" % gdbserver)
255 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
256 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
259 cls.vpp = subprocess.Popen(cmdline,
260 stdout=subprocess.PIPE,
261 stderr=subprocess.PIPE,
263 except Exception as e:
264 cls.logger.critical("Couldn't start vpp: %s" % e)
272 Perform class setup before running the testcase
273 Remove shared memory files, start vpp and connect the vpp-api
275 gc.collect() # run garbage collection first
276 cls.logger = getLogger(cls.__name__)
277 cls.tempdir = tempfile.mkdtemp(
278 prefix='vpp-unittest-%s-' % cls.__name__)
279 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
280 cls.file_handler.setFormatter(
281 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
283 cls.file_handler.setLevel(DEBUG)
284 cls.logger.addHandler(cls.file_handler)
285 cls.shm_prefix = cls.tempdir.split("/")[-1]
286 os.chdir(cls.tempdir)
287 cls.logger.info("Temporary dir is %s, shm prefix is %s",
288 cls.tempdir, cls.shm_prefix)
290 cls.reset_packet_infos()
292 cls._zombie_captures = []
295 cls.registry = VppObjectRegistry()
296 cls.vpp_startup_failed = False
297 cls.reporter = KeepAliveReporter()
298 cls.reporter.send_keep_alive(cls)
299 # need to catch exceptions here because if we raise, then the cleanup
300 # doesn't get called and we might end with a zombie vpp
303 cls.vpp_stdout_deque = deque()
304 cls.vpp_stderr_deque = deque()
305 cls.pump_thread_stop_flag = Event()
306 cls.pump_thread_wakeup_pipe = os.pipe()
307 cls.pump_thread = Thread(target=pump_output, args=(cls,))
308 cls.pump_thread.daemon = True
309 cls.pump_thread.start()
310 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
315 cls.vapi.register_hook(hook)
316 cls.sleep(0.1, "after vpp startup, before initial poll")
320 cls.vpp_startup_failed = True
322 "VPP died shortly after startup, check the"
323 " output to standard error for possible cause")
328 if cls.debug_gdbserver:
329 print(colorize("You're running VPP inside gdbserver but "
330 "VPP-API connection failed, did you forget "
331 "to 'continue' VPP from within gdb?", RED))
334 t, v, tb = sys.exc_info()
344 Disconnect vpp-api, kill vpp and cleanup shared memory files
346 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
348 if cls.vpp.returncode is None:
349 print(double_line_delim)
350 print("VPP or GDB server is still running")
351 print(single_line_delim)
352 raw_input("When done debugging, press ENTER to kill the "
353 "process and finish running the testcase...")
355 os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
356 cls.pump_thread_stop_flag.set()
357 if hasattr(cls, 'pump_thread'):
358 cls.logger.debug("Waiting for pump thread to stop")
359 cls.pump_thread.join()
360 if hasattr(cls, 'vpp_stderr_reader_thread'):
361 cls.logger.debug("Waiting for stdderr pump to stop")
362 cls.vpp_stderr_reader_thread.join()
364 if hasattr(cls, 'vpp'):
365 if hasattr(cls, 'vapi'):
366 cls.vapi.disconnect()
369 if cls.vpp.returncode is None:
370 cls.logger.debug("Sending TERM to vpp")
372 cls.logger.debug("Waiting for vpp to die")
373 cls.vpp.communicate()
376 if cls.vpp_startup_failed:
377 stdout_log = cls.logger.info
378 stderr_log = cls.logger.critical
380 stdout_log = cls.logger.info
381 stderr_log = cls.logger.info
383 if hasattr(cls, 'vpp_stdout_deque'):
384 stdout_log(single_line_delim)
385 stdout_log('VPP output to stdout while running %s:', cls.__name__)
386 stdout_log(single_line_delim)
387 vpp_output = "".join(cls.vpp_stdout_deque)
388 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
390 stdout_log('\n%s', vpp_output)
391 stdout_log(single_line_delim)
393 if hasattr(cls, 'vpp_stderr_deque'):
394 stderr_log(single_line_delim)
395 stderr_log('VPP output to stderr while running %s:', cls.__name__)
396 stderr_log(single_line_delim)
397 vpp_output = "".join(cls.vpp_stderr_deque)
398 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
400 stderr_log('\n%s', vpp_output)
401 stderr_log(single_line_delim)
404 def tearDownClass(cls):
405 """ Perform final cleanup after running all tests in this test-case """
407 cls.file_handler.close()
410 """ Show various debug prints after each test """
411 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
412 (self.__class__.__name__, self._testMethodName,
413 self._testMethodDoc))
414 if not self.vpp_dead:
415 self.logger.debug(self.vapi.cli("show trace"))
416 self.logger.info(self.vapi.ppcli("show interface"))
417 self.logger.info(self.vapi.ppcli("show hardware"))
418 self.logger.info(self.vapi.ppcli("show error"))
419 self.logger.info(self.vapi.ppcli("show run"))
420 self.registry.remove_vpp_config(self.logger)
421 # Save/Dump VPP api trace log
422 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
423 tmp_api_trace = "/tmp/%s" % api_trace
424 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
425 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
426 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
428 os.rename(tmp_api_trace, vpp_api_trace_log)
429 self.logger.info(self.vapi.ppcli("api trace dump %s" %
432 self.registry.unregister_all(self.logger)
435 """ Clear trace before running each test"""
436 self.reporter.send_keep_alive(self)
437 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
438 (self.__class__.__name__, self._testMethodName,
439 self._testMethodDoc))
441 raise Exception("VPP is dead when setting up the test")
442 self.sleep(.1, "during setUp")
443 self.vpp_stdout_deque.append(
444 "--- test setUp() for %s.%s(%s) starts here ---\n" %
445 (self.__class__.__name__, self._testMethodName,
446 self._testMethodDoc))
447 self.vpp_stderr_deque.append(
448 "--- test setUp() for %s.%s(%s) starts here ---\n" %
449 (self.__class__.__name__, self._testMethodName,
450 self._testMethodDoc))
451 self.vapi.cli("clear trace")
452 # store the test instance inside the test class - so that objects
453 # holding the class can access instance methods (like assertEqual)
454 type(self).test_instance = self
457 def pg_enable_capture(cls, interfaces):
459 Enable capture on packet-generator interfaces
461 :param interfaces: iterable interface indexes
468 def register_capture(cls, cap_name):
469 """ Register a capture in the testclass """
470 # add to the list of captures with current timestamp
471 cls._captures.append((time.time(), cap_name))
472 # filter out from zombies
473 cls._zombie_captures = [(stamp, name)
474 for (stamp, name) in cls._zombie_captures
479 """ Remove any zombie captures and enable the packet generator """
480 # how long before capture is allowed to be deleted - otherwise vpp
481 # crashes - 100ms seems enough (this shouldn't be needed at all)
484 for stamp, cap_name in cls._zombie_captures:
485 wait = stamp + capture_ttl - now
487 cls.sleep(wait, "before deleting capture %s" % cap_name)
489 cls.logger.debug("Removing zombie capture %s" % cap_name)
490 cls.vapi.cli('packet-generator delete %s' % cap_name)
492 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
493 cls.vapi.cli('packet-generator enable')
494 cls._zombie_captures = cls._captures
498 def create_pg_interfaces(cls, interfaces):
500 Create packet-generator interfaces.
502 :param interfaces: iterable indexes of the interfaces.
503 :returns: List of created interfaces.
508 intf = VppPGInterface(cls, i)
509 setattr(cls, intf.name, intf)
511 cls.pg_interfaces = result
515 def create_loopback_interfaces(cls, interfaces):
517 Create loopback interfaces.
519 :param interfaces: iterable indexes of the interfaces.
520 :returns: List of created interfaces.
524 intf = VppLoInterface(cls, i)
525 setattr(cls, intf.name, intf)
527 cls.lo_interfaces = result
531 def extend_packet(packet, size):
533 Extend packet to given size by padding with spaces
534 NOTE: Currently works only when Raw layer is present.
536 :param packet: packet
537 :param size: target size
540 packet_len = len(packet) + 4
541 extend = size - packet_len
543 packet[Raw].load += ' ' * extend
546 def reset_packet_infos(cls):
547 """ Reset the list of packet info objects and packet counts to zero """
548 cls._packet_infos = {}
549 cls._packet_count_for_dst_if_idx = {}
552 def create_packet_info(cls, src_if, dst_if):
554 Create packet info object containing the source and destination indexes
555 and add it to the testcase's packet info list
557 :param VppInterface src_if: source interface
558 :param VppInterface dst_if: destination interface
560 :returns: _PacketInfo object
564 info.index = len(cls._packet_infos)
565 info.src = src_if.sw_if_index
566 info.dst = dst_if.sw_if_index
567 if isinstance(dst_if, VppSubInterface):
568 dst_idx = dst_if.parent.sw_if_index
570 dst_idx = dst_if.sw_if_index
571 if dst_idx in cls._packet_count_for_dst_if_idx:
572 cls._packet_count_for_dst_if_idx[dst_idx] += 1
574 cls._packet_count_for_dst_if_idx[dst_idx] = 1
575 cls._packet_infos[info.index] = info
579 def info_to_payload(info):
581 Convert _PacketInfo object to packet payload
583 :param info: _PacketInfo object
585 :returns: string containing serialized data from packet info
587 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
591 def payload_to_info(payload):
593 Convert packet payload to _PacketInfo object
595 :param payload: packet payload
597 :returns: _PacketInfo object containing de-serialized data from payload
600 numbers = payload.split()
602 info.index = int(numbers[0])
603 info.src = int(numbers[1])
604 info.dst = int(numbers[2])
605 info.ip = int(numbers[3])
606 info.proto = int(numbers[4])
609 def get_next_packet_info(self, info):
611 Iterate over the packet info list stored in the testcase
612 Start iteration with first element if info is None
613 Continue based on index in info if info is specified
615 :param info: info or None
616 :returns: next info in list or None if no more infos
621 next_index = info.index + 1
622 if next_index == len(self._packet_infos):
625 return self._packet_infos[next_index]
627 def get_next_packet_info_for_interface(self, src_index, info):
629 Search the packet info list for the next packet info with same source
632 :param src_index: source interface index to search for
633 :param info: packet info - where to start the search
634 :returns: packet info or None
638 info = self.get_next_packet_info(info)
641 if info.src == src_index:
644 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
646 Search the packet info list for the next packet info with same source
647 and destination interface indexes
649 :param src_index: source interface index to search for
650 :param dst_index: destination interface index to search for
651 :param info: packet info - where to start the search
652 :returns: packet info or None
656 info = self.get_next_packet_info_for_interface(src_index, info)
659 if info.dst == dst_index:
662 def assert_equal(self, real_value, expected_value, name_or_class=None):
663 if name_or_class is None:
664 self.assertEqual(real_value, expected_value)
667 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
668 msg = msg % (getdoc(name_or_class).strip(),
669 real_value, str(name_or_class(real_value)),
670 expected_value, str(name_or_class(expected_value)))
672 msg = "Invalid %s: %s does not match expected value %s" % (
673 name_or_class, real_value, expected_value)
675 self.assertEqual(real_value, expected_value, msg)
677 def assert_in_range(self,
685 msg = "Invalid %s: %s out of range <%s,%s>" % (
686 name, real_value, expected_min, expected_max)
687 self.assertTrue(expected_min <= real_value <= expected_max, msg)
690 def sleep(cls, timeout, remark=None):
691 if hasattr(cls, 'logger'):
692 cls.logger.debug("Starting sleep for %ss (%s)" % (timeout, remark))
696 if after - before > 2 * timeout:
697 cls.logger.error("unexpected time.sleep() result - "
698 "slept for %ss instead of ~%ss!" % (
699 after - before, timeout))
700 if hasattr(cls, 'logger'):
702 "Finished sleep (%s) - slept %ss (wanted %ss)" % (
703 remark, after - before, timeout))
706 class TestCasePrinter(object):
710 self.__dict__ = self._shared_state
711 if not hasattr(self, "_test_case_set"):
712 self._test_case_set = set()
714 def print_test_case_heading_if_first_time(self, case):
715 if case.__class__ not in self._test_case_set:
716 print(double_line_delim)
717 print(colorize(getdoc(case.__class__).splitlines()[0], YELLOW))
718 print(double_line_delim)
719 self._test_case_set.add(case.__class__)
722 class VppTestResult(unittest.TestResult):
724 @property result_string
725 String variable to store the test case result string.
727 List variable containing 2-tuples of TestCase instances and strings
728 holding formatted tracebacks. Each tuple represents a test which
729 raised an unexpected exception.
731 List variable containing 2-tuples of TestCase instances and strings
732 holding formatted tracebacks. Each tuple represents a test where
733 a failure was explicitly signalled using the TestCase.assert*()
737 def __init__(self, stream, descriptions, verbosity):
739 :param stream File descriptor to store where to report test results.
740 Set to the standard error stream by default.
741 :param descriptions Boolean variable to store information if to use
742 test case descriptions.
743 :param verbosity Integer variable to store required verbosity level.
745 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
747 self.descriptions = descriptions
748 self.verbosity = verbosity
749 self.result_string = None
750 self.printer = TestCasePrinter()
752 def addSuccess(self, test):
754 Record a test succeeded result
759 if hasattr(test, 'logger'):
760 test.logger.debug("--- addSuccess() %s.%s(%s) called"
761 % (test.__class__.__name__,
762 test._testMethodName,
763 test._testMethodDoc))
764 unittest.TestResult.addSuccess(self, test)
765 self.result_string = colorize("OK", GREEN)
767 def addSkip(self, test, reason):
769 Record a test skipped.
775 if hasattr(test, 'logger'):
776 test.logger.debug("--- addSkip() %s.%s(%s) called, reason is %s"
777 % (test.__class__.__name__,
778 test._testMethodName,
781 unittest.TestResult.addSkip(self, test, reason)
782 self.result_string = colorize("SKIP", YELLOW)
784 def symlink_failed(self, test):
786 if hasattr(test, 'logger'):
788 if hasattr(test, 'tempdir'):
790 failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
791 link_path = '%s/%s-FAILED' % (failed_dir,
792 test.tempdir.split("/")[-1])
794 logger.debug("creating a link to the failed test")
795 logger.debug("os.symlink(%s, %s)" %
796 (test.tempdir, link_path))
797 os.symlink(test.tempdir, link_path)
798 except Exception as e:
802 def addFailure(self, test, err):
804 Record a test failed result
807 :param err: error message
810 if hasattr(test, 'logger'):
811 test.logger.debug("--- addFailure() %s.%s(%s) called, err is %s"
812 % (test.__class__.__name__,
813 test._testMethodName,
814 test._testMethodDoc, err))
815 test.logger.debug("formatted exception is:\n%s" %
816 "".join(format_exception(*err)))
817 unittest.TestResult.addFailure(self, test, err)
818 if hasattr(test, 'tempdir'):
819 self.result_string = colorize("FAIL", RED) + \
820 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
821 self.symlink_failed(test)
823 self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
825 def addError(self, test, err):
827 Record a test error result
830 :param err: error message
833 if hasattr(test, 'logger'):
834 test.logger.debug("--- addError() %s.%s(%s) called, err is %s"
835 % (test.__class__.__name__,
836 test._testMethodName,
837 test._testMethodDoc, err))
838 test.logger.debug("formatted exception is:\n%s" %
839 "".join(format_exception(*err)))
840 unittest.TestResult.addError(self, test, err)
841 if hasattr(test, 'tempdir'):
842 self.result_string = colorize("ERROR", RED) + \
843 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
844 self.symlink_failed(test)
846 self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
848 def getDescription(self, test):
853 :returns: test description
856 # TODO: if none print warning not raise exception
857 short_description = test.shortDescription()
858 if self.descriptions and short_description:
859 return short_description
863 def startTest(self, test):
870 self.printer.print_test_case_heading_if_first_time(test)
871 unittest.TestResult.startTest(self, test)
872 if self.verbosity > 0:
874 "Starting " + self.getDescription(test) + " ...")
875 self.stream.writeln(single_line_delim)
877 def stopTest(self, test):
884 unittest.TestResult.stopTest(self, test)
885 if self.verbosity > 0:
886 self.stream.writeln(single_line_delim)
887 self.stream.writeln("%-73s%s" % (self.getDescription(test),
889 self.stream.writeln(single_line_delim)
891 self.stream.writeln("%-73s%s" % (self.getDescription(test),
894 def printErrors(self):
896 Print errors from running the test case
898 self.stream.writeln()
899 self.printErrorList('ERROR', self.errors)
900 self.printErrorList('FAIL', self.failures)
902 def printErrorList(self, flavour, errors):
904 Print error list to the output stream together with error type
905 and test case description.
907 :param flavour: error type
908 :param errors: iterable errors
911 for test, err in errors:
912 self.stream.writeln(double_line_delim)
913 self.stream.writeln("%s: %s" %
914 (flavour, self.getDescription(test)))
915 self.stream.writeln(single_line_delim)
916 self.stream.writeln("%s" % err)
919 class VppTestRunner(unittest.TextTestRunner):
921 A basic test runner implementation which prints results to standard error.
924 def resultclass(self):
925 """Class maintaining the results of the tests"""
928 def __init__(self, pipe=None, stream=sys.stderr, descriptions=True,
929 verbosity=1, failfast=False, buffer=False, resultclass=None):
930 # ignore stream setting here, use hard-coded stdout to be in sync
931 # with prints from VppTestCase methods ...
932 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
933 verbosity, failfast, buffer,
935 reporter = KeepAliveReporter()
940 def parse_test_option(self):
942 f = os.getenv(self.test_option)
945 filter_file_name = None
946 filter_class_name = None
947 filter_func_name = None
952 raise Exception("Unrecognized %s option: %s" %
953 (self.test_option, f))
955 if parts[2] not in ('*', ''):
956 filter_func_name = parts[2]
957 if parts[1] not in ('*', ''):
958 filter_class_name = parts[1]
959 if parts[0] not in ('*', ''):
960 if parts[0].startswith('test_'):
961 filter_file_name = parts[0]
963 filter_file_name = 'test_%s' % parts[0]
965 if f.startswith('test_'):
968 filter_file_name = 'test_%s' % f
969 return filter_file_name, filter_class_name, filter_func_name
971 def filter_tests(self, tests, filter_file, filter_class, filter_func):
972 result = unittest.suite.TestSuite()
974 if isinstance(t, unittest.suite.TestSuite):
975 # this is a bunch of tests, recursively filter...
976 x = self.filter_tests(t, filter_file, filter_class,
978 if x.countTestCases() > 0:
980 elif isinstance(t, unittest.TestCase):
981 # this is a single test
982 parts = t.id().split('.')
983 # t.id() for common cases like this:
984 # test_classifier.TestClassifier.test_acl_ip
985 # apply filtering only if it is so
987 if filter_file and filter_file != parts[0]:
989 if filter_class and filter_class != parts[1]:
991 if filter_func and filter_func != parts[2]:
995 # unexpected object, don't touch it
1006 faulthandler.enable() # emit stack trace to stderr if killed by signal
1007 print("Running tests using custom test runner") # debug message
1008 filter_file, filter_class, filter_func = self.parse_test_option()
1009 print("Active filters: file=%s, class=%s, function=%s" % (
1010 filter_file, filter_class, filter_func))
1011 filtered = self.filter_tests(test, filter_file, filter_class,
1013 print("%s out of %s tests match specified filters" % (
1014 filtered.countTestCases(), test.countTestCases()))
1015 if not running_extended_tests():
1016 print("Not running extended tests (some tests will be skipped)")
1017 return super(VppTestRunner, self).run(filtered)