3 from __future__ import print_function
13 from collections import deque
14 from threading import Thread, Event
15 from inspect import getdoc, isclass
16 from traceback import format_exception
17 from logging import FileHandler, DEBUG, Formatter
18 from scapy.packet import Raw
19 from hook import StepHook, PollHook
20 from vpp_pg_interface import VppPGInterface
21 from vpp_sub_interface import VppSubInterface
22 from vpp_lo_interface import VppLoInterface
23 from vpp_papi_provider import VppPapiProvider
25 from vpp_object import VppObjectRegistry
26 if os.name == 'posix' and sys.version_info[0] < 3:
27 # using subprocess32 is recommended by python official documentation
28 # @ https://docs.python.org/2/library/subprocess.html
29 import subprocess32 as subprocess
34 Test framework module.
36 The module provides a set of tools for constructing and running tests and
37 representing the results.
41 class _PacketInfo(object):
42 """Private class to create packet info object.
44 Help process information about the next packet.
45 Set variables to default values.
47 #: Store the index of the packet.
49 #: Store the index of the source packet generator interface of the packet.
51 #: Store the index of the destination packet generator interface
54 #: Store expected ip version
56 #: Store expected upper protocol
58 #: Store the copy of the former packet.
61 def __eq__(self, other):
62 index = self.index == other.index
63 src = self.src == other.src
64 dst = self.dst == other.dst
65 data = self.data == other.data
66 return index and src and dst and data
def pump_output(testclass):
    """ pump output from vpp stdout/stderr to proper queues

    Runs until ``testclass.pump_thread_stop_flag`` is set. Data read from the
    stdout/stderr pipes of ``testclass.vpp`` is appended, in chunks of up to
    1024 bytes, to ``testclass.vpp_stdout_deque`` and
    ``testclass.vpp_stderr_deque`` respectively.

    :param testclass: object providing vpp, pump_thread_stop_flag,
        pump_thread_wakeup_pipe, vpp_stdout_deque and vpp_stderr_deque
    """
    sinks = {
        testclass.vpp.stdout.fileno(): testclass.vpp_stdout_deque,
        testclass.vpp.stderr.fileno(): testclass.vpp_stderr_deque,
    }
    wakeup_fd = testclass.pump_thread_wakeup_pipe[0]
    while not testclass.pump_thread_stop_flag.wait(0):
        readable = select.select(list(sinks) + [wakeup_fd], [], [])[0]
        for fd in readable:
            sink = sinks.get(fd)
            if sink is None:
                # ignoring the dummy pipe here intentionally - the flag will
                # take care of properly terminating the loop
                continue
            chunk = os.read(fd, 1024)
            if chunk:
                sink.append(chunk)
            else:
                # an empty read means the writer closed its end; a closed
                # pipe stays readable forever, so stop watching it instead
                # of spinning and queueing empty chunks
                del sinks[fd]
def running_extended_tests():
    """Tell whether extended tests were requested via the environment.

    :returns: True if the EXTENDED_TESTS environment variable is set to
        "y", "yes" or "1" (case-insensitive), False otherwise - including
        when the variable is not set at all
    """
    s = os.getenv("EXTENDED_TESTS", "")
    return s.lower() in ("y", "yes", "1")
class KeepAliveReporter(object):
    """
    Singleton object which reports test start to parent process

    All instances share one attribute dictionary, so the pipe assigned
    through any instance is visible through every other instance.
    """
    _shared_state = {}

    def __init__(self):
        self.__dict__ = self._shared_state

    @property
    def pipe(self):
        """Keep-alive pipe, or None if none has been assigned yet."""
        return getattr(self, '_pipe', None)

    @pipe.setter
    def pipe(self, pipe):
        if hasattr(self, '_pipe'):
            raise Exception("Internal error - pipe should only be set once.")
        self._pipe = pipe

    def send_keep_alive(self, test):
        """
        Write current test tmpdir & desc to keep-alive pipe to signal liveness

        :param test: test case instance or test case class; must provide
            the vpp_bin and tempdir attributes
        """
        if self.pipe is None:
            # nobody is listening, nothing to report
            return

        if isclass(test):
            # a class has no shortDescription() to call - describe it by the
            # first line of its docstring instead
            doc = getdoc(test)
            desc = doc.splitlines()[0] if doc else test.__name__
        else:
            desc = test.shortDescription()
            if not desc:
                desc = str(test)

        self.pipe.send((desc, test.vpp_bin, test.tempdir))
128 class VppTestCase(unittest.TestCase):
129 """This subclass is a base class for VPP test cases that are implemented as
130 classes. It provides methods to create and run test case.
@property
def packet_infos(self):
    """Packet infos of this test case, as a dict keyed by packet index"""
    return self._packet_infos
