3 from __future__ import print_function
13 from collections import deque
14 from threading import Thread, Event
15 from inspect import getdoc, isclass
16 from traceback import format_exception
17 from logging import FileHandler, DEBUG, Formatter
18 from scapy.packet import Raw
19 from hook import StepHook, PollHook
20 from vpp_pg_interface import VppPGInterface
21 from vpp_sub_interface import VppSubInterface
22 from vpp_lo_interface import VppLoInterface
23 from vpp_papi_provider import VppPapiProvider
25 from vpp_object import VppObjectRegistry
26 if os.name == 'posix' and sys.version_info[0] < 3:
27 # using subprocess32 is recommended by python official documentation
28 # @ https://docs.python.org/2/library/subprocess.html
29 import subprocess32 as subprocess
34 Test framework module.
36 The module provides a set of tools for constructing and running tests and
37 representing the results.
41 class _PacketInfo(object):
42 """Private class to create packet info object.
44 Help process information about the next packet.
45 Set variables to default values.
47 #: Store the index of the packet.
49 #: Store the index of the source packet generator interface of the packet.
51 #: Store the index of the destination packet generator interface
54 #: Store expected ip version
56 #: Store expected upper protocol
58 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare two packet infos field by field.

    The index, src, dst and data attributes are compared; all four
    comparisons are always evaluated.

    :param other: object to compare with
    :returns: result of and-ing the four attribute comparisons
    """
    same_index = self.index == other.index
    same_src = self.src == other.src
    same_dst = self.dst == other.dst
    same_data = self.data == other.data
    # hand back the first failing comparison, or the last one if none failed
    for outcome in (same_index, same_src, same_dst):
        if not outcome:
            return outcome
    return same_data
