3 from __future__ import print_function
12 from collections import deque
13 from threading import Thread, Event
14 from inspect import getdoc
15 from traceback import format_exception
16 from logging import FileHandler, DEBUG, Formatter
17 from scapy.packet import Raw
18 from hook import StepHook, PollHook
19 from vpp_pg_interface import VppPGInterface
20 from vpp_sub_interface import VppSubInterface
21 from vpp_lo_interface import VppLoInterface
22 from vpp_papi_provider import VppPapiProvider
24 from vpp_object import VppObjectRegistry
25 if os.name == 'posix' and sys.version_info[0] < 3:
26 # using subprocess32 is recommended by python official documentation
27 # @ https://docs.python.org/2/library/subprocess.html
28 import subprocess32 as subprocess
33 Test framework module.
35 The module provides a set of tools for constructing and running tests and
36 representing the results.
40 class _PacketInfo(object):
41 """Private class to create packet info object.
43 Help process information about the next packet.
44 Set variables to default values.
46 #: Store the index of the packet.
48 #: Store the index of the source packet generator interface of the packet.
50 #: Store the index of the destination packet generator interface
53 #: Store expected ip version
55 #: Store expected upper protocol
57 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare two packet infos by index, source, destination and data.

    The expected ip version and upper protocol are not part of the
    comparison.

    :param other: object to compare with
    :returns: True/False for packet-info-like objects, NotImplemented for
              objects lacking the compared attributes
    """
    try:
        # short-circuit so the (potentially costly) data comparison only
        # runs when the cheap integer fields already match
        return (self.index == other.index and
                self.src == other.src and
                self.dst == other.dst and
                self.data == other.data)
    except AttributeError:
        # not a packet info - let python try the reflected comparison
        # instead of failing in the middle of an equality test
        return NotImplemented
def pump_output(testclass):
    """ pump output from vpp stdout/stderr to proper queues

    Loops until ``testclass.pump_thread_stop_flag`` is set. Data read from
    the stdout/stderr of ``testclass.vpp`` is appended to
    ``testclass.vpp_stdout_deque`` / ``testclass.vpp_stderr_deque`` in chunks
    of at most 1024 bytes.

    :param testclass: object providing vpp, the two deques, the stop flag
                      and pump_thread_wakeup_pipe
    """
    # map each watched descriptor to the queue collecting its output
    sinks = {
        testclass.vpp.stdout.fileno(): testclass.vpp_stdout_deque,
        testclass.vpp.stderr.fileno(): testclass.vpp_stderr_deque,
    }
    wakeup_fd = testclass.pump_thread_wakeup_pipe[0]
    while not testclass.pump_thread_stop_flag.wait(0):
        readable = select.select(list(sinks) + [wakeup_fd], [], [])[0]
        for fd in readable:
            if fd not in sinks:
                # ignoring the dummy pipe here intentionally - the flag
                # will take care of properly terminating the loop
                continue
            chunk = os.read(fd, 1024)
            if chunk:
                sinks[fd].append(chunk)
            else:
                # empty read means EOF - a closed pipe is always reported
                # as readable, so stop watching it to avoid busy-looping
                # and queueing empty chunks
                del sinks[fd]
def running_extended_tests():
    """Tell whether extended tests were requested through the environment.

    :returns: True if the EXTENDED_TESTS environment variable is set to
              "y", "yes" or "1" (in any letter case), False otherwise
    """
    # default to "" so that an unset variable simply yields False instead
    # of failing on None.lower()
    return os.getenv("EXTENDED_TESTS", "").lower() in ("y", "yes", "1")