@classmethod
def get_packet_count_for_if_idx(cls, dst_if_index):
    """Get the number of packet info for specified destination if index

    :param dst_if_index: destination interface index
    :returns: number of packet infos created for that destination, 0 if
        none were created
    """
    return cls._packet_count_for_dst_if_idx.get(dst_if_index, 0)
148 """Return the instance of this testcase"""
149 return cls.test_instance
@classmethod
def set_debug_flags(cls, d):
    """Translate the DEBUG option into the debug_* class flags.

    All three flags (debug_core, debug_gdb, debug_gdbserver) are reset to
    False first; at most one of them is then switched on.

    :param d: requested debug mode - "core", "gdb" or "gdbserver"
        (case-insensitive) - or None for no debugging
    :raises Exception: if d is not None and not a recognized mode
    """
    cls.debug_core = False
    cls.debug_gdb = False
    cls.debug_gdbserver = False
    if d is None:
        return
    flag_by_option = {
        "core": "debug_core",
        "gdb": "debug_gdb",
        "gdbserver": "debug_gdbserver",
    }
    flag = flag_by_option.get(d.lower())
    if flag is None:
        raise Exception("Unrecognized DEBUG option: '%s'" % d)
    setattr(cls, flag, True)
@classmethod
def setUpConstants(cls):
    """ Set-up the test case class based on environment variables

    Reads STEP, DEBUG, VPP_TEST_BIN, VPP_TEST_PLUGIN_PATH, EXTERN_PLUGINS
    and COREDUMP_SIZE and stores the resulting settings - including the
    complete vpp command line (cls.vpp_cmdline) - on the class.
    Requires cls.shm_prefix and cls.logger to be set already.
    """
    # unset variables simply mean "off" / "use the default"
    cls.step = os.getenv("STEP", "").lower() in ("y", "yes", "1")
    cls.set_debug_flags(os.getenv("DEBUG"))
    cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
    cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
    cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')

    # combine whichever of the two plugin locations are configured
    plugin_dirs = [path
                   for path in (cls.plugin_path, cls.extern_plugin_path)
                   if path is not None]
    plugin_path = ":".join(plugin_dirs) if plugin_dirs else None

    # the debug CLI is only opened when somebody is going to interact
    if cls.step or cls.debug_gdb or cls.debug_gdbserver:
        debug_cli = "cli-listen localhost:5002"
    else:
        debug_cli = ""

    size = os.getenv("COREDUMP_SIZE")
    if size is None:
        size = "unlimited"
    coredump_size = "coredump-size %s" % size

    cls.vpp_cmdline = [cls.vpp_bin, "unix",
                       "{", "nodaemon", debug_cli, coredump_size, "}",
                       "api-trace", "{", "on", "}",
                       "api-segment", "{", "prefix", cls.shm_prefix, "}",
                       "plugins", "{", "plugin", "dpdk_plugin.so", "{",
                       "disable", "}", "}"]
    if plugin_path is not None:
        cls.vpp_cmdline.extend(["plugin_path", plugin_path])
    cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