69 def pump_output(testclass):
70 """ pump output from vpp stdout/stderr to proper queues """
71 while not testclass.pump_thread_stop_flag.wait(0):
72 readable = select.select([testclass.vpp.stdout.fileno(),
73 testclass.vpp.stderr.fileno(),
74 testclass.pump_thread_wakeup_pipe[0]],
76 if testclass.vpp.stdout.fileno() in readable:
77 read = os.read(testclass.vpp.stdout.fileno(), 1024)
78 testclass.vpp_stdout_deque.append(read)
79 if testclass.vpp.stderr.fileno() in readable:
80 read = os.read(testclass.vpp.stderr.fileno(), 1024)
81 testclass.vpp_stderr_deque.append(read)
82 # ignoring the dummy pipe here intentionally - the flag will take care
83 # of properly terminating the loop
def running_extended_tests():
    """Tell whether extended tests were requested through the environment.

    Reads the ``EXTENDED_TESTS`` environment variable.

    :returns: True if the variable is set to "y", "yes" or "1"
        (case-insensitive), False otherwise - including when it is unset
    """
    # default to an empty string so that an unset variable simply means
    # "not enabled" instead of failing on None.lower()
    setting = os.getenv("EXTENDED_TESTS", "")
    return setting.lower() in ("y", "yes", "1")
def running_on_centos():
    """Tell whether the environment identifies the OS as CentOS.

    Reads the ``OS_ID`` environment variable.

    :returns: True if the variable contains "centos" (case-insensitive),
        False otherwise - including when it is unset
    """
    # default to an empty string so that an unset variable simply means
    # "not centos" instead of failing on None.lower()
    os_id = os.getenv("OS_ID", "")
    return "centos" in os_id.lower()
104 class KeepAliveReporter(object):
106 Singleton object which reports test start to parent process
111 self.__dict__ = self._shared_state
118 def pipe(self, pipe):
119 if hasattr(self, '_pipe'):
120 raise Exception("Internal error - pipe should only be set once.")
123 def send_keep_alive(self, test):
125 Write current test tmpdir & desc to keep-alive pipe to signal liveness
127 if self.pipe is None:
128 # if not running forked..
134 desc = test.shortDescription()
138 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
141 class VppTestCase(unittest.TestCase):
142 """This subclass is a base class for VPP test cases that are implemented as
143 classes. It provides methods to create and run test case.
def packet_infos(self):
    """Packet infos stored for this test case.

    :returns: the ``_packet_infos`` container (a dict keyed by packet
        info index, as filled in by create_packet_info)
    """
    stored = self._packet_infos
    return stored
152 def get_packet_count_for_if_idx(cls, dst_if_index):
153 """Get the number of packet info for specified destination if index"""
154 if dst_if_index in cls._packet_count_for_dst_if_idx:
155 return cls._packet_count_for_dst_if_idx[dst_if_index]
161 """Return the instance of this testcase"""
162 return cls.test_instance
165 def set_debug_flags(cls, d):
166 cls.debug_core = False
167 cls.debug_gdb = False
168 cls.debug_gdbserver = False
173 cls.debug_core = True
176 elif dl == "gdbserver":
177 cls.debug_gdbserver = True
179 raise Exception("Unrecognized DEBUG option: '%s'" % d)
182 def setUpConstants(cls):
183 """ Set-up the test case class based on environment variables """
185 s = os.getenv("STEP")
186 cls.step = True if s.lower() in ("y", "yes", "1") else False
190 d = os.getenv("DEBUG")
193 cls.set_debug_flags(d)
194 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
195 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
196 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
198 if cls.plugin_path is not None:
199 if cls.extern_plugin_path is not None:
200 plugin_path = "%s:%s" % (
201 cls.plugin_path, cls.extern_plugin_path)
203 plugin_path = cls.plugin_path
204 elif cls.extern_plugin_path is not None:
205 plugin_path = cls.extern_plugin_path
207 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
208 debug_cli = "cli-listen localhost:5002"
211 size = os.getenv("COREDUMP_SIZE")
213 coredump_size = "coredump-size %s" % size
216 if coredump_size is None:
217 coredump_size = "coredump-size unlimited"
218 cls.vpp_cmdline = [cls.vpp_bin, "unix",
219 "{", "nodaemon", debug_cli, "full-coredump",
220 coredump_size, "}", "api-trace", "{", "on", "}",
221 "api-segment", "{", "prefix", cls.shm_prefix, "}",
222 "plugins", "{", "plugin", "dpdk_plugin.so", "{",
224 if plugin_path is not None:
225 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
226 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
229 def wait_for_enter(cls):
230 if cls.debug_gdbserver:
231 print(double_line_delim)
232 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
234 print(double_line_delim)
235 print("Spawned VPP with PID: %d" % cls.vpp.pid)
237 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
239 print(single_line_delim)
240 print("You can debug the VPP using e.g.:")
241 if cls.debug_gdbserver:
242 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
243 print("Now is the time to attach a gdb by running the above "
244 "command, set up breakpoints etc. and then resume VPP from "
245 "within gdb by issuing the 'continue' command")
247 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
248 print("Now is the time to attach a gdb by running the above "
249 "command and set up breakpoints etc.")
250 print(single_line_delim)
251 raw_input("Press ENTER to continue running the testcase...")
255 cmdline = cls.vpp_cmdline
257 if cls.debug_gdbserver:
258 gdbserver = '/usr/bin/gdbserver'
259 if not os.path.isfile(gdbserver) or \
260 not os.access(gdbserver, os.X_OK):
261 raise Exception("gdbserver binary '%s' does not exist or is "
262 "not executable" % gdbserver)
264 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
265 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
268 cls.vpp = subprocess.Popen(cmdline,
269 stdout=subprocess.PIPE,
270 stderr=subprocess.PIPE,
272 except Exception as e:
273 cls.logger.critical("Couldn't start vpp: %s" % e)
281 Perform class setup before running the testcase
282 Remove shared memory files, start vpp and connect the vpp-api
284 gc.collect() # run garbage collection first
285 cls.logger = getLogger(cls.__name__)
286 cls.tempdir = tempfile.mkdtemp(
287 prefix='vpp-unittest-%s-' % cls.__name__)
288 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
289 cls.file_handler.setFormatter(
290 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
292 cls.file_handler.setLevel(DEBUG)
293 cls.logger.addHandler(cls.file_handler)
294 cls.shm_prefix = cls.tempdir.split("/")[-1]
295 os.chdir(cls.tempdir)
296 cls.logger.info("Temporary dir is %s, shm prefix is %s",
297 cls.tempdir, cls.shm_prefix)
299 cls.reset_packet_infos()
301 cls._zombie_captures = []
304 cls.registry = VppObjectRegistry()
305 cls.vpp_startup_failed = False
306 cls.reporter = KeepAliveReporter()
307 # need to catch exceptions here because if we raise, then the cleanup
308 # doesn't get called and we might end with a zombie vpp
311 cls.reporter.send_keep_alive(cls)
312 cls.vpp_stdout_deque = deque()
313 cls.vpp_stderr_deque = deque()
314 cls.pump_thread_stop_flag = Event()
315 cls.pump_thread_wakeup_pipe = os.pipe()
316 cls.pump_thread = Thread(target=pump_output, args=(cls,))
317 cls.pump_thread.daemon = True
318 cls.pump_thread.start()
319 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
324 cls.vapi.register_hook(hook)
325 cls.sleep(0.1, "after vpp startup, before initial poll")
329 cls.vpp_startup_failed = True
331 "VPP died shortly after startup, check the"
332 " output to standard error for possible cause")
337 if cls.debug_gdbserver:
338 print(colorize("You're running VPP inside gdbserver but "
339 "VPP-API connection failed, did you forget "
340 "to 'continue' VPP from within gdb?", RED))
343 t, v, tb = sys.exc_info()
353 Disconnect vpp-api, kill vpp and cleanup shared memory files
355 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
357 if cls.vpp.returncode is None:
358 print(double_line_delim)
359 print("VPP or GDB server is still running")
360 print(single_line_delim)
361 raw_input("When done debugging, press ENTER to kill the "
362 "process and finish running the testcase...")
364 os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
365 cls.pump_thread_stop_flag.set()
366 if hasattr(cls, 'pump_thread'):
367 cls.logger.debug("Waiting for pump thread to stop")
368 cls.pump_thread.join()
369 if hasattr(cls, 'vpp_stderr_reader_thread'):
370 cls.logger.debug("Waiting for stdderr pump to stop")
371 cls.vpp_stderr_reader_thread.join()
373 if hasattr(cls, 'vpp'):
374 if hasattr(cls, 'vapi'):
375 cls.vapi.disconnect()
378 if cls.vpp.returncode is None:
379 cls.logger.debug("Sending TERM to vpp")
381 cls.logger.debug("Waiting for vpp to die")
382 cls.vpp.communicate()
385 if cls.vpp_startup_failed:
386 stdout_log = cls.logger.info
387 stderr_log = cls.logger.critical
389 stdout_log = cls.logger.info
390 stderr_log = cls.logger.info
392 if hasattr(cls, 'vpp_stdout_deque'):
393 stdout_log(single_line_delim)
394 stdout_log('VPP output to stdout while running %s:', cls.__name__)
395 stdout_log(single_line_delim)
396 vpp_output = "".join(cls.vpp_stdout_deque)
397 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
399 stdout_log('\n%s', vpp_output)
400 stdout_log(single_line_delim)
402 if hasattr(cls, 'vpp_stderr_deque'):
403 stderr_log(single_line_delim)
404 stderr_log('VPP output to stderr while running %s:', cls.__name__)
405 stderr_log(single_line_delim)
406 vpp_output = "".join(cls.vpp_stderr_deque)
407 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
409 stderr_log('\n%s', vpp_output)
410 stderr_log(single_line_delim)
413 def tearDownClass(cls):
414 """ Perform final cleanup after running all tests in this test-case """
416 cls.file_handler.close()
419 """ Show various debug prints after each test """
420 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
421 (self.__class__.__name__, self._testMethodName,
422 self._testMethodDoc))
423 if not self.vpp_dead:
424 self.logger.debug(self.vapi.cli("show trace"))
425 self.logger.info(self.vapi.ppcli("show interface"))
426 self.logger.info(self.vapi.ppcli("show hardware"))
427 self.logger.info(self.vapi.ppcli("show error"))
428 self.logger.info(self.vapi.ppcli("show run"))
429 self.registry.remove_vpp_config(self.logger)
430 # Save/Dump VPP api trace log
431 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
432 tmp_api_trace = "/tmp/%s" % api_trace
433 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
434 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
435 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
437 os.rename(tmp_api_trace, vpp_api_trace_log)
438 self.logger.info(self.vapi.ppcli("api trace dump %s" %
441 self.registry.unregister_all(self.logger)
444 """ Clear trace before running each test"""
445 self.reporter.send_keep_alive(self)
446 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
447 (self.__class__.__name__, self._testMethodName,
448 self._testMethodDoc))
450 raise Exception("VPP is dead when setting up the test")
451 self.sleep(.1, "during setUp")
452 self.vpp_stdout_deque.append(
453 "--- test setUp() for %s.%s(%s) starts here ---\n" %
454 (self.__class__.__name__, self._testMethodName,
455 self._testMethodDoc))
456 self.vpp_stderr_deque.append(
457 "--- test setUp() for %s.%s(%s) starts here ---\n" %
458 (self.__class__.__name__, self._testMethodName,
459 self._testMethodDoc))
460 self.vapi.cli("clear trace")
461 # store the test instance inside the test class - so that objects
462 # holding the class can access instance methods (like assertEqual)
463 type(self).test_instance = self
466 def pg_enable_capture(cls, interfaces):
468 Enable capture on packet-generator interfaces
470 :param interfaces: iterable interface indexes
477 def register_capture(cls, cap_name):
478 """ Register a capture in the testclass """
479 # add to the list of captures with current timestamp
480 cls._captures.append((time.time(), cap_name))
481 # filter out from zombies
482 cls._zombie_captures = [(stamp, name)
483 for (stamp, name) in cls._zombie_captures
488 """ Remove any zombie captures and enable the packet generator """
489 # how long before capture is allowed to be deleted - otherwise vpp
490 # crashes - 100ms seems enough (this shouldn't be needed at all)
493 for stamp, cap_name in cls._zombie_captures:
494 wait = stamp + capture_ttl - now
496 cls.sleep(wait, "before deleting capture %s" % cap_name)
498 cls.logger.debug("Removing zombie capture %s" % cap_name)
499 cls.vapi.cli('packet-generator delete %s' % cap_name)
501 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
502 cls.vapi.cli('packet-generator enable')
503 cls._zombie_captures = cls._captures
507 def create_pg_interfaces(cls, interfaces):
509 Create packet-generator interfaces.
511 :param interfaces: iterable indexes of the interfaces.
512 :returns: List of created interfaces.
517 intf = VppPGInterface(cls, i)
518 setattr(cls, intf.name, intf)
520 cls.pg_interfaces = result
524 def create_loopback_interfaces(cls, interfaces):
526 Create loopback interfaces.
528 :param interfaces: iterable indexes of the interfaces.
529 :returns: List of created interfaces.
533 intf = VppLoInterface(cls, i)
534 setattr(cls, intf.name, intf)
536 cls.lo_interfaces = result
540 def extend_packet(packet, size):
542 Extend packet to given size by padding with spaces
543 NOTE: Currently works only when Raw layer is present.
545 :param packet: packet
546 :param size: target size
549 packet_len = len(packet) + 4
550 extend = size - packet_len
552 packet[Raw].load += ' ' * extend
def reset_packet_infos(cls):
    """Forget all stored packet infos and all per-destination counts.

    Both ``_packet_infos`` and ``_packet_count_for_dst_if_idx`` are
    replaced by fresh empty dicts.
    """
    cls._packet_infos, cls._packet_count_for_dst_if_idx = {}, {}
561 def create_packet_info(cls, src_if, dst_if):
563 Create packet info object containing the source and destination indexes
564 and add it to the testcase's packet info list
566 :param VppInterface src_if: source interface
567 :param VppInterface dst_if: destination interface
569 :returns: _PacketInfo object
573 info.index = len(cls._packet_infos)
574 info.src = src_if.sw_if_index
575 info.dst = dst_if.sw_if_index
576 if isinstance(dst_if, VppSubInterface):
577 dst_idx = dst_if.parent.sw_if_index
579 dst_idx = dst_if.sw_if_index
580 if dst_idx in cls._packet_count_for_dst_if_idx:
581 cls._packet_count_for_dst_if_idx[dst_idx] += 1
583 cls._packet_count_for_dst_if_idx[dst_idx] = 1
584 cls._packet_infos[info.index] = info
588 def info_to_payload(info):
590 Convert _PacketInfo object to packet payload
592 :param info: _PacketInfo object
594 :returns: string containing serialized data from packet info
596 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
600 def payload_to_info(payload):
602 Convert packet payload to _PacketInfo object
604 :param payload: packet payload
606 :returns: _PacketInfo object containing de-serialized data from payload
609 numbers = payload.split()
611 info.index = int(numbers[0])
612 info.src = int(numbers[1])
613 info.dst = int(numbers[2])
614 info.ip = int(numbers[3])
615 info.proto = int(numbers[4])
618 def get_next_packet_info(self, info):
620 Iterate over the packet info list stored in the testcase
621 Start iteration with first element if info is None
622 Continue based on index in info if info is specified
624 :param info: info or None
625 :returns: next info in list or None if no more infos
630 next_index = info.index + 1
631 if next_index == len(self._packet_infos):
634 return self._packet_infos[next_index]
636 def get_next_packet_info_for_interface(self, src_index, info):
638 Search the packet info list for the next packet info with same source
641 :param src_index: source interface index to search for
642 :param info: packet info - where to start the search
643 :returns: packet info or None
647 info = self.get_next_packet_info(info)
650 if info.src == src_index:
653 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
655 Search the packet info list for the next packet info with same source
656 and destination interface indexes
658 :param src_index: source interface index to search for
659 :param dst_index: destination interface index to search for
660 :param info: packet info - where to start the search
661 :returns: packet info or None
665 info = self.get_next_packet_info_for_interface(src_index, info)
668 if info.dst == dst_index:
671 def assert_equal(self, real_value, expected_value, name_or_class=None):
672 if name_or_class is None:
673 self.assertEqual(real_value, expected_value)
676 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
677 msg = msg % (getdoc(name_or_class).strip(),
678 real_value, str(name_or_class(real_value)),
679 expected_value, str(name_or_class(expected_value)))
681 msg = "Invalid %s: %s does not match expected value %s" % (
682 name_or_class, real_value, expected_value)
684 self.assertEqual(real_value, expected_value, msg)
686 def assert_in_range(self,
694 msg = "Invalid %s: %s out of range <%s,%s>" % (
695 name, real_value, expected_min, expected_max)
696 self.assertTrue(expected_min <= real_value <= expected_max, msg)
699 def sleep(cls, timeout, remark=None):
700 if hasattr(cls, 'logger'):
701 cls.logger.debug("Starting sleep for %ss (%s)" % (timeout, remark))
705 if after - before > 2 * timeout:
706 cls.logger.error("unexpected time.sleep() result - "
707 "slept for %ss instead of ~%ss!" % (
708 after - before, timeout))
709 if hasattr(cls, 'logger'):
711 "Finished sleep (%s) - slept %ss (wanted %ss)" % (
712 remark, after - before, timeout))
715 class TestCasePrinter(object):
719 self.__dict__ = self._shared_state
720 if not hasattr(self, "_test_case_set"):
721 self._test_case_set = set()
def print_test_case_heading_if_first_time(self, case):
    """Print a heading for the class of *case* the first time it is seen.

    The heading is the first line of the class docstring, colorized and
    framed by double-line delimiters. Classes already recorded in
    ``_test_case_set`` are skipped.

    :param case: test case instance whose class heading should be printed
    """
    test_class = case.__class__
    if test_class in self._test_case_set:
        return
    doc = getdoc(test_class)
    # getdoc() gives None for an undocumented class and an empty string
    # for an empty docstring - fall back to the class name in both cases
    # instead of failing on .splitlines()[0]
    heading = doc.splitlines()[0] if doc else test_class.__name__
    print(double_line_delim)
    print(colorize(heading, YELLOW))
    print(double_line_delim)
    self._test_case_set.add(test_class)
731 class VppTestResult(unittest.TestResult):
733 @property result_string
734 String variable to store the test case result string.
736 List variable containing 2-tuples of TestCase instances and strings
737 holding formatted tracebacks. Each tuple represents a test which
738 raised an unexpected exception.
740 List variable containing 2-tuples of TestCase instances and strings
741 holding formatted tracebacks. Each tuple represents a test where
742 a failure was explicitly signalled using the TestCase.assert*()
746 def __init__(self, stream, descriptions, verbosity):
748 :param stream File descriptor to store where to report test results.
749 Set to the standard error stream by default.
750 :param descriptions Boolean variable to store information if to use
751 test case descriptions.
752 :param verbosity Integer variable to store required verbosity level.
754 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
756 self.descriptions = descriptions
757 self.verbosity = verbosity
758 self.result_string = None
759 self.printer = TestCasePrinter()
def addSuccess(self, test):
    """
    Record that a test succeeded.

    Logs the call through the test's logger (when it has one), forwards
    to unittest.TestResult.addSuccess and sets the result string to a
    green "OK".

    :param test: test which succeeded

    """
    if hasattr(test, 'logger'):
        identity = (test.__class__.__name__,
                    test._testMethodName,
                    test._testMethodDoc)
        test.logger.debug("--- addSuccess() %s.%s(%s) called" % identity)
    unittest.TestResult.addSuccess(self, test)
    self.result_string = colorize("OK", GREEN)
776 def addSkip(self, test, reason):
778 Record a test skipped.
784 if hasattr(test, 'logger'):
785 test.logger.debug("--- addSkip() %s.%s(%s) called, reason is %s"
786 % (test.__class__.__name__,
787 test._testMethodName,
790 unittest.TestResult.addSkip(self, test, reason)
791 self.result_string = colorize("SKIP", YELLOW)
793 def symlink_failed(self, test):
795 if hasattr(test, 'logger'):
797 if hasattr(test, 'tempdir'):
799 failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
800 link_path = '%s/%s-FAILED' % (failed_dir,
801 test.tempdir.split("/")[-1])
803 logger.debug("creating a link to the failed test")
804 logger.debug("os.symlink(%s, %s)" %
805 (test.tempdir, link_path))
806 os.symlink(test.tempdir, link_path)
807 except Exception as e:
811 def send_failure_through_pipe(self, test):
812 if hasattr(self, 'test_framework_failed_pipe'):
813 pipe = self.test_framework_failed_pipe
815 pipe.send(test.__class__)
817 def addFailure(self, test, err):
819 Record a test failed result
822 :param err: error message
825 if hasattr(test, 'logger'):
826 test.logger.debug("--- addFailure() %s.%s(%s) called, err is %s"
827 % (test.__class__.__name__,
828 test._testMethodName,
829 test._testMethodDoc, err))
830 test.logger.debug("formatted exception is:\n%s" %
831 "".join(format_exception(*err)))
832 unittest.TestResult.addFailure(self, test, err)
833 if hasattr(test, 'tempdir'):
834 self.result_string = colorize("FAIL", RED) + \
835 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
836 self.symlink_failed(test)
838 self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
840 self.send_failure_through_pipe(test)
842 def addError(self, test, err):
844 Record a test error result
847 :param err: error message
850 if hasattr(test, 'logger'):
851 test.logger.debug("--- addError() %s.%s(%s) called, err is %s"
852 % (test.__class__.__name__,
853 test._testMethodName,
854 test._testMethodDoc, err))
855 test.logger.debug("formatted exception is:\n%s" %
856 "".join(format_exception(*err)))
857 unittest.TestResult.addError(self, test, err)
858 if hasattr(test, 'tempdir'):
859 self.result_string = colorize("ERROR", RED) + \
860 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
861 self.symlink_failed(test)
863 self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
865 self.send_failure_through_pipe(test)
867 def getDescription(self, test):
872 :returns: test description
875 # TODO: if none print warning not raise exception
876 short_description = test.shortDescription()
877 if self.descriptions and short_description:
878 return short_description
882 def startTest(self, test):
889 self.printer.print_test_case_heading_if_first_time(test)
890 unittest.TestResult.startTest(self, test)
891 if self.verbosity > 0:
893 "Starting " + self.getDescription(test) + " ...")
894 self.stream.writeln(single_line_delim)
896 def stopTest(self, test):
903 unittest.TestResult.stopTest(self, test)
904 if self.verbosity > 0:
905 self.stream.writeln(single_line_delim)
906 self.stream.writeln("%-73s%s" % (self.getDescription(test),
908 self.stream.writeln(single_line_delim)
910 self.stream.writeln("%-73s%s" % (self.getDescription(test),
def printErrors(self):
    """
    Write an empty line to the stream, then print the collected errors
    followed by the collected failures.
    """
    self.stream.writeln()
    for flavour, attr_name in (('ERROR', 'errors'), ('FAIL', 'failures')):
        # look the list up right before printing it, one flavour at a time
        self.printErrorList(flavour, getattr(self, attr_name))
def printErrorList(self, flavour, errors):
    """
    Write every entry of an error list to the output stream.

    Each entry is written as a double-line delimiter, a
    "<flavour>: <test description>" line, a single-line delimiter and
    the error text.

    :param flavour: error type label printed in front of the description
    :param errors: iterable of (test, error text) pairs

    """
    for failed_test, details in errors:
        out = self.stream
        out.writeln(double_line_delim)
        headline = "%s: %s" % (flavour, self.getDescription(failed_test))
        out.writeln(headline)
        out.writeln(single_line_delim)
        out.writeln("%s" % details)
938 class Filter_by_test_option:
939 def __init__(self, filter_file_name, filter_class_name, filter_func_name):
940 self.filter_file_name = filter_file_name
941 self.filter_class_name = filter_class_name
942 self.filter_func_name = filter_func_name
944 def __call__(self, file_name, class_name, func_name):
945 if self.filter_file_name and file_name != self.filter_file_name:
947 if self.filter_class_name and class_name != self.filter_class_name:
949 if self.filter_func_name and func_name != self.filter_func_name:
954 class VppTestRunner(unittest.TextTestRunner):
956 A basic test runner implementation which prints results to standard error.
959 def resultclass(self):
960 """Class maintaining the results of the tests"""
963 def __init__(self, keep_alive_pipe=None, failed_pipe=None,
964 stream=sys.stderr, descriptions=True,
965 verbosity=1, failfast=False, buffer=False, resultclass=None):
966 # ignore stream setting here, use hard-coded stdout to be in sync
967 # with prints from VppTestCase methods ...
968 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
969 verbosity, failfast, buffer,
971 reporter = KeepAliveReporter()
972 reporter.pipe = keep_alive_pipe
973 # this is super-ugly, but very simple to implement and works as long
974 # as we run only one test at the same time
975 VppTestResult.test_framework_failed_pipe = failed_pipe
979 def parse_test_option(self):
981 f = os.getenv(self.test_option)
984 filter_file_name = None
985 filter_class_name = None
986 filter_func_name = None
991 raise Exception("Unrecognized %s option: %s" %
992 (self.test_option, f))
994 if parts[2] not in ('*', ''):
995 filter_func_name = parts[2]
996 if parts[1] not in ('*', ''):
997 filter_class_name = parts[1]
998 if parts[0] not in ('*', ''):
999 if parts[0].startswith('test_'):
1000 filter_file_name = parts[0]
1002 filter_file_name = 'test_%s' % parts[0]
1004 if f.startswith('test_'):
1005 filter_file_name = f
1007 filter_file_name = 'test_%s' % f
1008 return filter_file_name, filter_class_name, filter_func_name
1011 def filter_tests(tests, filter_cb):
1012 result = unittest.suite.TestSuite()
1014 if isinstance(t, unittest.suite.TestSuite):
1015 # this is a bunch of tests, recursively filter...
1016 x = filter_tests(t, filter_cb)
1017 if x.countTestCases() > 0:
1019 elif isinstance(t, unittest.TestCase):
1020 # this is a single test
1021 parts = t.id().split('.')
1022 # t.id() for common cases like this:
1023 # test_classifier.TestClassifier.test_acl_ip
1024 # apply filtering only if it is so
1026 if not filter_cb(parts[0], parts[1], parts[2]):
1030 # unexpected object, don't touch it
def run(self, test):
    """
    Filter the given tests by the configured test option and run the
    ones which match.

    :param test: test suite to filter and run
    :returns: result of the parent runner's run() on the filtered suite

    """
    # emit stack trace to stderr if killed by signal
    faulthandler.enable()
    print("Running tests using custom test runner")  # debug message
    file_filter, class_filter, func_filter = self.parse_test_option()
    print("Active filters: file=%s, class=%s, function=%s" % (
        file_filter, class_filter, func_filter))
    matcher = Filter_by_test_option(file_filter, class_filter, func_filter)
    selected = self.filter_tests(test, matcher)
    matching_count = selected.countTestCases()
    total_count = test.countTestCases()
    print("%s out of %s tests match specified filters" % (
        matching_count, total_count))
    extended = running_extended_tests()
    if not extended:
        print("Not running extended tests (some tests will be skipped)")
    return super(VppTestRunner, self).run(selected)