3 from __future__ import print_function
12 from collections import deque
13 from threading import Thread, Event
14 from inspect import getdoc
15 from traceback import format_exception
16 from logging import FileHandler, DEBUG, Formatter
17 from scapy.packet import Raw
18 from hook import StepHook, PollHook
19 from vpp_pg_interface import VppPGInterface
20 from vpp_sub_interface import VppSubInterface
21 from vpp_lo_interface import VppLoInterface
22 from vpp_papi_provider import VppPapiProvider
24 from vpp_object import VppObjectRegistry
25 if os.name == 'posix' and sys.version_info[0] < 3:
26 # using subprocess32 is recommended by python official documentation
27 # @ https://docs.python.org/2/library/subprocess.html
28 import subprocess32 as subprocess
33 Test framework module.
35 The module provides a set of tools for constructing and running tests and
36 representing the results.
40 class _PacketInfo(object):
41 """Private class to create packet info object.
43 Help process information about the next packet.
44 Set variables to default values.
46 #: Store the index of the packet.
48 #: Store the index of the source packet generator interface of the packet.
50 #: Store the index of the destination packet generator interface
53 #: Store expected ip version
55 #: Store expected upper protocol
57 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare two packet infos attribute by attribute.

    Two objects are equal when their ``index``, ``src``, ``dst`` and
    ``data`` attributes all compare equal.

    :param other: object to compare with
    :returns: True or False when `other` exposes all four compared
        attributes; NotImplemented otherwise (e.g. for None), so that
        Python falls back to its default comparison instead of raising
        AttributeError
    """
    compared = ("index", "src", "dst", "data")
    # anything that does not look like a packet info cannot be compared here
    if not all(hasattr(other, name) for name in compared):
        return NotImplemented
    return all(getattr(self, name) == getattr(other, name)
               for name in compared)