@classmethod
def wait_for_enter(cls):
    """Announce the spawned process and, when debugging, wait for ENTER.

    Without a debug mode only the PID is logged. With debug_gdbserver or
    debug_gdb set, instructions for attaching gdb are printed and the call
    blocks until the user presses ENTER.
    """
    if cls.debug_gdbserver:
        print(double_line_delim)
        print("Spawned GDB server with PID: %d" % cls.vpp.pid)
    elif cls.debug_gdb:
        print(double_line_delim)
        print("Spawned VPP with PID: %d" % cls.vpp.pid)
    else:
        # not debugging - nothing to wait for
        cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
        return
    print(single_line_delim)
    print("You can debug the VPP using e.g.:")
    if cls.debug_gdbserver:
        print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
        print("Now is the time to attach a gdb by running the above "
              "command, set up breakpoints etc. and then resume VPP from "
              "within gdb by issuing the 'continue' command")
    elif cls.debug_gdb:
        print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
        print("Now is the time to attach a gdb by running the above "
              "command and set up breakpoints etc.")
    print(single_line_delim)
    raw_input("Press ENTER to continue running the testcase...")
242 cmdline = cls.vpp_cmdline
244 if cls.debug_gdbserver:
245 gdbserver = '/usr/bin/gdbserver'
246 if not os.path.isfile(gdbserver) or \
247 not os.access(gdbserver, os.X_OK):
248 raise Exception("gdbserver binary '%s' does not exist or is "
249 "not executable" % gdbserver)
251 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
252 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
255 cls.vpp = subprocess.Popen(cmdline,
256 stdout=subprocess.PIPE,
257 stderr=subprocess.PIPE,
259 except Exception as e:
260 cls.logger.critical("Couldn't start vpp: %s" % e)
268 Perform class setup before running the testcase
269 Remove shared memory files, start vpp and connect the vpp-api
271 gc.collect() # run garbage collection first
272 cls.logger = getLogger(cls.__name__)
273 cls.tempdir = tempfile.mkdtemp(
274 prefix='vpp-unittest-' + cls.__name__ + '-')
275 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
276 cls.file_handler.setFormatter(
277 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
279 cls.file_handler.setLevel(DEBUG)
280 cls.logger.addHandler(cls.file_handler)
281 cls.shm_prefix = cls.tempdir.split("/")[-1]
282 os.chdir(cls.tempdir)
283 cls.logger.info("Temporary dir is %s, shm prefix is %s",
284 cls.tempdir, cls.shm_prefix)
286 cls.reset_packet_infos()
288 cls._zombie_captures = []
291 cls.registry = VppObjectRegistry()
292 cls.vpp_startup_failed = False
293 cls.reporter = KeepAliveReporter()
294 cls.reporter.send_keep_alive(cls)
295 # need to catch exceptions here because if we raise, then the cleanup
296 # doesn't get called and we might end with a zombie vpp
299 cls.vpp_stdout_deque = deque()
300 cls.vpp_stderr_deque = deque()
301 cls.pump_thread_stop_flag = Event()
302 cls.pump_thread_wakeup_pipe = os.pipe()
303 cls.pump_thread = Thread(target=pump_output, args=(cls,))
304 cls.pump_thread.daemon = True
305 cls.pump_thread.start()
306 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
311 cls.vapi.register_hook(hook)
312 cls.sleep(0.1, "after vpp startup, before initial poll")
316 cls.vpp_startup_failed = True
318 "VPP died shortly after startup, check the"
319 " output to standard error for possible cause")
324 if cls.debug_gdbserver:
325 print(colorize("You're running VPP inside gdbserver but "
326 "VPP-API connection failed, did you forget "
327 "to 'continue' VPP from within gdb?", RED))
330 t, v, tb = sys.exc_info()
340 Disconnect vpp-api, kill vpp and cleanup shared memory files
342 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
344 if cls.vpp.returncode is None:
345 print(double_line_delim)
346 print("VPP or GDB server is still running")
347 print(single_line_delim)
348 raw_input("When done debugging, press ENTER to kill the "
349 "process and finish running the testcase...")
351 os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
352 cls.pump_thread_stop_flag.set()
353 if hasattr(cls, 'pump_thread'):
354 cls.logger.debug("Waiting for pump thread to stop")
355 cls.pump_thread.join()
356 if hasattr(cls, 'vpp_stderr_reader_thread'):
357 cls.logger.debug("Waiting for stdderr pump to stop")
358 cls.vpp_stderr_reader_thread.join()
360 if hasattr(cls, 'vpp'):
361 if hasattr(cls, 'vapi'):
362 cls.vapi.disconnect()
365 if cls.vpp.returncode is None:
366 cls.logger.debug("Sending TERM to vpp")
368 cls.logger.debug("Waiting for vpp to die")
369 cls.vpp.communicate()
372 if cls.vpp_startup_failed:
373 stdout_log = cls.logger.info
374 stderr_log = cls.logger.critical
376 stdout_log = cls.logger.info
377 stderr_log = cls.logger.info
379 if hasattr(cls, 'vpp_stdout_deque'):
380 stdout_log(single_line_delim)
381 stdout_log('VPP output to stdout while running %s:', cls.__name__)
382 stdout_log(single_line_delim)
383 vpp_output = "".join(cls.vpp_stdout_deque)
384 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
386 stdout_log('\n%s', vpp_output)
387 stdout_log(single_line_delim)
389 if hasattr(cls, 'vpp_stderr_deque'):
390 stderr_log(single_line_delim)
391 stderr_log('VPP output to stderr while running %s:', cls.__name__)
392 stderr_log(single_line_delim)
393 vpp_output = "".join(cls.vpp_stderr_deque)
394 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
396 stderr_log('\n%s', vpp_output)
397 stderr_log(single_line_delim)
400 def tearDownClass(cls):
401 """ Perform final cleanup after running all tests in this test-case """
403 cls.file_handler.close()
406 """ Show various debug prints after each test """
407 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
408 (self.__class__.__name__, self._testMethodName,
409 self._testMethodDoc))
410 if not self.vpp_dead:
411 self.logger.debug(self.vapi.cli("show trace"))
412 self.logger.info(self.vapi.ppcli("show interface"))
413 self.logger.info(self.vapi.ppcli("show hardware"))
414 self.logger.info(self.vapi.ppcli("show error"))
415 self.logger.info(self.vapi.ppcli("show run"))
416 self.registry.remove_vpp_config(self.logger)
417 # Save/Dump VPP api trace log
418 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
419 tmp_api_trace = "/tmp/%s" % api_trace
420 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
421 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
422 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
424 os.rename(tmp_api_trace, vpp_api_trace_log)
425 self.logger.info(self.vapi.ppcli("api trace dump %s" %
428 self.registry.unregister_all(self.logger)
431 """ Clear trace before running each test"""
432 self.reporter.send_keep_alive(self)
433 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
434 (self.__class__.__name__, self._testMethodName,
435 self._testMethodDoc))
437 raise Exception("VPP is dead when setting up the test")
438 self.sleep(.1, "during setUp")
439 self.vpp_stdout_deque.append(
440 "--- test setUp() for %s.%s(%s) starts here ---\n" %
441 (self.__class__.__name__, self._testMethodName,
442 self._testMethodDoc))
443 self.vpp_stderr_deque.append(
444 "--- test setUp() for %s.%s(%s) starts here ---\n" %
445 (self.__class__.__name__, self._testMethodName,
446 self._testMethodDoc))
447 self.vapi.cli("clear trace")
448 # store the test instance inside the test class - so that objects
449 # holding the class can access instance methods (like assertEqual)
450 type(self).test_instance = self
453 def pg_enable_capture(cls, interfaces):
455 Enable capture on packet-generator interfaces
457 :param interfaces: iterable interface indexes
@classmethod
def register_capture(cls, cap_name):
    """ Register a capture in the testclass

    :param cap_name: name of the capture
    """
    # add to the list of captures with current timestamp
    cls._captures.append((time.time(), cap_name))
    # filter out from zombies - the capture is in use again
    cls._zombie_captures = [(stamp, name)
                            for (stamp, name) in cls._zombie_captures
                            if name != cap_name]
475 """ Remove any zombie captures and enable the packet generator """
476 # how long before capture is allowed to be deleted - otherwise vpp
477 # crashes - 100ms seems enough (this shouldn't be needed at all)
480 for stamp, cap_name in cls._zombie_captures:
481 wait = stamp + capture_ttl - now
483 cls.sleep(wait, "before deleting capture %s" % cap_name)
485 cls.logger.debug("Removing zombie capture %s" % cap_name)
486 cls.vapi.cli('packet-generator delete %s' % cap_name)
488 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
489 cls.vapi.cli('packet-generator enable')
490 cls._zombie_captures = cls._captures
@classmethod
def create_pg_interfaces(cls, interfaces):
    """
    Create packet-generator interfaces.

    Every created interface is also stored as a class attribute named
    after the interface, and the whole list as cls.pg_interfaces.

    :param interfaces: iterable indexes of the interfaces.
    :returns: List of created interfaces.

    """
    result = []
    for i in interfaces:
        intf = VppPGInterface(cls, i)
        setattr(cls, intf.name, intf)
        result.append(intf)
    cls.pg_interfaces = result
    return result
@classmethod
def create_loopback_interfaces(cls, interfaces):
    """
    Create loopback interfaces.

    Every created interface is also stored as a class attribute named
    after the interface, and the whole list as cls.lo_interfaces.

    :param interfaces: iterable indexes of the interfaces.
    :returns: List of created interfaces.

    """
    result = []
    for i in interfaces:
        intf = VppLoInterface(cls, i)
        setattr(cls, intf.name, intf)
        result.append(intf)
    cls.lo_interfaces = result
    return result
@staticmethod
def extend_packet(packet, size):
    """
    Extend packet to given size by padding with spaces
    NOTE: Currently works only when Raw layer is present.

    The packet is modified in place; packets already at or above the
    target size are left untouched.

    :param packet: packet
    :param size: target size

    """
    # NOTE(review): the extra 4 bytes presumably account for a trailing
    # checksum not included in len(packet) - confirm
    packet_len = len(packet) + 4
    extend = size - packet_len
    if extend > 0:
        packet[Raw].load += ' ' * extend
@classmethod
def reset_packet_infos(cls):
    """ Reset the list of packet info objects and packet counts to zero """
    cls._packet_infos = {}
    cls._packet_count_for_dst_if_idx = {}
@classmethod
def create_packet_info(cls, src_if, dst_if):
    """
    Create packet info object containing the source and destination indexes
    and add it to the testcase's packet info list

    The per-destination packet count is incremented as well; packets sent
    to a sub-interface are counted against its parent interface.

    :param VppInterface src_if: source interface
    :param VppInterface dst_if: destination interface

    :returns: _PacketInfo object

    """
    info = _PacketInfo()
    info.index = len(cls._packet_infos)
    info.src = src_if.sw_if_index
    info.dst = dst_if.sw_if_index
    if isinstance(dst_if, VppSubInterface):
        dst_idx = dst_if.parent.sw_if_index
    else:
        dst_idx = dst_if.sw_if_index
    counts = cls._packet_count_for_dst_if_idx
    counts[dst_idx] = counts.get(dst_idx, 0) + 1
    cls._packet_infos[info.index] = info
    return info
@staticmethod
def info_to_payload(info):
    """
    Convert _PacketInfo object to packet payload

    The fields are written as space separated decimal numbers in the order
    index, src, dst, ip, proto.

    :param info: _PacketInfo object

    :returns: string containing serialized data from packet info
    """
    return "%d %d %d %d %d" % (info.index, info.src, info.dst,
                               info.ip, info.proto)
@staticmethod
def payload_to_info(payload):
    """
    Convert packet payload to _PacketInfo object

    The payload is expected to start with five whitespace separated
    integers: index, src, dst, ip and proto.

    :param payload: packet payload

    :returns: _PacketInfo object containing de-serialized data from payload

    """
    numbers = payload.split()
    info = _PacketInfo()
    info.index = int(numbers[0])
    info.src = int(numbers[1])
    info.dst = int(numbers[2])
    info.ip = int(numbers[3])
    info.proto = int(numbers[4])
    return info
def get_next_packet_info(self, info):
    """
    Iterate over the packet info list stored in the testcase
    Start iteration with first element if info is None
    Continue based on index in info if info is specified

    :param info: info or None
    :returns: next info in list or None if no more infos
    """
    next_index = 0 if info is None else info.index + 1
    # infos are stored under consecutive indexes, so a missing key means
    # the end of the sequence has been reached
    return self._packet_infos.get(next_index)
def get_next_packet_info_for_interface(self, src_index, info):
    """
    Search the packet info list for the next packet info with same source
    interface index

    :param src_index: source interface index to search for
    :param info: packet info - where to start the search
    :returns: packet info or None

    """
    while True:
        info = self.get_next_packet_info(info)
        if info is None:
            return None
        if info.src == src_index:
            return info
def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
    """
    Search the packet info list for the next packet info with same source
    and destination interface indexes

    :param src_index: source interface index to search for
    :param dst_index: destination interface index to search for
    :param info: packet info - where to start the search
    :returns: packet info or None

    """
    while True:
        info = self.get_next_packet_info_for_interface(src_index, info)
        if info is None:
            return None
        if info.dst == dst_index:
            return info
def assert_equal(self, real_value, expected_value, name_or_class=None):
    """Assert that two values are equal, with a descriptive message.

    :param real_value: value obtained by the test
    :param expected_value: value the test expects
    :param name_or_class: optional - either a plain name of the compared
        quantity, or a callable with a docstring (e.g. an enumeration
        class); in the latter case the docstring names the quantity and
        the callable is used to render both numeric values symbolically
    """
    if name_or_class is None:
        self.assertEqual(real_value, expected_value)
        return
    try:
        # rich message - only works if name_or_class is callable, has a
        # docstring and the values are numeric
        msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
        msg = msg % (getdoc(name_or_class).strip(),
                     real_value, str(name_or_class(real_value)),
                     expected_value, str(name_or_class(expected_value)))
    except Exception:
        # plain name (or values the rich format cannot render)
        msg = "Invalid %s: %s does not match expected value %s" % (
            name_or_class, real_value, expected_value)

    self.assertEqual(real_value, expected_value, msg)
673 def assert_in_range(self,
681 msg = "Invalid %s: %s out of range <%s,%s>" % (
682 name, real_value, expected_min, expected_max)
683 self.assertTrue(expected_min <= real_value <= expected_max, msg)
@classmethod
def sleep(cls, timeout, remark=None):
    """Sleep for the given time, logging the requested and real duration.

    Logging is skipped when the class has no logger (yet). An error is
    logged if the sleep took more than twice the requested time.

    :param timeout: time to sleep, in seconds
    :param remark: optional note included in the log messages
    """
    logger = getattr(cls, 'logger', None)
    if logger is not None:
        logger.debug("Starting sleep for %ss (%s)" % (timeout, remark))
    before = time.time()
    time.sleep(timeout)
    after = time.time()
    if logger is None:
        return
    if after - before > 2 * timeout:
        logger.error("unexpected time.sleep() result - "
                     "slept for %ss instead of ~%ss!" % (
                         after - before, timeout))
    logger.debug(
        "Finished sleep (%s) - slept %ss (wanted %ss)" % (
            remark, after - before, timeout))
class TestCasePrinter(object):
    """Prints a heading for each test case class, once per class.

    All instances share one attribute dictionary, so the set of classes
    which already had their heading printed is common to all of them.
    """
    _shared_state = {}

    def __init__(self):
        self.__dict__ = self._shared_state
        if not hasattr(self, "_test_case_set"):
            self._test_case_set = set()

    def print_test_case_heading_if_first_time(self, case):
        """Print the heading of the class of case unless already printed.

        The heading is the first line of the class docstring, or the class
        name if the class has no docstring.

        :param case: test case instance
        """
        case_class = case.__class__
        if case_class in self._test_case_set:
            return
        doc = getdoc(case_class)
        heading = doc.splitlines()[0] if doc else case_class.__name__
        print(double_line_delim)
        print(colorize(heading, YELLOW))
        print(double_line_delim)
        self._test_case_set.add(case_class)
class VppTestResult(unittest.TestResult):
    """
    Test result which prints a one-line, colorized verdict for every test.

    @property result_string
     String variable to store the test case result string.
    @property errors
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test which
     raised an unexpected exception.
    @property failures
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test where
     a failure was explicitly signalled using the TestCase.assert*()
     methods.
    """

    def __init__(self, stream, descriptions, verbosity):
        """
        :param stream File descriptor to store where to report test results.
            Set to the standard error stream by default.
        :param descriptions Boolean variable to store information if to use
            test case descriptions.
        :param verbosity Integer variable to store required verbosity level.
        """
        unittest.TestResult.__init__(self, stream, descriptions, verbosity)
        self.stream = stream
        self.descriptions = descriptions
        self.verbosity = verbosity
        self.result_string = None
        self.printer = TestCasePrinter()

    def addSuccess(self, test):
        """
        Record a test succeeded result

        :param test: test case which succeeded

        """
        if hasattr(test, 'logger'):
            test.logger.debug("--- addSuccess() %s.%s(%s) called"
                              % (test.__class__.__name__,
                                 test._testMethodName,
                                 test._testMethodDoc))
        unittest.TestResult.addSuccess(self, test)
        self.result_string = colorize("OK", GREEN)

    def addSkip(self, test, reason):
        """
        Record a test skipped.

        :param test: test case which was skipped
        :param reason: why the test was skipped

        """
        if hasattr(test, 'logger'):
            test.logger.debug("--- addSkip() %s.%s(%s) called, reason is %s"
                              % (test.__class__.__name__,
                                 test._testMethodName,
                                 test._testMethodDoc,
                                 reason))
        unittest.TestResult.addSkip(self, test, reason)
        self.result_string = colorize("SKIP", YELLOW)

    def _record_problem(self, test, err, method_name, label, record):
        """Common part of addFailure() and addError().

        :param test: test case which did not pass
        :param err: exception info tuple as passed in by unittest
        :param method_name: name of the calling method, for the log
        :param label: verdict to show ("FAIL" or "ERROR")
        :param record: unittest.TestResult method storing the result
        """
        if hasattr(test, 'logger'):
            test.logger.debug("--- %s() %s.%s(%s) called, err is %s"
                              % (method_name,
                                 test.__class__.__name__,
                                 test._testMethodName,
                                 test._testMethodDoc, err))
            test.logger.debug("formatted exception is:\n%s" %
                              "".join(format_exception(*err)))
        record(self, test, err)
        # point the user at the directory holding the logs, if there is one
        if hasattr(test, 'tempdir'):
            self.result_string = colorize(label, RED) + \
                ' [ temp dir used by test case: ' + test.tempdir + ' ]'
        else:
            self.result_string = colorize(label, RED) + ' [no temp dir]'

    def addFailure(self, test, err):
        """
        Record a test failed result

        :param test: test case which failed
        :param err: error message

        """
        self._record_problem(test, err, "addFailure", "FAIL",
                             unittest.TestResult.addFailure)

    def addError(self, test, err):
        """
        Record a test error result

        :param test: test case which raised an unexpected exception
        :param err: error message

        """
        self._record_problem(test, err, "addError", "ERROR",
                             unittest.TestResult.addError)

    def getDescription(self, test):
        """
        Get test description

        :param test: test case to describe
        :returns: test description

        """
        # TODO: if none print warning not raise exception
        short_description = test.shortDescription()
        if self.descriptions and short_description:
            return short_description
        return str(test)

    def startTest(self, test):
        """
        Start a test

        :param test: test case about to be run

        """
        self.printer.print_test_case_heading_if_first_time(test)
        unittest.TestResult.startTest(self, test)
        if self.verbosity > 0:
            self.stream.writeln(
                "Starting " + self.getDescription(test) + " ...")
            self.stream.writeln(single_line_delim)

    def stopTest(self, test):
        """
        Stop a test

        :param test: test case which has just finished

        """
        unittest.TestResult.stopTest(self, test)
        verdict = "%-73s%s" % (self.getDescription(test),
                               self.result_string)
        if self.verbosity > 0:
            self.stream.writeln(single_line_delim)
            self.stream.writeln(verdict)
            self.stream.writeln(single_line_delim)
        else:
            self.stream.writeln(verdict)

    def printErrors(self):
        """
        Print errors from running the test case
        """
        self.stream.writeln()
        self.printErrorList('ERROR', self.errors)
        self.printErrorList('FAIL', self.failures)

    def printErrorList(self, flavour, errors):
        """
        Print error list to the output stream together with error type
        and test case description.

        :param flavour: error type
        :param errors: iterable errors

        """
        for test, err in errors:
            self.stream.writeln(double_line_delim)
            self.stream.writeln("%s: %s" %
                                (flavour, self.getDescription(test)))
            self.stream.writeln(single_line_delim)
            self.stream.writeln("%s" % err)
class VppTestRunner(unittest.TextTestRunner):
    """
    A basic test runner implementation which prints results to standard error.
    """
    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    def __init__(self, pipe, stream=sys.stderr, descriptions=True, verbosity=1,
                 failfast=False, buffer=False, resultclass=None):
        """
        :param pipe: keep-alive pipe handed over to the KeepAliveReporter
        :param stream: ignored, see below
        The remaining parameters are passed on to unittest.TextTestRunner.
        """
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        super(VppTestRunner, self).__init__(sys.stdout, descriptions,
                                            verbosity, failfast, buffer,
                                            resultclass)
        reporter = KeepAliveReporter()
        reporter.pipe = pipe

    #: name of the environment variable holding the test filter
    test_option = "TEST"

    def parse_test_option(self):
        """Parse the test filter from the environment.

        The filter has the form <file>[.<class>[.<function>]]; '*' or an
        empty part matches anything, and the 'test_' prefix of the file
        part may be omitted.

        :returns: tuple (file name, class name, function name), with None
            in place of every part which is not filtered on
        :raises Exception: if the filter has more than three parts
        """
        f = os.getenv(self.test_option)
        filter_file_name = None
        filter_class_name = None
        filter_func_name = None
        if not f:
            return filter_file_name, filter_class_name, filter_func_name

        parts = f.split('.')
        if len(parts) > 3:
            raise Exception("Unrecognized %s option: %s" %
                            (self.test_option, f))
        if len(parts) > 2 and parts[2] not in ('*', ''):
            filter_func_name = parts[2]
        if len(parts) > 1 and parts[1] not in ('*', ''):
            filter_class_name = parts[1]
        if parts[0] not in ('*', ''):
            if parts[0].startswith('test_'):
                filter_file_name = parts[0]
            else:
                filter_file_name = 'test_%s' % parts[0]
        return filter_file_name, filter_class_name, filter_func_name

    def filter_tests(self, tests, filter_file, filter_class, filter_func):
        """Build a suite holding only the tests matching the filters.

        :param tests: iterable of test cases and/or (nested) test suites
        :param filter_file: required test file (module) name or None
        :param filter_class: required test class name or None
        :param filter_func: required test function name or None
        :returns: unittest.suite.TestSuite with the matching tests
        """
        result = unittest.suite.TestSuite()
        for t in tests:
            if isinstance(t, unittest.suite.TestSuite):
                # this is a bunch of tests, recursively filter...
                x = self.filter_tests(t, filter_file, filter_class,
                                      filter_func)
                if x.countTestCases() > 0:
                    result.addTest(x)
            elif isinstance(t, unittest.TestCase):
                # this is a single test
                parts = t.id().split('.')
                # t.id() for common cases like this:
                # test_classifier.TestClassifier.test_acl_ip
                # apply filtering only if it is so
                if len(parts) == 3:
                    if filter_file and filter_file != parts[0]:
                        continue
                    if filter_class and filter_class != parts[1]:
                        continue
                    if filter_func and filter_func != parts[2]:
                        continue
                result.addTest(t)
            else:
                # unexpected object, don't touch it
                result.addTest(t)
        return result

    def run(self, test):
        """
        Run the tests

        :param test: test suite to filter and run
        :returns: result object returned by unittest.TextTestRunner.run()

        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal
        print("Running tests using custom test runner")  # debug message
        filter_file, filter_class, filter_func = self.parse_test_option()
        print("Active filters: file=%s, class=%s, function=%s" % (
            filter_file, filter_class, filter_func))
        filtered = self.filter_tests(test, filter_file, filter_class,
                                     filter_func)
        print("%s out of %s tests match specified filters" % (
            filtered.countTestCases(), test.countTestCases()))
        if not running_extended_tests():
            print("Not running extended tests (some tests will be skipped)")
        return super(VppTestRunner, self).run(filtered)