94 class VppTestCase(unittest.TestCase):
95 """This subclass is a base class for VPP test cases that are implemented as
96 classes. It provides methods to create and run test case.
def packet_infos(self):
    """Packet infos created so far, as a dict keyed by packet index"""
    infos = self._packet_infos
    return infos
def get_packet_count_for_if_idx(cls, dst_if_index):
    """Get the number of packet info for specified destination if index

    :param dst_if_index: index of the destination interface
    :returns: number of packet infos counted for that index, 0 if there
              are none
    """
    return cls._packet_count_for_dst_if_idx.get(dst_if_index, 0)
114 """Return the instance of this testcase"""
115 return cls.test_instance
def set_debug_flags(cls, d):
    """Translate the debug option into the debug switches of the class

    Sets ``debug_core``, ``debug_gdb`` and ``debug_gdbserver``; at most one
    of them ends up True.

    :param d: "core", "gdb" or "gdbserver" (in any letter case), or None
              to leave all switches off
    :raises Exception: if d is not one of the recognized values
    """
    cls.debug_core = cls.debug_gdb = cls.debug_gdbserver = False
    if d is None:
        return
    switch_for_mode = {
        "core": "debug_core",
        "gdb": "debug_gdb",
        "gdbserver": "debug_gdbserver",
    }
    switch = switch_for_mode.get(d.lower())
    if switch is None:
        raise Exception("Unrecognized DEBUG option: '%s'" % d)
    setattr(cls, switch, True)
def setUpConstants(cls):
    """ Set-up the test case class based on environment variables

    Reads STEP, DEBUG, VPP_TEST_BIN, VPP_TEST_PLUGIN_PATH, EXTERN_PLUGINS
    and COREDUMP_SIZE and stores the outcome in ``cls.step``, the debug
    switches (through ``cls.set_debug_flags``), ``cls.vpp_bin``,
    ``cls.plugin_path``, ``cls.extern_plugin_path`` and ``cls.vpp_cmdline``.
    ``cls.shm_prefix`` and ``cls.logger`` must already be set.

    :raises Exception: propagated from set_debug_flags for a bad DEBUG value
    """
    # unset variables are handled through defaults rather than by catching
    # whatever None.lower() raises
    cls.step = os.getenv("STEP", "").lower() in ("y", "yes", "1")
    cls.set_debug_flags(os.getenv("DEBUG"))
    cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
    cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
    cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')

    # colon separated list of whichever plugin paths are configured
    configured = [path for path in (cls.plugin_path, cls.extern_plugin_path)
                  if path is not None]
    plugin_path = ":".join(configured) if configured else None

    debug_cli = ""
    if cls.step or cls.debug_gdb or cls.debug_gdbserver:
        debug_cli = "cli-listen localhost:5002"

    size = os.getenv("COREDUMP_SIZE")
    if size is not None:
        coredump_size = "coredump-size %s" % size
    else:
        coredump_size = "coredump-size unlimited"

    # NOTE(review): the end of this list ("disable" and the two closing
    # braces) is not visible in the reviewed excerpt and was restored from
    # the brace structure - confirm against the repository
    cls.vpp_cmdline = [cls.vpp_bin, "unix",
                       "{", "nodaemon", debug_cli, coredump_size, "}",
                       "api-trace", "{", "on", "}",
                       "api-segment", "{", "prefix", cls.shm_prefix, "}",
                       "plugins", "{", "plugin", "dpdk_plugin.so", "{",
                       "disable", "}", "}"]
    if plugin_path is not None:
        cls.vpp_cmdline.extend(["plugin_path", plugin_path])
    cls.logger.info("vpp_cmdline: %s", cls.vpp_cmdline)
def wait_for_enter(cls):
    """Announce the spawned process and pause so a debugger can be attached

    Without gdb/gdbserver debugging only the PID is logged. Otherwise the
    PID and a matching gdb command line are printed and the call blocks
    until ENTER is pressed.
    """
    if cls.debug_gdbserver:
        print(double_line_delim)
        print("Spawned GDB server with PID: %d" % cls.vpp.pid)
    elif cls.debug_gdb:
        print(double_line_delim)
        print("Spawned VPP with PID: %d" % cls.vpp.pid)
    else:
        # nothing to attach to - just leave a trace in the log
        cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
        return

    print(single_line_delim)
    print("You can debug the VPP using e.g.:")
    if cls.debug_gdbserver:
        print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
        print("Now is the time to attach a gdb by running the above "
              "command, set up breakpoints etc. and then resume VPP from "
              "within gdb by issuing the 'continue' command")
    else:
        # only the plain gdb mode can get here
        print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
        print("Now is the time to attach a gdb by running the above "
              "command and set up breakpoints etc.")
    print(single_line_delim)
    raw_input("Press ENTER to continue running the testcase...")
208 cmdline = cls.vpp_cmdline
210 if cls.debug_gdbserver:
211 gdbserver = '/usr/bin/gdbserver'
212 if not os.path.isfile(gdbserver) or \
213 not os.access(gdbserver, os.X_OK):
214 raise Exception("gdbserver binary '%s' does not exist or is "
215 "not executable" % gdbserver)
217 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
218 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
221 cls.vpp = subprocess.Popen(cmdline,
222 stdout=subprocess.PIPE,
223 stderr=subprocess.PIPE,
225 except Exception as e:
226 cls.logger.critical("Couldn't start vpp: %s" % e)
234 Perform class setup before running the testcase
235 Remove shared memory files, start vpp and connect the vpp-api
237 gc.collect() # run garbage collection first
238 cls.logger = getLogger(cls.__name__)
239 cls.tempdir = tempfile.mkdtemp(
240 prefix='vpp-unittest-' + cls.__name__ + '-')
241 file_handler = FileHandler("%s/log.txt" % cls.tempdir)
242 file_handler.setFormatter(
243 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
245 file_handler.setLevel(DEBUG)
246 cls.logger.addHandler(file_handler)
247 cls.shm_prefix = cls.tempdir.split("/")[-1]
248 os.chdir(cls.tempdir)
249 cls.logger.info("Temporary dir is %s, shm prefix is %s",
250 cls.tempdir, cls.shm_prefix)
252 cls.reset_packet_infos()
254 cls._zombie_captures = []
257 cls.registry = VppObjectRegistry()
258 # need to catch exceptions here because if we raise, then the cleanup
259 # doesn't get called and we might end with a zombie vpp
262 cls.vpp_stdout_deque = deque()
263 cls.vpp_stderr_deque = deque()
264 cls.pump_thread_stop_flag = Event()
265 cls.pump_thread_wakeup_pipe = os.pipe()
266 cls.pump_thread = Thread(target=pump_output, args=(cls,))
267 cls.pump_thread.daemon = True
268 cls.pump_thread.start()
269 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
274 cls.vapi.register_hook(hook)
275 cls.sleep(0.1, "after vpp startup, before initial poll")
280 if cls.debug_gdbserver:
281 print(colorize("You're running VPP inside gdbserver but "
282 "VPP-API connection failed, did you forget "
283 "to 'continue' VPP from within gdb?", RED))
286 t, v, tb = sys.exc_info()
296 Disconnect vpp-api, kill vpp and cleanup shared memory files
298 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
300 if cls.vpp.returncode is None:
301 print(double_line_delim)
302 print("VPP or GDB server is still running")
303 print(single_line_delim)
304 raw_input("When done debugging, press ENTER to kill the "
305 "process and finish running the testcase...")
307 os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
308 cls.pump_thread_stop_flag.set()
309 if hasattr(cls, 'pump_thread'):
310 cls.logger.debug("Waiting for pump thread to stop")
311 cls.pump_thread.join()
312 if hasattr(cls, 'vpp_stderr_reader_thread'):
313 cls.logger.debug("Waiting for stdderr pump to stop")
314 cls.vpp_stderr_reader_thread.join()
316 if hasattr(cls, 'vpp'):
317 if hasattr(cls, 'vapi'):
318 cls.vapi.disconnect()
321 if cls.vpp.returncode is None:
322 cls.logger.debug("Sending TERM to vpp")
324 cls.logger.debug("Waiting for vpp to die")
325 cls.vpp.communicate()
328 if hasattr(cls, 'vpp_stdout_deque'):
329 cls.logger.info(single_line_delim)
330 cls.logger.info('VPP output to stdout while running %s:',
332 cls.logger.info(single_line_delim)
333 f = open(cls.tempdir + '/vpp_stdout.txt', 'w')
334 vpp_output = "".join(cls.vpp_stdout_deque)
336 cls.logger.info('\n%s', vpp_output)
337 cls.logger.info(single_line_delim)
339 if hasattr(cls, 'vpp_stderr_deque'):
340 cls.logger.info(single_line_delim)
341 cls.logger.info('VPP output to stderr while running %s:',
343 cls.logger.info(single_line_delim)
344 f = open(cls.tempdir + '/vpp_stderr.txt', 'w')
345 vpp_output = "".join(cls.vpp_stderr_deque)
347 cls.logger.info('\n%s', vpp_output)
348 cls.logger.info(single_line_delim)
351 def tearDownClass(cls):
352 """ Perform final cleanup after running all tests in this test-case """
356 """ Show various debug prints after each test """
357 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
358 (self.__class__.__name__, self._testMethodName,
359 self._testMethodDoc))
360 if not self.vpp_dead:
361 self.logger.debug(self.vapi.cli("show trace"))
362 self.logger.info(self.vapi.ppcli("show interfaces"))
363 self.logger.info(self.vapi.ppcli("show hardware"))
364 self.logger.info(self.vapi.ppcli("show error"))
365 self.logger.info(self.vapi.ppcli("show run"))
366 self.registry.remove_vpp_config(self.logger)
367 # Save/Dump VPP api trace log
368 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
369 tmp_api_trace = "/tmp/%s" % api_trace
370 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
371 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
372 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
374 os.rename(tmp_api_trace, vpp_api_trace_log)
375 self.logger.info(self.vapi.ppcli("api trace dump %s" %
378 self.registry.unregister_all(self.logger)
381 """ Clear trace before running each test"""
382 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
383 (self.__class__.__name__, self._testMethodName,
384 self._testMethodDoc))
386 raise Exception("VPP is dead when setting up the test")
387 self.sleep(.1, "during setUp")
388 self.vpp_stdout_deque.append(
389 "--- test setUp() for %s.%s(%s) starts here ---\n" %
390 (self.__class__.__name__, self._testMethodName,
391 self._testMethodDoc))
392 self.vpp_stderr_deque.append(
393 "--- test setUp() for %s.%s(%s) starts here ---\n" %
394 (self.__class__.__name__, self._testMethodName,
395 self._testMethodDoc))
396 self.vapi.cli("clear trace")
397 # store the test instance inside the test class - so that objects
398 # holding the class can access instance methods (like assertEqual)
399 type(self).test_instance = self
402 def pg_enable_capture(cls, interfaces):
404 Enable capture on packet-generator interfaces
406 :param interfaces: iterable interface indexes
def register_capture(cls, cap_name):
    """ Record a capture in the testclass together with its creation time

    :param cap_name: name of the capture
    """
    registered_at = time.time()
    cls._captures.append((registered_at, cap_name))
    # a capture which is registered again must not be treated as a zombie
    still_zombies = []
    for stamp, name in cls._zombie_captures:
        if name != cap_name:
            still_zombies.append((stamp, name))
    cls._zombie_captures = still_zombies
424 """ Remove any zombie captures and enable the packet generator """
425 # how long before capture is allowed to be deleted - otherwise vpp
426 # crashes - 100ms seems enough (this shouldn't be needed at all)
429 for stamp, cap_name in cls._zombie_captures:
430 wait = stamp + capture_ttl - now
432 cls.sleep(wait, "before deleting capture %s" % cap_name)
434 cls.logger.debug("Removing zombie capture %s" % cap_name)
435 cls.vapi.cli('packet-generator delete %s' % cap_name)
437 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
438 cls.vapi.cli('packet-generator enable')
439 cls._zombie_captures = cls._captures
def create_pg_interfaces(cls, interfaces):
    """
    Create packet-generator interfaces.

    Every created interface is also stored on the class as an attribute
    named after the interface, and the whole list as ``cls.pg_interfaces``.

    :param interfaces: iterable indexes of the interfaces.
    :returns: List of created interfaces.

    """
    created = []
    for index in interfaces:
        pg_if = VppPGInterface(cls, index)
        setattr(cls, pg_if.name, pg_if)
        created.append(pg_if)
    cls.pg_interfaces = created
    return created
def create_loopback_interfaces(cls, interfaces):
    """
    Create loopback interfaces.

    Every created interface is also stored on the class as an attribute
    named after the interface, and the whole list as ``cls.lo_interfaces``.

    :param interfaces: iterable indexes of the interfaces.
    :returns: List of created interfaces.

    """
    created = []
    for index in interfaces:
        lo_if = VppLoInterface(cls, index)
        setattr(cls, lo_if.name, lo_if)
        created.append(lo_if)
    cls.lo_interfaces = created
    return created
def extend_packet(packet, size):
    """
    Pad the Raw payload of the packet with spaces up to the given size.
    NOTE: Currently works only when Raw layer is present.

    :param packet: packet
    :param size: target size

    """
    # NOTE(review): the 4 extra bytes presumably stand for the ethernet
    # FCS which is not part of len(packet) - confirm
    missing = size - (len(packet) + 4)
    if missing > 0:
        packet[Raw].load += ' ' * missing
def reset_packet_infos(cls):
    """ Forget all packet info objects and all per-interface counts """
    cls._packet_infos, cls._packet_count_for_dst_if_idx = {}, {}
def create_packet_info(cls, src_if, dst_if):
    """
    Create packet info object containing the source and destination indexes
    and add it to the testcase's packet infos

    :param VppInterface src_if: source interface
    :param VppInterface dst_if: destination interface

    :returns: _PacketInfo object

    """
    info = _PacketInfo()
    info.index = len(cls._packet_infos)
    info.src = src_if.sw_if_index
    info.dst = dst_if.sw_if_index
    # a packet sent to a sub-interface is counted for its parent interface
    if isinstance(dst_if, VppSubInterface):
        counted_if = dst_if.parent
    else:
        counted_if = dst_if
    dst_idx = counted_if.sw_if_index
    counts = cls._packet_count_for_dst_if_idx
    counts[dst_idx] = counts.get(dst_idx, 0) + 1
    cls._packet_infos[info.index] = info
    return info
def info_to_payload(info):
    """
    Serialize a _PacketInfo object into a packet payload

    :param info: _PacketInfo object

    :returns: string with index, src, dst, ip and proto of the info as
              space separated decimal numbers

    """
    fields = (info.index, info.src, info.dst, info.ip, info.proto)
    return " ".join("%d" % field for field in fields)
def payload_to_info(payload):
    """
    Rebuild a _PacketInfo object from a packet payload

    :param payload: packet payload holding at least five whitespace
                    separated integers

    :returns: _PacketInfo object containing de-serialized data from payload

    """
    fields = payload.split()
    info = _PacketInfo()
    # the payload carries the fields in exactly this order
    for position, attr in enumerate(("index", "src", "dst", "ip", "proto")):
        setattr(info, attr, int(fields[position]))
    return info
def get_next_packet_info(self, info):
    """
    Iterate over the packet infos stored in the testcase
    Start iteration with first element if info is None
    Continue based on index in info if info is specified

    :param info: info or None
    :returns: next info or None if no more infos

    """
    next_index = 0 if info is None else info.index + 1
    # the infos are keyed by index, so a plain lookup with a None default
    # ends the iteration for any index which is not stored (not only for
    # the one right behind the last info) instead of raising KeyError
    return self._packet_infos.get(next_index)
def get_next_packet_info_for_interface(self, src_index, info):
    """
    Find the next packet info sent from the given source interface

    :param src_index: source interface index to search for
    :param info: packet info - where to start the search
    :returns: packet info or None

    """
    candidate = self.get_next_packet_info(info)
    # skip over infos which belong to other source interfaces
    while candidate is not None and candidate.src != src_index:
        candidate = self.get_next_packet_info(candidate)
    return candidate
def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
    """
    Find the next packet info sent from the given source interface to the
    given destination interface

    :param src_index: source interface index to search for
    :param dst_index: destination interface index to search for
    :param info: packet info - where to start the search
    :returns: packet info or None

    """
    candidate = self.get_next_packet_info_for_interface(src_index, info)
    # candidates already match the source, only the destination is left
    while candidate is not None and candidate.dst != dst_index:
        candidate = self.get_next_packet_info_for_interface(src_index,
                                                            candidate)
    return candidate
def assert_equal(self, real_value, expected_value, name_or_class=None):
    """Assert equality of two values, with a descriptive failure message

    :param real_value: value under test
    :param expected_value: value to compare with
    :param name_or_class: None for the default assertEqual message, or a
        callable whose docstring names the value and which turns a value
        into its printable form, or any other object used as the name of
        the value
    """
    if name_or_class is None:
        self.assertEqual(real_value, expected_value)
        return
    try:
        # works when name_or_class is a documented callable and the values
        # are numbers
        msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
        msg = msg % (getdoc(name_or_class).strip(),
                     real_value, str(name_or_class(real_value)),
                     expected_value, str(name_or_class(expected_value)))
    except Exception:
        # otherwise treat name_or_class as a plain name; Exception rather
        # than a catch-all so that KeyboardInterrupt/SystemExit get through
        msg = "Invalid %s: %s does not match expected value %s" % (
            name_or_class, real_value, expected_value)

    self.assertEqual(real_value, expected_value, msg)
622 def assert_in_range(self,
630 msg = "Invalid %s: %s out of range <%s,%s>" % (
631 name, real_value, expected_min, expected_max)
632 self.assertTrue(expected_min <= real_value <= expected_max, msg)
def sleep(cls, timeout, remark=None):
    """Sleep for the given time, noting it in the log if a logger is set

    :param timeout: time to sleep, passed to time.sleep()
    :param remark: reason for sleeping, used only in the log message
    """
    message = "Sleeping for %ss (%s)" % (timeout, remark)
    if hasattr(cls, 'logger'):
        cls.logger.debug(message)
    time.sleep(timeout)
class TestCasePrinter(object):
    """Print a heading for each test case class, once per class

    All instances share one state - ``__dict__`` of every instance is bound
    to the same class-level dict - so the set of already announced classes
    is common to all of them.
    """
    _shared_state = {}

    def __init__(self):
        self.__dict__ = self._shared_state
        if not hasattr(self, "_test_case_set"):
            self._test_case_set = set()

    def print_test_case_heading_if_first_time(self, case):
        """Print the heading for the class of the case unless already done

        The heading is the first line of the class docstring, or the class
        name if the class has no docstring.

        :param case: test case instance
        """
        case_class = case.__class__
        if case_class in self._test_case_set:
            return
        # getdoc() returns None for an undocumented class
        doc = getdoc(case_class)
        heading = doc.splitlines()[0] if doc else case_class.__name__
        print(double_line_delim)
        print(colorize(heading, YELLOW))
        print(double_line_delim)
        self._test_case_set.add(case_class)
657 class VppTestResult(unittest.TestResult):
659 @property result_string
660 String variable to store the test case result string.
662 List variable containing 2-tuples of TestCase instances and strings
663 holding formatted tracebacks. Each tuple represents a test which
664 raised an unexpected exception.
666 List variable containing 2-tuples of TestCase instances and strings
667 holding formatted tracebacks. Each tuple represents a test where
668 a failure was explicitly signalled using the TestCase.assert*()
def __init__(self, stream, descriptions, verbosity):
    """
    :param stream: stream providing writeln(), used to report test results
    :param descriptions: boolean telling whether test case descriptions
        should be used when reporting
    :param verbosity: integer verbosity level; values above 0 produce
        additional output around each test
    """
    unittest.TestResult.__init__(self, stream, descriptions, verbosity)
    # nothing was reported and no test class announced yet
    self.result_string = None
    self.printer = TestCasePrinter()
    self.stream = stream
    self.descriptions = descriptions
    self.verbosity = verbosity
def addSuccess(self, test):
    """
    Record a test succeeded result

    :param test: test which passed

    """
    if hasattr(test, 'logger'):
        identity = (test.__class__.__name__,
                    test._testMethodName,
                    test._testMethodDoc)
        test.logger.debug("--- addSuccess() %s.%s(%s) called" % identity)
    unittest.TestResult.addSuccess(self, test)
    self.result_string = colorize("OK", GREEN)
def addSkip(self, test, reason):
    """
    Record a test skipped.

    :param test: test which was skipped
    :param reason: why the test was skipped

    """
    if hasattr(test, 'logger'):
        details = (test.__class__.__name__,
                   test._testMethodName,
                   test._testMethodDoc,
                   reason)
        test.logger.debug(
            "--- addSkip() %s.%s(%s) called, reason is %s" % details)
    unittest.TestResult.addSkip(self, test, reason)
    self.result_string = colorize("SKIP", YELLOW)
def addFailure(self, test, err):
    """
    Record a test failed result

    :param test: test which failed
    :param err: error information, unpacked into format_exception()

    """
    if hasattr(test, 'logger'):
        log = test.logger.debug
        log("--- addFailure() %s.%s(%s) called, err is %s"
            % (test.__class__.__name__,
               test._testMethodName,
               test._testMethodDoc, err))
        log("formatted exception is:\n%s" %
            "".join(format_exception(*err)))
    unittest.TestResult.addFailure(self, test, err)
    # point the user to the directory holding the logs, if there is one
    verdict = colorize("FAIL", RED)
    if hasattr(test, 'tempdir'):
        verdict += ' [ temp dir used by test case: ' + test.tempdir + ' ]'
    else:
        verdict += ' [no temp dir]'
    self.result_string = verdict
def addError(self, test, err):
    """
    Record a test error result

    :param test: test which ended with an error
    :param err: error information, unpacked into format_exception()

    """
    if hasattr(test, 'logger'):
        log = test.logger.debug
        log("--- addError() %s.%s(%s) called, err is %s"
            % (test.__class__.__name__,
               test._testMethodName,
               test._testMethodDoc, err))
        log("formatted exception is:\n%s" %
            "".join(format_exception(*err)))
    unittest.TestResult.addError(self, test, err)
    # point the user to the directory holding the logs, if there is one
    verdict = colorize("ERROR", RED)
    if hasattr(test, 'tempdir'):
        verdict += ' [ temp dir used by test case: ' + test.tempdir + ' ]'
    else:
        verdict += ' [no temp dir]'
    self.result_string = verdict
def getDescription(self, test):
    """
    Get test description

    :param test: test to describe
    :returns: short description of the test if descriptions are enabled
              and the test has one, otherwise str(test)

    """
    # TODO: if none print warning not raise exception
    summary = test.shortDescription()
    use_summary = self.descriptions and summary
    return summary if use_summary else str(test)
def startTest(self, test):
    """
    Start a test

    Announces the class of the test if that has not happened yet and, with
    verbosity above 0, writes a "Starting ..." line to the stream.

    :param test: test about to be run

    """
    self.printer.print_test_case_heading_if_first_time(test)
    unittest.TestResult.startTest(self, test)
    if self.verbosity <= 0:
        return
    announcement = "Starting " + self.getDescription(test) + " ..."
    self.stream.writeln(announcement)
    self.stream.writeln(single_line_delim)
def stopTest(self, test):
    """
    Stop a test

    Writes the test description followed by the result string to the
    stream, framed by delimiter lines when verbosity is above 0.

    :param test: test which has just been run

    """
    unittest.TestResult.stopTest(self, test)
    framed = self.verbosity > 0
    if framed:
        self.stream.writeln(single_line_delim)
    self.stream.writeln("%-73s%s" % (self.getDescription(test),
                                     self.result_string))
    if framed:
        self.stream.writeln(single_line_delim)
def printErrors(self):
    """
    Write an empty line followed by all recorded errors and then all
    recorded failures to the stream
    """
    self.stream.writeln()
    for flavour, recorded in (('ERROR', self.errors),
                              ('FAIL', self.failures)):
        self.printErrorList(flavour, recorded)
def printErrorList(self, flavour, errors):
    """
    Write the given errors to the output stream, each one headed by the
    error type and the description of the test it belongs to.

    :param flavour: error type
    :param errors: iterable of (test, error text) pairs

    """
    emit = self.stream.writeln
    for test, err in errors:
        emit(double_line_delim)
        emit("%s: %s" % (flavour, self.getDescription(test)))
        emit(single_line_delim)
        emit("%s" % err)
834 class VppTestRunner(unittest.TextTestRunner):
836 A basic test runner implementation which prints results to standard error.
839 def resultclass(self):
840 """Class maintaining the results of the tests"""
def __init__(self, stream=sys.stderr, descriptions=True, verbosity=1,
             failfast=False, buffer=False, resultclass=None):
    """Set up the runner; all results go to stdout

    The stream argument is accepted for interface compatibility only.
    """
    # ignore stream setting here, use hard-coded stdout to be in sync
    # with prints from VppTestCase methods ...
    super(VppTestRunner, self).__init__(stream=sys.stdout,
                                        descriptions=descriptions,
                                        verbosity=verbosity,
                                        failfast=failfast,
                                        buffer=buffer,
                                        resultclass=resultclass)
def parse_test_option(self):
    """Turn the test filter option into file, class and function filters

    The option is read from the environment variable named by
    ``self.test_option``. It is either a bare file name or up to three
    dot-separated parts - file, class and function - where '*' or an empty
    part stands for "no filter". File names get the 'test_' prefix added
    if they lack it.

    :returns: tuple (file filter, class filter, function filter), None
              meaning no filter
    :raises Exception: if the option has more than three parts
    """
    try:
        f = os.getenv(self.test_option)
    except Exception:
        # no usable option name - behave as if no filter was requested;
        # unlike a catch-all this lets KeyboardInterrupt/SystemExit through
        f = None

    filter_file_name = None
    filter_class_name = None
    filter_func_name = None
    if not f:
        return filter_file_name, filter_class_name, filter_func_name

    if '.' in f:
        parts = f.split('.')
        if len(parts) > 3:
            raise Exception("Unrecognized %s option: %s" %
                            (self.test_option, f))
        if len(parts) > 2 and parts[2] not in ('*', ''):
            filter_func_name = parts[2]
        if parts[1] not in ('*', ''):
            filter_class_name = parts[1]
        file_part = parts[0] if parts[0] not in ('*', '') else None
    else:
        file_part = f

    if file_part is not None:
        if file_part.startswith('test_'):
            filter_file_name = file_part
        else:
            filter_file_name = 'test_%s' % file_part
    return filter_file_name, filter_class_name, filter_func_name
def filter_tests(self, tests, filter_file, filter_class, filter_func):
    """Build a suite holding only the tests which pass the given filters

    :param tests: iterable of test suites and/or test cases
    :param filter_file: required file part of the test id, or None
    :param filter_class: required class part of the test id, or None
    :param filter_func: required function part of the test id, or None
    :returns: new test suite; nested suites are filtered recursively and
              dropped when they end up empty
    """
    result = unittest.suite.TestSuite()
    wanted = (filter_file, filter_class, filter_func)
    for t in tests:
        if isinstance(t, unittest.suite.TestSuite):
            # this is a bunch of tests, recursively filter...
            nested = self.filter_tests(t, filter_file, filter_class,
                                       filter_func)
            if nested.countTestCases() > 0:
                result.addTest(nested)
            continue
        if isinstance(t, unittest.TestCase):
            # this is a single test
            parts = t.id().split('.')
            # t.id() for common cases like this:
            # test_classifier.TestClassifier.test_acl_ip
            # apply filtering only if it is so
            if len(parts) == 3 and any(
                    want and want != got
                    for want, got in zip(wanted, parts)):
                continue
        # either a test which passed the filters or an unexpected object,
        # which is kept untouched
        result.addTest(t)
    return result
919 gc.disable() # disable garbage collection, we'll do that manually
920 print("Running tests using custom test runner") # debug message
921 filter_file, filter_class, filter_func = self.parse_test_option()
922 print("Active filters: file=%s, class=%s, function=%s" % (
923 filter_file, filter_class, filter_func))
924 filtered = self.filter_tests(test, filter_file, filter_class,
926 print("%s out of %s tests match specified filters" % (
927 filtered.countTestCases(), test.countTestCases()))
928 return super(VppTestRunner, self).run(filtered)