68 def pump_output(testclass):
69 """ pump output from vpp stdout/stderr to proper queues """
70 while not testclass.pump_thread_stop_flag.wait(0):
71 readable = select.select([testclass.vpp.stdout.fileno(),
72 testclass.vpp.stderr.fileno(),
73 testclass.pump_thread_wakeup_pipe[0]],
75 if testclass.vpp.stdout.fileno() in readable:
76 read = os.read(testclass.vpp.stdout.fileno(), 1024)
77 testclass.vpp_stdout_deque.append(read)
78 if testclass.vpp.stderr.fileno() in readable:
79 read = os.read(testclass.vpp.stderr.fileno(), 1024)
80 testclass.vpp_stderr_deque.append(read)
81 # ignoring the dummy pipe here intentionally - the flag will take care
82 # of properly terminating the loop
85 def running_extended_tests():
87 s = os.getenv("EXTENDED_TESTS")
88 return True if s.lower() in ("y", "yes", "1") else False
94 class VppTestCase(unittest.TestCase):
95 """This subclass is a base class for VPP test cases that are implemented as
96 classes. It provides methods to create and run test case.
def packet_infos(self):
    """Expose the packet infos recorded for this test case.

    :returns: the container held in ``_packet_infos`` (no copy is made)
    """
    recorded = self._packet_infos
    return recorded
105 def get_packet_count_for_if_idx(cls, dst_if_index):
106 """Get the number of packet info for specified destination if index"""
107 if dst_if_index in cls._packet_count_for_dst_if_idx:
108 return cls._packet_count_for_dst_if_idx[dst_if_index]
114 """Return the instance of this testcase"""
115 return cls.test_instance
118 def set_debug_flags(cls, d):
119 cls.debug_core = False
120 cls.debug_gdb = False
121 cls.debug_gdbserver = False
126 cls.debug_core = True
129 elif dl == "gdbserver":
130 cls.debug_gdbserver = True
132 raise Exception("Unrecognized DEBUG option: '%s'" % d)
135 def setUpConstants(cls):
136 """ Set-up the test case class based on environment variables """
138 s = os.getenv("STEP")
139 cls.step = True if s.lower() in ("y", "yes", "1") else False
143 d = os.getenv("DEBUG")
146 cls.set_debug_flags(d)
147 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
148 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
149 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
151 if cls.plugin_path is not None:
152 if cls.extern_plugin_path is not None:
153 plugin_path = "%s:%s" % (
154 cls.plugin_path, cls.extern_plugin_path)
155 elif cls.extern_plugin_path is not None:
156 plugin_path = cls.extern_plugin_path
158 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
159 debug_cli = "cli-listen localhost:5002"
162 size = os.getenv("COREDUMP_SIZE")
164 coredump_size = "coredump-size %s" % size
167 if coredump_size is None:
168 coredump_size = "coredump-size unlimited"
169 cls.vpp_cmdline = [cls.vpp_bin, "unix",
170 "{", "nodaemon", debug_cli, coredump_size, "}",
171 "api-trace", "{", "on", "}",
172 "api-segment", "{", "prefix", cls.shm_prefix, "}",
173 "plugins", "{", "plugin", "dpdk_plugin.so", "{",
175 if plugin_path is not None:
176 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
177 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
180 def wait_for_enter(cls):
181 if cls.debug_gdbserver:
182 print(double_line_delim)
183 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
185 print(double_line_delim)
186 print("Spawned VPP with PID: %d" % cls.vpp.pid)
188 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
190 print(single_line_delim)
191 print("You can debug the VPP using e.g.:")
192 if cls.debug_gdbserver:
193 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
194 print("Now is the time to attach a gdb by running the above "
195 "command, set up breakpoints etc. and then resume VPP from "
196 "within gdb by issuing the 'continue' command")
198 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
199 print("Now is the time to attach a gdb by running the above "
200 "command and set up breakpoints etc.")
201 print(single_line_delim)
202 raw_input("Press ENTER to continue running the testcase...")
206 cmdline = cls.vpp_cmdline
208 if cls.debug_gdbserver:
209 gdbserver = '/usr/bin/gdbserver'
210 if not os.path.isfile(gdbserver) or \
211 not os.access(gdbserver, os.X_OK):
212 raise Exception("gdbserver binary '%s' does not exist or is "
213 "not executable" % gdbserver)
215 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
216 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
219 cls.vpp = subprocess.Popen(cmdline,
220 stdout=subprocess.PIPE,
221 stderr=subprocess.PIPE,
223 except Exception as e:
224 cls.logger.critical("Couldn't start vpp: %s" % e)
232 Perform class setup before running the testcase
233 Remove shared memory files, start vpp and connect the vpp-api
235 gc.collect() # run garbage collection first
236 cls.logger = getLogger(cls.__name__)
237 cls.tempdir = tempfile.mkdtemp(
238 prefix='vpp-unittest-' + cls.__name__ + '-')
239 file_handler = FileHandler("%s/log.txt" % cls.tempdir)
240 file_handler.setFormatter(
241 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
243 file_handler.setLevel(DEBUG)
244 cls.logger.addHandler(file_handler)
245 cls.shm_prefix = cls.tempdir.split("/")[-1]
246 os.chdir(cls.tempdir)
247 cls.logger.info("Temporary dir is %s, shm prefix is %s",
248 cls.tempdir, cls.shm_prefix)
250 cls.reset_packet_infos()
252 cls._zombie_captures = []
255 cls.registry = VppObjectRegistry()
256 # need to catch exceptions here because if we raise, then the cleanup
257 # doesn't get called and we might end with a zombie vpp
260 cls.vpp_stdout_deque = deque()
261 cls.vpp_stderr_deque = deque()
262 cls.pump_thread_stop_flag = Event()
263 cls.pump_thread_wakeup_pipe = os.pipe()
264 cls.pump_thread = Thread(target=pump_output, args=(cls,))
265 cls.pump_thread.daemon = True
266 cls.pump_thread.start()
267 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
272 cls.vapi.register_hook(hook)
273 cls.sleep(0.1, "after vpp startup, before initial poll")
278 if cls.debug_gdbserver:
279 print(colorize("You're running VPP inside gdbserver but "
280 "VPP-API connection failed, did you forget "
281 "to 'continue' VPP from within gdb?", RED))
284 t, v, tb = sys.exc_info()
294 Disconnect vpp-api, kill vpp and cleanup shared memory files
296 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
298 if cls.vpp.returncode is None:
299 print(double_line_delim)
300 print("VPP or GDB server is still running")
301 print(single_line_delim)
302 raw_input("When done debugging, press ENTER to kill the "
303 "process and finish running the testcase...")
305 os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
306 cls.pump_thread_stop_flag.set()
307 if hasattr(cls, 'pump_thread'):
308 cls.logger.debug("Waiting for pump thread to stop")
309 cls.pump_thread.join()
310 if hasattr(cls, 'vpp_stderr_reader_thread'):
311 cls.logger.debug("Waiting for stdderr pump to stop")
312 cls.vpp_stderr_reader_thread.join()
314 if hasattr(cls, 'vpp'):
315 if hasattr(cls, 'vapi'):
316 cls.vapi.disconnect()
319 if cls.vpp.returncode is None:
320 cls.logger.debug("Sending TERM to vpp")
322 cls.logger.debug("Waiting for vpp to die")
323 cls.vpp.communicate()
326 if hasattr(cls, 'vpp_stdout_deque'):
327 cls.logger.info(single_line_delim)
328 cls.logger.info('VPP output to stdout while running %s:',
330 cls.logger.info(single_line_delim)
331 f = open(cls.tempdir + '/vpp_stdout.txt', 'w')
332 vpp_output = "".join(cls.vpp_stdout_deque)
334 cls.logger.info('\n%s', vpp_output)
335 cls.logger.info(single_line_delim)
337 if hasattr(cls, 'vpp_stderr_deque'):
338 cls.logger.info(single_line_delim)
339 cls.logger.info('VPP output to stderr while running %s:',
341 cls.logger.info(single_line_delim)
342 f = open(cls.tempdir + '/vpp_stderr.txt', 'w')
343 vpp_output = "".join(cls.vpp_stderr_deque)
345 cls.logger.info('\n%s', vpp_output)
346 cls.logger.info(single_line_delim)
349 def tearDownClass(cls):
350 """ Perform final cleanup after running all tests in this test-case """
354 """ Show various debug prints after each test """
355 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
356 (self.__class__.__name__, self._testMethodName,
357 self._testMethodDoc))
358 if not self.vpp_dead:
359 self.logger.debug(self.vapi.cli("show trace"))
360 self.logger.info(self.vapi.ppcli("show int"))
361 self.logger.info(self.vapi.ppcli("show hardware"))
362 self.logger.info(self.vapi.ppcli("show error"))
363 self.logger.info(self.vapi.ppcli("show run"))
364 self.registry.remove_vpp_config(self.logger)
365 # Save/Dump VPP api trace log
366 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
367 tmp_api_trace = "/tmp/%s" % api_trace
368 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
369 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
370 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
372 os.rename(tmp_api_trace, vpp_api_trace_log)
373 self.logger.info(self.vapi.ppcli("api trace dump %s" %
376 self.registry.unregister_all(self.logger)
379 """ Clear trace before running each test"""
380 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
381 (self.__class__.__name__, self._testMethodName,
382 self._testMethodDoc))
384 raise Exception("VPP is dead when setting up the test")
385 self.sleep(.1, "during setUp")
386 self.vpp_stdout_deque.append(
387 "--- test setUp() for %s.%s(%s) starts here ---\n" %
388 (self.__class__.__name__, self._testMethodName,
389 self._testMethodDoc))
390 self.vpp_stderr_deque.append(
391 "--- test setUp() for %s.%s(%s) starts here ---\n" %
392 (self.__class__.__name__, self._testMethodName,
393 self._testMethodDoc))
394 self.vapi.cli("clear trace")
395 # store the test instance inside the test class - so that objects
396 # holding the class can access instance methods (like assertEqual)
397 type(self).test_instance = self
400 def pg_enable_capture(cls, interfaces):
402 Enable capture on packet-generator interfaces
404 :param interfaces: iterable interface indexes
411 def register_capture(cls, cap_name):
412 """ Register a capture in the testclass """
413 # add to the list of captures with current timestamp
414 cls._captures.append((time.time(), cap_name))
415 # filter out from zombies
416 cls._zombie_captures = [(stamp, name)
417 for (stamp, name) in cls._zombie_captures
422 """ Remove any zombie captures and enable the packet generator """
423 # how long before capture is allowed to be deleted - otherwise vpp
424 # crashes - 100ms seems enough (this shouldn't be needed at all)
427 for stamp, cap_name in cls._zombie_captures:
428 wait = stamp + capture_ttl - now
430 cls.sleep(wait, "before deleting capture %s" % cap_name)
432 cls.logger.debug("Removing zombie capture %s" % cap_name)
433 cls.vapi.cli('packet-generator delete %s' % cap_name)
435 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
436 cls.vapi.cli('packet-generator enable')
437 cls._zombie_captures = cls._captures
441 def create_pg_interfaces(cls, interfaces):
443 Create packet-generator interfaces.
445 :param interfaces: iterable indexes of the interfaces.
446 :returns: List of created interfaces.
451 intf = VppPGInterface(cls, i)
452 setattr(cls, intf.name, intf)
454 cls.pg_interfaces = result
458 def create_loopback_interfaces(cls, interfaces):
460 Create loopback interfaces.
462 :param interfaces: iterable indexes of the interfaces.
463 :returns: List of created interfaces.
467 intf = VppLoInterface(cls, i)
468 setattr(cls, intf.name, intf)
470 cls.lo_interfaces = result
474 def extend_packet(packet, size):
476 Extend packet to given size by padding with spaces
477 NOTE: Currently works only when Raw layer is present.
479 :param packet: packet
480 :param size: target size
483 packet_len = len(packet) + 4
484 extend = size - packet_len
486 packet[Raw].load += ' ' * extend
def reset_packet_infos(cls):
    """Forget all packet infos and per-destination packet counts.

    Both ``_packet_infos`` and ``_packet_count_for_dst_if_idx`` are rebound
    to fresh, empty dicts.
    """
    for attribute in ("_packet_infos", "_packet_count_for_dst_if_idx"):
        setattr(cls, attribute, {})
495 def create_packet_info(cls, src_if, dst_if):
497 Create packet info object containing the source and destination indexes
498 and add it to the testcase's packet info list
500 :param VppInterface src_if: source interface
501 :param VppInterface dst_if: destination interface
503 :returns: _PacketInfo object
507 info.index = len(cls._packet_infos)
508 info.src = src_if.sw_if_index
509 info.dst = dst_if.sw_if_index
510 if isinstance(dst_if, VppSubInterface):
511 dst_idx = dst_if.parent.sw_if_index
513 dst_idx = dst_if.sw_if_index
514 if dst_idx in cls._packet_count_for_dst_if_idx:
515 cls._packet_count_for_dst_if_idx[dst_idx] += 1
517 cls._packet_count_for_dst_if_idx[dst_idx] = 1
518 cls._packet_infos[info.index] = info
522 def info_to_payload(info):
524 Convert _PacketInfo object to packet payload
526 :param info: _PacketInfo object
528 :returns: string containing serialized data from packet info
530 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
534 def payload_to_info(payload):
536 Convert packet payload to _PacketInfo object
538 :param payload: packet payload
540 :returns: _PacketInfo object containing de-serialized data from payload
543 numbers = payload.split()
545 info.index = int(numbers[0])
546 info.src = int(numbers[1])
547 info.dst = int(numbers[2])
548 info.ip = int(numbers[3])
549 info.proto = int(numbers[4])
552 def get_next_packet_info(self, info):
554 Iterate over the packet info list stored in the testcase
555 Start iteration with first element if info is None
556 Continue based on index in info if info is specified
558 :param info: info or None
559 :returns: next info in list or None if no more infos
564 next_index = info.index + 1
565 if next_index == len(self._packet_infos):
568 return self._packet_infos[next_index]
570 def get_next_packet_info_for_interface(self, src_index, info):
572 Search the packet info list for the next packet info with same source
575 :param src_index: source interface index to search for
576 :param info: packet info - where to start the search
577 :returns: packet info or None
581 info = self.get_next_packet_info(info)
584 if info.src == src_index:
587 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
589 Search the packet info list for the next packet info with same source
590 and destination interface indexes
592 :param src_index: source interface index to search for
593 :param dst_index: destination interface index to search for
594 :param info: packet info - where to start the search
595 :returns: packet info or None
599 info = self.get_next_packet_info_for_interface(src_index, info)
602 if info.dst == dst_index:
605 def assert_equal(self, real_value, expected_value, name_or_class=None):
606 if name_or_class is None:
607 self.assertEqual(real_value, expected_value)
610 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
611 msg = msg % (getdoc(name_or_class).strip(),
612 real_value, str(name_or_class(real_value)),
613 expected_value, str(name_or_class(expected_value)))
615 msg = "Invalid %s: %s does not match expected value %s" % (
616 name_or_class, real_value, expected_value)
618 self.assertEqual(real_value, expected_value, msg)
620 def assert_in_range(self,
628 msg = "Invalid %s: %s out of range <%s,%s>" % (
629 name, real_value, expected_min, expected_max)
630 self.assertTrue(expected_min <= real_value <= expected_max, msg)
633 def sleep(cls, timeout, remark=None):
634 if hasattr(cls, 'logger'):
635 cls.logger.debug("Sleeping for %ss (%s)" % (timeout, remark))
639 class TestCasePrinter(object):
643 self.__dict__ = self._shared_state
644 if not hasattr(self, "_test_case_set"):
645 self._test_case_set = set()
def print_test_case_heading_if_first_time(self, case):
    """Print a heading for the class of `case` the first time it is seen.

    The heading is the first line of the class docstring. When the class
    has no docstring (or an empty one) the class name is printed instead,
    so that an undocumented test case cannot crash the reporting.

    :param case: test case instance that is about to run
    """
    case_class = case.__class__
    if case_class in self._test_case_set:
        return
    # getdoc() returns None for a class without a docstring
    doc_lines = (getdoc(case_class) or "").splitlines()
    heading = doc_lines[0] if doc_lines else case_class.__name__
    print(double_line_delim)
    print(colorize(heading, YELLOW))
    print(double_line_delim)
    self._test_case_set.add(case_class)
655 class VppTestResult(unittest.TestResult):
657 @property result_string
658 String variable to store the test case result string.
660 List variable containing 2-tuples of TestCase instances and strings
661 holding formatted tracebacks. Each tuple represents a test which
662 raised an unexpected exception.
664 List variable containing 2-tuples of TestCase instances and strings
665 holding formatted tracebacks. Each tuple represents a test where
666 a failure was explicitly signalled using the TestCase.assert*()
670 def __init__(self, stream, descriptions, verbosity):
672 :param stream File descriptor to store where to report test results.
673 Set to the standard error stream by default.
674 :param descriptions Boolean variable to store information if to use
675 test case descriptions.
676 :param verbosity Integer variable to store required verbosity level.
678 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
680 self.descriptions = descriptions
681 self.verbosity = verbosity
682 self.result_string = None
683 self.printer = TestCasePrinter()
def addSuccess(self, test):
    """
    Note that a test passed and store the "OK" marker as result string

    :param test: test case which succeeded
    """
    if hasattr(test, 'logger'):
        debug = test.logger.debug
        called = (test.__class__.__name__,
                  test._testMethodName,
                  test._testMethodDoc)
        debug("--- addSuccess() %s.%s(%s) called" % called)
    unittest.TestResult.addSuccess(self, test)
    self.result_string = colorize("OK", GREEN)
700 def addSkip(self, test, reason):
702 Record a test skipped.
708 if hasattr(test, 'logger'):
709 test.logger.debug("--- addSkip() %s.%s(%s) called, reason is %s"
710 % (test.__class__.__name__,
711 test._testMethodName,
714 unittest.TestResult.addSkip(self, test, reason)
715 self.result_string = colorize("SKIP", YELLOW)
717 def addFailure(self, test, err):
719 Record a test failed result
722 :param err: error message
725 if hasattr(test, 'logger'):
726 test.logger.debug("--- addFailure() %s.%s(%s) called, err is %s"
727 % (test.__class__.__name__,
728 test._testMethodName,
729 test._testMethodDoc, err))
730 test.logger.debug("formatted exception is:\n%s" %
731 "".join(format_exception(*err)))
732 unittest.TestResult.addFailure(self, test, err)
733 if hasattr(test, 'tempdir'):
734 self.result_string = colorize("FAIL", RED) + \
735 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
737 self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
739 def addError(self, test, err):
741 Record a test error result
744 :param err: error message
747 if hasattr(test, 'logger'):
748 test.logger.debug("--- addError() %s.%s(%s) called, err is %s"
749 % (test.__class__.__name__,
750 test._testMethodName,
751 test._testMethodDoc, err))
752 test.logger.debug("formatted exception is:\n%s" %
753 "".join(format_exception(*err)))
754 unittest.TestResult.addError(self, test, err)
755 if hasattr(test, 'tempdir'):
756 self.result_string = colorize("ERROR", RED) + \
757 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
759 self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
761 def getDescription(self, test):
766 :returns: test description
769 # TODO: if none print warning not raise exception
770 short_description = test.shortDescription()
771 if self.descriptions and short_description:
772 return short_description
776 def startTest(self, test):
783 self.printer.print_test_case_heading_if_first_time(test)
784 unittest.TestResult.startTest(self, test)
785 if self.verbosity > 0:
787 "Starting " + self.getDescription(test) + " ...")
788 self.stream.writeln(single_line_delim)
790 def stopTest(self, test):
797 unittest.TestResult.stopTest(self, test)
798 if self.verbosity > 0:
799 self.stream.writeln(single_line_delim)
800 self.stream.writeln("%-73s%s" % (self.getDescription(test),
802 self.stream.writeln(single_line_delim)
804 self.stream.writeln("%-73s%s" % (self.getDescription(test),
def printErrors(self):
    """
    Write an empty line, then list the recorded errors and failures
    """
    self.stream.writeln()
    # each attribute is looked up right before its list is printed
    for flavour, attribute in (('ERROR', 'errors'), ('FAIL', 'failures')):
        self.printErrorList(flavour, getattr(self, attribute))
def printErrorList(self, flavour, errors):
    """
    Write every entry of `errors` to the output stream, each one headed by
    the error type and the description of the affected test.

    :param flavour: error type used as heading prefix
    :param errors: iterable of (test, error text) pairs
    """
    for failed_test, details in errors:
        output = self.stream
        output.writeln(double_line_delim)
        heading = (flavour, self.getDescription(failed_test))
        output.writeln("%s: %s" % heading)
        output.writeln(single_line_delim)
        output.writeln("%s" % details)
832 class VppTestRunner(unittest.TextTestRunner):
834 A basic test runner implementation which prints results to standard error.
837 def resultclass(self):
838 """Class maintaining the results of the tests"""
841 def __init__(self, stream=sys.stderr, descriptions=True, verbosity=1,
842 failfast=False, buffer=False, resultclass=None):
843 # ignore stream setting here, use hard-coded stdout to be in sync
844 # with prints from VppTestCase methods ...
845 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
846 verbosity, failfast, buffer,
851 def parse_test_option(self):
853 f = os.getenv(self.test_option)
856 filter_file_name = None
857 filter_class_name = None
858 filter_func_name = None
863 raise Exception("Unrecognized %s option: %s" %
864 (self.test_option, f))
866 if parts[2] not in ('*', ''):
867 filter_func_name = parts[2]
868 if parts[1] not in ('*', ''):
869 filter_class_name = parts[1]
870 if parts[0] not in ('*', ''):
871 if parts[0].startswith('test_'):
872 filter_file_name = parts[0]
874 filter_file_name = 'test_%s' % parts[0]
876 if f.startswith('test_'):
879 filter_file_name = 'test_%s' % f
880 return filter_file_name, filter_class_name, filter_func_name
882 def filter_tests(self, tests, filter_file, filter_class, filter_func):
883 result = unittest.suite.TestSuite()
885 if isinstance(t, unittest.suite.TestSuite):
886 # this is a bunch of tests, recursively filter...
887 x = self.filter_tests(t, filter_file, filter_class,
889 if x.countTestCases() > 0:
891 elif isinstance(t, unittest.TestCase):
892 # this is a single test
893 parts = t.id().split('.')
894 # t.id() for common cases like this:
895 # test_classifier.TestClassifier.test_acl_ip
896 # apply filtering only if it is so
898 if filter_file and filter_file != parts[0]:
900 if filter_class and filter_class != parts[1]:
902 if filter_func and filter_func != parts[2]:
906 # unexpected object, don't touch it
917 gc.disable() # disable garbage collection, we'll do that manually
918 print("Running tests using custom test runner") # debug message
919 filter_file, filter_class, filter_func = self.parse_test_option()
920 print("Active filters: file=%s, class=%s, function=%s" % (
921 filter_file, filter_class, filter_func))
922 filtered = self.filter_tests(test, filter_file, filter_class,
924 print("%s out of %s tests match specified filters" % (
925 filtered.countTestCases(), test.countTestCases()))
926 return super(VppTestRunner, self).run(filtered)