3 from __future__ import print_function
12 from collections import deque
13 from threading import Thread, Event
14 from inspect import getdoc
15 from traceback import format_exception
16 from logging import FileHandler, DEBUG, Formatter
17 from scapy.packet import Raw
18 from hook import StepHook, PollHook
19 from vpp_pg_interface import VppPGInterface
20 from vpp_sub_interface import VppSubInterface
21 from vpp_lo_interface import VppLoInterface
22 from vpp_papi_provider import VppPapiProvider
24 from vpp_object import VppObjectRegistry
25 if os.name == 'posix' and sys.version_info[0] < 3:
26 # using subprocess32 is recommended by python official documentation
27 # @ https://docs.python.org/2/library/subprocess.html
28 import subprocess32 as subprocess
33 Test framework module.
35 The module provides a set of tools for constructing and running tests and
36 representing the results.
40 class _PacketInfo(object):
41 """Private class to create packet info object.
43 Help process information about the next packet.
44 Set variables to default values.
46 #: Store the index of the packet.
48 #: Store the index of the source packet generator interface of the packet.
50 #: Store the index of the destination packet generator interface
53 #: Store expected ip version
55 #: Store expected upper protocol
57 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare two packet infos field by field.

    :param other: object to compare with; it must expose ``index``,
        ``src``, ``dst`` and ``data`` attributes.
    :returns: truthy only when all four fields compare equal.
    """
    index = self.index == other.index
    src = self.src == other.src
    dst = self.dst == other.dst
    data = self.data == other.data
    return index and src and dst and data

def __ne__(self, other):
    """Inverse of ``__eq__``.

    Python 2 does not derive ``!=`` from ``__eq__``; without this method
    two infos with identical fields would still compare as "not equal"
    because the default identity-based comparison would be used.

    :param other: object to compare with (same requirements as __eq__).
    :returns: True when at least one of the compared fields differs.
    """
    return not self.__eq__(other)
68 def pump_output(testclass):
# Loops until testclass.pump_thread_stop_flag is set (polled with a
# non-blocking wait(0)). On each pass it select()s on VPP's stdout, VPP's
# stderr and the read end of pump_thread_wakeup_pipe, then reads at most
# 1024 bytes from each VPP descriptor reported readable and appends the chunk
# to vpp_stdout_deque / vpp_stderr_deque respectively.
# NOTE(review): the tail of the select.select(...) call (remaining arguments
# and any indexing of its result) is not visible in this extract - confirm
# against the full file.
69 """ pump output from vpp stdout/stderr to proper queues """
70 while not testclass.pump_thread_stop_flag.wait(0):
71 readable = select.select([testclass.vpp.stdout.fileno(),
72 testclass.vpp.stderr.fileno(),
73 testclass.pump_thread_wakeup_pipe[0]],
75 if testclass.vpp.stdout.fileno() in readable:
76 read = os.read(testclass.vpp.stdout.fileno(), 1024)
77 testclass.vpp_stdout_deque.append(read)
78 if testclass.vpp.stderr.fileno() in readable:
79 read = os.read(testclass.vpp.stderr.fileno(), 1024)
80 testclass.vpp_stderr_deque.append(read)
81 # ignoring the dummy pipe here intentionally - the flag will take care
82 # of properly terminating the loop
def running_extended_tests():
    """Tell whether extended tests were requested through the environment.

    :returns: True when the EXTENDED_TESTS environment variable is set to
        "y", "yes" or "1" (case-insensitive); False otherwise, including
        when the variable is not set at all.
    """
    s = os.getenv("EXTENDED_TESTS")
    if s is None:
        # os.getenv() yields None for an unset variable; calling .lower()
        # on it would raise AttributeError
        return False
    return s.lower() in ("y", "yes", "1")
94 class VppTestCase(unittest.TestCase):
95 """This subclass is a base class for VPP test cases that are implemented as
96 classes. It provides methods to create and run test case.
def packet_infos(self):
    """Packet infos registered so far (the ``_packet_infos`` container,
    which reset_packet_infos() initialises as a dict and
    create_packet_info() keys by packet index)"""
    infos = self._packet_infos
    return infos
def get_packet_count_for_if_idx(cls, dst_if_index):
    """Get the number of packet info for specified destination if index

    :param dst_if_index: index of the destination interface, as used for
        the keys of ``cls._packet_count_for_dst_if_idx``.
    :returns: number of packet infos created for that interface; 0 when no
        packet info has been created for it yet.
    """
    # dict.get() gives a numeric 0 for unknown interfaces, so callers can
    # always treat the result as a count
    return cls._packet_count_for_dst_if_idx.get(dst_if_index, 0)
114 """Return the instance of this testcase"""
115 return cls.test_instance
118 def set_debug_flags(cls, d):
# Resets debug_core, debug_gdb and debug_gdbserver to False, then switches on
# the flag selected by the DEBUG option value `d`; an unrecognised value
# raises Exception with the offending value in the message.
# NOTE(review): the lines deriving `dl` from `d` and most of the branch
# headers are not visible in this extract - confirm against the full file.
119 cls.debug_core = False
120 cls.debug_gdb = False
121 cls.debug_gdbserver = False
126 cls.debug_core = True
129 elif dl == "gdbserver":
130 cls.debug_gdbserver = True
132 raise Exception("Unrecognized DEBUG option: '%s'" % d)
135 def setUpConstants(cls):
# Environment variables read here: STEP, DEBUG, VPP_TEST_BIN (defaults to
# "vpp"), VPP_TEST_PLUGIN_PATH and COREDUMP_SIZE. The result is
# cls.vpp_cmdline, the argv list used to start VPP; it embeds cls.shm_prefix
# as the api-segment prefix and is logged at info level.
# The CLI listener on localhost:5002 is only requested when stepping or when
# one of the gdb debug modes is active.
# NOTE(review): `s.lower()` raises AttributeError when STEP is unset unless it
# is guarded by statements that are not visible in this extract; several
# other lines (defaults, else-branches, end of the cmdline list) are missing
# here as well - confirm against the full file.
136 """ Set-up the test case class based on environment variables """
138 s = os.getenv("STEP")
139 cls.step = True if s.lower() in ("y", "yes", "1") else False
143 d = os.getenv("DEBUG")
146 cls.set_debug_flags(d)
147 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
148 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
150 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
151 debug_cli = "cli-listen localhost:5002"
154 size = os.getenv("COREDUMP_SIZE")
156 coredump_size = "coredump-size %s" % size
159 if coredump_size is None:
160 coredump_size = "coredump-size unlimited"
161 cls.vpp_cmdline = [cls.vpp_bin, "unix",
162 "{", "nodaemon", debug_cli, coredump_size, "}",
163 "api-trace", "{", "on", "}",
164 "api-segment", "{", "prefix", cls.shm_prefix, "}",
165 "plugins", "{", "plugin", "dpdk_plugin.so", "{",
167 if cls.plugin_path is not None:
168 cls.vpp_cmdline.extend(["plugin_path", cls.plugin_path])
169 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
172 def wait_for_enter(cls):
# Interactive debugging aid: prints the PID of the spawned process
# (cls.vpp.pid), prints a ready-to-paste gdb command line (remote target on
# localhost:7777 for gdbserver, `attach <pid>` otherwise) and then blocks on
# raw_input() until the user presses ENTER.
# raw_input() is the Python 2 builtin; it does not exist on Python 3.
# NOTE(review): some branch headers of this method are not visible in this
# extract, so which messages are printed in which debug mode must be
# confirmed against the full file.
173 if cls.debug_gdbserver:
174 print(double_line_delim)
175 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
177 print(double_line_delim)
178 print("Spawned VPP with PID: %d" % cls.vpp.pid)
180 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
182 print(single_line_delim)
183 print("You can debug the VPP using e.g.:")
184 if cls.debug_gdbserver:
185 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
186 print("Now is the time to attach a gdb by running the above "
187 "command, set up breakpoints etc. and then resume VPP from "
188 "within gdb by issuing the 'continue' command")
190 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
191 print("Now is the time to attach a gdb by running the above "
192 "command and set up breakpoints etc.")
193 print(single_line_delim)
194 raw_input("Press ENTER to continue running the testcase...")
198 cmdline = cls.vpp_cmdline
200 if cls.debug_gdbserver:
201 gdbserver = '/usr/bin/gdbserver'
202 if not os.path.isfile(gdbserver) or \
203 not os.access(gdbserver, os.X_OK):
204 raise Exception("gdbserver binary '%s' does not exist or is "
205 "not executable" % gdbserver)
207 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
208 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
211 cls.vpp = subprocess.Popen(cmdline,
212 stdout=subprocess.PIPE,
213 stderr=subprocess.PIPE,
215 except Exception as e:
216 cls.logger.critical("Couldn't start vpp: %s" % e)
224 Perform class setup before running the testcase
225 Remove shared memory files, start vpp and connect the vpp-api
227 gc.collect() # run garbage collection first
228 cls.logger = getLogger(cls.__name__)
229 cls.tempdir = tempfile.mkdtemp(
230 prefix='vpp-unittest-' + cls.__name__ + '-')
231 file_handler = FileHandler("%s/log.txt" % cls.tempdir)
232 file_handler.setFormatter(
233 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
235 file_handler.setLevel(DEBUG)
236 cls.logger.addHandler(file_handler)
237 cls.shm_prefix = cls.tempdir.split("/")[-1]
238 os.chdir(cls.tempdir)
239 cls.logger.info("Temporary dir is %s, shm prefix is %s",
240 cls.tempdir, cls.shm_prefix)
242 cls.reset_packet_infos()
244 cls._zombie_captures = []
247 cls.registry = VppObjectRegistry()
248 # need to catch exceptions here because if we raise, then the cleanup
249 # doesn't get called and we might end with a zombie vpp
252 cls.vpp_stdout_deque = deque()
253 cls.vpp_stderr_deque = deque()
254 cls.pump_thread_stop_flag = Event()
255 cls.pump_thread_wakeup_pipe = os.pipe()
256 cls.pump_thread = Thread(target=pump_output, args=(cls,))
257 cls.pump_thread.daemon = True
258 cls.pump_thread.start()
259 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
264 cls.vapi.register_hook(hook)
265 cls.sleep(0.1, "after vpp startup, before initial poll")
270 if cls.debug_gdbserver:
271 print(colorize("You're running VPP inside gdbserver but "
272 "VPP-API connection failed, did you forget "
273 "to 'continue' VPP from within gdb?", RED))
276 t, v, tb = sys.exc_info()
286 Disconnect vpp-api, kill vpp and cleanup shared memory files
288 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
290 if cls.vpp.returncode is None:
291 print(double_line_delim)
292 print("VPP or GDB server is still running")
293 print(single_line_delim)
294 raw_input("When done debugging, press ENTER to kill the "
295 "process and finish running the testcase...")
297 os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
298 cls.pump_thread_stop_flag.set()
299 if hasattr(cls, 'pump_thread'):
300 cls.logger.debug("Waiting for pump thread to stop")
301 cls.pump_thread.join()
302 if hasattr(cls, 'vpp_stderr_reader_thread'):
303 cls.logger.debug("Waiting for stdderr pump to stop")
304 cls.vpp_stderr_reader_thread.join()
306 if hasattr(cls, 'vpp'):
307 if hasattr(cls, 'vapi'):
308 cls.vapi.disconnect()
311 if cls.vpp.returncode is None:
312 cls.logger.debug("Sending TERM to vpp")
314 cls.logger.debug("Waiting for vpp to die")
315 cls.vpp.communicate()
318 if hasattr(cls, 'vpp_stdout_deque'):
319 cls.logger.info(single_line_delim)
320 cls.logger.info('VPP output to stdout while running %s:',
322 cls.logger.info(single_line_delim)
323 f = open(cls.tempdir + '/vpp_stdout.txt', 'w')
324 vpp_output = "".join(cls.vpp_stdout_deque)
326 cls.logger.info('\n%s', vpp_output)
327 cls.logger.info(single_line_delim)
329 if hasattr(cls, 'vpp_stderr_deque'):
330 cls.logger.info(single_line_delim)
331 cls.logger.info('VPP output to stderr while running %s:',
333 cls.logger.info(single_line_delim)
334 f = open(cls.tempdir + '/vpp_stderr.txt', 'w')
335 vpp_output = "".join(cls.vpp_stderr_deque)
337 cls.logger.info('\n%s', vpp_output)
338 cls.logger.info(single_line_delim)
341 def tearDownClass(cls):
342 """ Perform final cleanup after running all tests in this test-case """
346 """ Show various debug prints after each test """
347 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
348 (self.__class__.__name__, self._testMethodName,
349 self._testMethodDoc))
350 if not self.vpp_dead:
351 self.logger.debug(self.vapi.cli("show trace"))
352 self.logger.info(self.vapi.ppcli("show int"))
353 self.logger.info(self.vapi.ppcli("show hardware"))
354 self.logger.info(self.vapi.ppcli("show error"))
355 self.logger.info(self.vapi.ppcli("show run"))
356 self.registry.remove_vpp_config(self.logger)
357 # Save/Dump VPP api trace log
358 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
359 tmp_api_trace = "/tmp/%s" % api_trace
360 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
361 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
362 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
364 os.rename(tmp_api_trace, vpp_api_trace_log)
365 self.logger.info(self.vapi.ppcli("api trace dump %s" %
368 self.registry.unregister_all(self.logger)
371 """ Clear trace before running each test"""
372 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
373 (self.__class__.__name__, self._testMethodName,
374 self._testMethodDoc))
376 raise Exception("VPP is dead when setting up the test")
377 self.sleep(.1, "during setUp")
378 self.vpp_stdout_deque.append(
379 "--- test setUp() for %s.%s(%s) starts here ---\n" %
380 (self.__class__.__name__, self._testMethodName,
381 self._testMethodDoc))
382 self.vpp_stderr_deque.append(
383 "--- test setUp() for %s.%s(%s) starts here ---\n" %
384 (self.__class__.__name__, self._testMethodName,
385 self._testMethodDoc))
386 self.vapi.cli("clear trace")
387 # store the test instance inside the test class - so that objects
388 # holding the class can access instance methods (like assertEqual)
389 type(self).test_instance = self
392 def pg_enable_capture(cls, interfaces):
394 Enable capture on packet-generator interfaces
396 :param interfaces: iterable interface indexes
403 def register_capture(cls, cap_name):
# Appends (time.time(), cap_name) to cls._captures and rebuilds
# cls._zombie_captures from its existing (stamp, name) entries.
# NOTE(review): the filter condition of the list comprehension is not visible
# in this extract; per the comment below it removes the newly registered
# capture from the zombie list - confirm against the full file.
404 """ Register a capture in the testclass """
405 # add to the list of captures with current timestamp
406 cls._captures.append((time.time(), cap_name))
407 # filter out from zombies
408 cls._zombie_captures = [(stamp, name)
409 for (stamp, name) in cls._zombie_captures
414 """ Remove any zombie captures and enable the packet generator """
415 # how long before capture is allowed to be deleted - otherwise vpp
416 # crashes - 100ms seems enough (this shouldn't be needed at all)
419 for stamp, cap_name in cls._zombie_captures:
420 wait = stamp + capture_ttl - now
422 cls.sleep(wait, "before deleting capture %s" % cap_name)
424 cls.logger.debug("Removing zombie capture %s" % cap_name)
425 cls.vapi.cli('packet-generator delete %s' % cap_name)
427 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
428 cls.vapi.cli('packet-generator enable')
429 cls._zombie_captures = cls._captures
433 def create_pg_interfaces(cls, interfaces):
# Each interface is built as VppPGInterface(cls, i) and also published on the
# test class as an attribute named after intf.name; the collected `result`
# is stored in cls.pg_interfaces.
# NOTE(review): the loop header, the initialisation of `result` and the
# return statement are not visible in this extract.
435 Create packet-generator interfaces.
437 :param interfaces: iterable indexes of the interfaces.
438 :returns: List of created interfaces.
443 intf = VppPGInterface(cls, i)
444 setattr(cls, intf.name, intf)
446 cls.pg_interfaces = result
450 def create_loopback_interfaces(cls, interfaces):
# Each interface is built as VppLoInterface(cls, i) and also published on the
# test class as an attribute named after intf.name; the collected `result`
# is stored in cls.lo_interfaces.
# NOTE(review): the loop header, the initialisation of `result` and the
# return statement are not visible in this extract.
452 Create loopback interfaces.
454 :param interfaces: iterable indexes of the interfaces.
455 :returns: List of created interfaces.
459 intf = VppLoInterface(cls, i)
460 setattr(cls, intf.name, intf)
462 cls.lo_interfaces = result
466 def extend_packet(packet, size):
# The amount of padding is size - (len(packet) + 4); the padding is appended
# in place to the load of the packet's Raw layer, so the packet passed in is
# modified.
# NOTE(review): the extra 4 bytes presumably account for a trailer that
# len(packet) does not include (e.g. Ethernet FCS) - confirm. Any guard around
# the padding statement is not visible in this extract.
468 Extend packet to given size by padding with spaces
469 NOTE: Currently works only when Raw layer is present.
471 :param packet: packet
472 :param size: target size
475 packet_len = len(packet) + 4
476 extend = size - packet_len
478 packet[Raw].load += ' ' * extend
def reset_packet_infos(cls):
    """ Forget all packet infos and per-destination packet counts

    Both ``_packet_infos`` and ``_packet_count_for_dst_if_idx`` are rebound
    to fresh, independent empty dicts.
    """
    cls._packet_infos, cls._packet_count_for_dst_if_idx = {}, {}
487 def create_packet_info(cls, src_if, dst_if):
# The new info's index is the current size of cls._packet_infos, and the info
# is stored in that dict under its index.
# The per-destination counter is keyed by the sw_if_index of the destination,
# except that a VppSubInterface destination is counted against its parent's
# sw_if_index.
# NOTE(review): the creation of `info`, the else-branch headers and the
# return statement are not visible in this extract.
489 Create packet info object containing the source and destination indexes
490 and add it to the testcase's packet info list
492 :param VppInterface src_if: source interface
493 :param VppInterface dst_if: destination interface
495 :returns: _PacketInfo object
499 info.index = len(cls._packet_infos)
500 info.src = src_if.sw_if_index
501 info.dst = dst_if.sw_if_index
502 if isinstance(dst_if, VppSubInterface):
503 dst_idx = dst_if.parent.sw_if_index
505 dst_idx = dst_if.sw_if_index
506 if dst_idx in cls._packet_count_for_dst_if_idx:
507 cls._packet_count_for_dst_if_idx[dst_idx] += 1
509 cls._packet_count_for_dst_if_idx[dst_idx] = 1
510 cls._packet_infos[info.index] = info
514 def info_to_payload(info):
# The payload is five space-separated decimal integers; the first three are
# info.index, info.src and info.dst.
# NOTE(review): the last two format arguments are not visible in this
# extract; payload_to_info() reads positions 3 and 4 back as `ip` and `proto`,
# so they are presumably info.ip and info.proto - confirm.
516 Convert _PacketInfo object to packet payload
518 :param info: _PacketInfo object
520 :returns: string containing serialized data from packet info
522 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
526 def payload_to_info(payload):
# The payload is split on whitespace and the first five tokens are converted
# with int() into index, src, dst, ip and proto (in that order); a payload
# with fewer than five tokens raises IndexError, a non-numeric token raises
# ValueError.
# NOTE(review): the creation of `info` and the return statement are not
# visible in this extract.
528 Convert packet payload to _PacketInfo object
530 :param payload: packet payload
532 :returns: _PacketInfo object containing de-serialized data from payload
535 numbers = payload.split()
537 info.index = int(numbers[0])
538 info.src = int(numbers[1])
539 info.dst = int(numbers[2])
540 info.ip = int(numbers[3])
541 info.proto = int(numbers[4])
544 def get_next_packet_info(self, info):
# Relies on self._packet_infos being keyed by consecutive indexes: the
# successor of `info` is looked up under info.index + 1, and reaching
# len(self._packet_infos) means the end of the sequence.
# NOTE(review): the handling of info=None and the end-of-sequence return are
# not visible in this extract.
546 Iterate over the packet info list stored in the testcase
547 Start iteration with first element if info is None
548 Continue based on index in info if info is specified
550 :param info: info or None
551 :returns: next info in list or None if no more infos
556 next_index = info.index + 1
557 if next_index == len(self._packet_infos):
560 return self._packet_infos[next_index]
562 def get_next_packet_info_for_interface(self, src_index, info):
# Advances through the infos with get_next_packet_info() and compares each
# candidate's `src` with src_index.
# NOTE(review): the loop header, the end-of-sequence check and the return
# statements are not visible in this extract.
564 Search the packet info list for the next packet info with same source
567 :param src_index: source interface index to search for
568 :param info: packet info - where to start the search
569 :returns: packet info or None
573 info = self.get_next_packet_info(info)
576 if info.src == src_index:
579 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
# Builds on get_next_packet_info_for_interface() (source match) and
# additionally compares each candidate's `dst` with dst_index.
# NOTE(review): the loop header, the end-of-sequence check and the return
# statements are not visible in this extract.
581 Search the packet info list for the next packet info with same source
582 and destination interface indexes
584 :param src_index: source interface index to search for
585 :param dst_index: destination interface index to search for
586 :param info: packet info - where to start the search
587 :returns: packet info or None
591 info = self.get_next_packet_info_for_interface(src_index, info)
594 if info.dst == dst_index:
597 def assert_equal(self, real_value, expected_value, name_or_class=None):
# assertEqual() wrapper that produces a descriptive failure message.
# With name_or_class=None it is a plain assertEqual(). Otherwise two message
# formats exist: one that treats name_or_class as a callable/class (its
# docstring names the field and name_or_class(value) renders each numeric
# value), and one that treats it as a plain name.
# NOTE(review): the statements selecting between the two message formats are
# not visible in this extract - confirm against the full file.
598 if name_or_class is None:
599 self.assertEqual(real_value, expected_value)
602 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
603 msg = msg % (getdoc(name_or_class).strip(),
604 real_value, str(name_or_class(real_value)),
605 expected_value, str(name_or_class(expected_value)))
607 msg = "Invalid %s: %s does not match expected value %s" % (
608 name_or_class, real_value, expected_value)
610 self.assertEqual(real_value, expected_value, msg)
612 def assert_in_range(self,
# Asserts expected_min <= real_value <= expected_max (both bounds
# inclusive) and reports `name`, the value and the bounds on failure.
# NOTE(review): the remaining parameters of the signature and the handling
# of a missing name are not visible in this extract.
620 msg = "Invalid %s: %s out of range <%s,%s>" % (
621 name, real_value, expected_min, expected_max)
622 self.assertTrue(expected_min <= real_value <= expected_max, msg)
625 def sleep(cls, timeout, remark=None):
# Logs the sleep duration and the optional remark at debug level; the hasattr
# guard covers the case where no `logger` attribute has been set on the class.
# NOTE(review): the statement performing the actual sleep is not visible in
# this extract.
626 if hasattr(cls, 'logger'):
627 cls.logger.debug("Sleeping for %ss (%s)" % (timeout, remark))
631 class TestCasePrinter(object):
635 self.__dict__ = self._shared_state
636 if not hasattr(self, "_test_case_set"):
637 self._test_case_set = set()
def print_test_case_heading_if_first_time(self, case):
    """Print a heading for the test case class of `case`, once per class.

    The heading is the first line of the class docstring; when the class
    has no (or an empty) docstring, the class name is used instead of
    failing on the missing text.

    :param case: test case instance about to be run
    """
    case_class = case.__class__
    if case_class in self._test_case_set:
        return
    doc = getdoc(case_class)
    # getdoc() returns None when there is no docstring and '' for an empty
    # one - neither has a first line to show
    heading = doc.splitlines()[0] if doc else case_class.__name__
    print(double_line_delim)
    print(colorize(heading, YELLOW))
    print(double_line_delim)
    self._test_case_set.add(case_class)
647 class VppTestResult(unittest.TestResult):
649 @property result_string
650 String variable to store the test case result string.
652 List variable containing 2-tuples of TestCase instances and strings
653 holding formatted tracebacks. Each tuple represents a test which
654 raised an unexpected exception.
656 List variable containing 2-tuples of TestCase instances and strings
657 holding formatted tracebacks. Each tuple represents a test where
658 a failure was explicitly signalled using the TestCase.assert*()
662 def __init__(self, stream, descriptions, verbosity):
# Forwards all three arguments to unittest.TestResult.__init__ and keeps
# `descriptions` and `verbosity` on the instance. result_string starts as
# None, and `printer` is a TestCasePrinter.
# NOTE(review): other methods of this class write through self.stream; the
# statement storing `stream` on the instance is not visible in this extract.
664 :param stream File descriptor to store where to report test results.
665 Set to the standard error stream by default.
666 :param descriptions Boolean variable to store information if to use
667 test case descriptions.
668 :param verbosity Integer variable to store required verbosity level.
670 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
672 self.descriptions = descriptions
673 self.verbosity = verbosity
674 self.result_string = None
675 self.printer = TestCasePrinter()
def addSuccess(self, test):
    """
    Record that a test passed

    Logs the event to the test's own logger (when it has one), delegates
    to unittest.TestResult.addSuccess and sets the result string to a
    green "OK".

    :param test: the test case that succeeded

    """
    if hasattr(test, 'logger'):
        identity = (test.__class__.__name__,
                    test._testMethodName,
                    test._testMethodDoc)
        test.logger.debug("--- addSuccess() %s.%s(%s) called" % identity)
    unittest.TestResult.addSuccess(self, test)
    self.result_string = colorize("OK", GREEN)
692 def addSkip(self, test, reason):
# Logs the skip to the test's own logger when it has one, delegates to
# unittest.TestResult.addSkip and sets result_string to a yellow "SKIP".
# NOTE(review): the tail of the debug-message argument tuple is not visible
# in this extract.
694 Record a test skipped.
700 if hasattr(test, 'logger'):
701 test.logger.debug("--- addSkip() %s.%s(%s) called, reason is %s"
702 % (test.__class__.__name__,
703 test._testMethodName,
706 unittest.TestResult.addSkip(self, test, reason)
707 self.result_string = colorize("SKIP", YELLOW)
709 def addFailure(self, test, err):
# `err` is unpacked into format_exception(*err), so it is expected to be an
# exc_info-style (type, value, traceback) tuple rather than a plain message.
# The failure and its formatted traceback are logged to the test's own logger
# when it has one, the failure is recorded through
# unittest.TestResult.addFailure, and result_string becomes a red "FAIL"
# annotated with the test's temp dir (or with "[no temp dir]").
# NOTE(review): the branch header before the "[no temp dir]" assignment is
# not visible in this extract.
711 Record a test failed result
714 :param err: error message
717 if hasattr(test, 'logger'):
718 test.logger.debug("--- addFailure() %s.%s(%s) called, err is %s"
719 % (test.__class__.__name__,
720 test._testMethodName,
721 test._testMethodDoc, err))
722 test.logger.debug("formatted exception is:\n%s" %
723 "".join(format_exception(*err)))
724 unittest.TestResult.addFailure(self, test, err)
725 if hasattr(test, 'tempdir'):
726 self.result_string = colorize("FAIL", RED) + \
727 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
729 self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
731 def addError(self, test, err):
# `err` is unpacked into format_exception(*err), so it is expected to be an
# exc_info-style (type, value, traceback) tuple rather than a plain message.
# The error and its formatted traceback are logged to the test's own logger
# when it has one, the error is recorded through
# unittest.TestResult.addError, and result_string becomes a red "ERROR"
# annotated with the test's temp dir (or with "[no temp dir]").
# NOTE(review): the branch header before the "[no temp dir]" assignment is
# not visible in this extract.
733 Record a test error result
736 :param err: error message
739 if hasattr(test, 'logger'):
740 test.logger.debug("--- addError() %s.%s(%s) called, err is %s"
741 % (test.__class__.__name__,
742 test._testMethodName,
743 test._testMethodDoc, err))
744 test.logger.debug("formatted exception is:\n%s" %
745 "".join(format_exception(*err)))
746 unittest.TestResult.addError(self, test, err)
747 if hasattr(test, 'tempdir'):
748 self.result_string = colorize("ERROR", RED) + \
749 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
751 self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
753 def getDescription(self, test):
# Returns test.shortDescription() when descriptions are enabled on this
# result object and the test actually has a short description.
# NOTE(review): the fallback used in all other cases is not visible in this
# extract.
758 :returns: test description
761 # TODO: if none print warning not raise exception
762 short_description = test.shortDescription()
763 if self.descriptions and short_description:
764 return short_description
768 def startTest(self, test):
# Prints the per-class heading the first time a test of that class starts
# (via self.printer), delegates to unittest.TestResult.startTest and, when
# verbosity is above 0, writes a "Starting <description> ..." banner to
# self.stream.
# NOTE(review): the line opening the call that receives the "Starting ..."
# text is not visible in this extract.
775 self.printer.print_test_case_heading_if_first_time(test)
776 unittest.TestResult.startTest(self, test)
777 if self.verbosity > 0:
779 "Starting " + self.getDescription(test) + " ...")
780 self.stream.writeln(single_line_delim)
782 def stopTest(self, test):
# Delegates to unittest.TestResult.stopTest, then writes the test description
# left-justified in a 73-character column followed by a second value; with
# verbosity above 0 the line is framed by single_line_delim separators.
# NOTE(review): the second format argument (presumably self.result_string -
# confirm) and the else-branch header are not visible in this extract.
789 unittest.TestResult.stopTest(self, test)
790 if self.verbosity > 0:
791 self.stream.writeln(single_line_delim)
792 self.stream.writeln("%-73s%s" % (self.getDescription(test),
794 self.stream.writeln(single_line_delim)
796 self.stream.writeln("%-73s%s" % (self.getDescription(test),
def printErrors(self):
    """
    Write a blank line to the stream, then list all recorded errors
    followed by all recorded failures
    """
    self.stream.writeln()
    for flavour, attribute in (('ERROR', 'errors'), ('FAIL', 'failures')):
        self.printErrorList(flavour, getattr(self, attribute))
def printErrorList(self, flavour, errors):
    """
    Write one framed report per entry of `errors` to the output stream:
    a "<flavour>: <test description>" headline followed by the error text.

    :param flavour: error type
    :param errors: iterable of (test, error text) pairs

    """
    for failed_test, details in errors:
        stream = self.stream
        stream.writeln(double_line_delim)
        headline = "%s: %s" % (flavour, self.getDescription(failed_test))
        stream.writeln(headline)
        stream.writeln(single_line_delim)
        stream.writeln("%s" % details)
824 class VppTestRunner(unittest.TextTestRunner):
826 A basic test runner implementation which prints results to standard error.
829 def resultclass(self):
830 """Class maintaining the results of the tests"""
833 def __init__(self, stream=sys.stderr, descriptions=True, verbosity=1,
834 failfast=False, buffer=False, resultclass=None):
# The `stream` argument is accepted for signature compatibility but not
# used: sys.stdout is passed to the parent constructor instead. The other
# arguments visible here are forwarded unchanged.
# NOTE(review): the end of the super().__init__ call and any statements
# after it are not visible in this extract.
835 # ignore stream setting here, use hard-coded stdout to be in sync
836 # with prints from VppTestCase methods ...
837 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
838 verbosity, failfast, buffer,
843 def parse_test_option(self):
# Reads the filter specification from the environment variable named by
# self.test_option and returns a (file, class, function) name triple; each
# element stays None when no filter applies to it.
# In the multi-part form a part equal to '*' or '' means "no filter"; file
# names are normalised to carry the 'test_' prefix. A specification that
# cannot be parsed raises Exception naming the option and its value.
# NOTE(review): the statements splitting `f` into `parts`, the checks
# deciding between the multi-part and single-name forms, and the handling of
# an unset variable are not visible in this extract.
845 f = os.getenv(self.test_option)
848 filter_file_name = None
849 filter_class_name = None
850 filter_func_name = None
855 raise Exception("Unrecognized %s option: %s" %
856 (self.test_option, f))
858 if parts[2] not in ('*', ''):
859 filter_func_name = parts[2]
860 if parts[1] not in ('*', ''):
861 filter_class_name = parts[1]
862 if parts[0] not in ('*', ''):
863 if parts[0].startswith('test_'):
864 filter_file_name = parts[0]
866 filter_file_name = 'test_%s' % parts[0]
868 if f.startswith('test_'):
871 filter_file_name = 'test_%s' % f
872 return filter_file_name, filter_class_name, filter_func_name
874 def filter_tests(self, tests, filter_file, filter_class, filter_func):
# Builds a new unittest TestSuite from `tests`. Nested suites are filtered
# recursively and only considered when the filtered result still contains
# test cases. A single TestCase is matched by splitting t.id() on '.' and
# comparing the file, class and function parts against whichever filters
# are set (a falsy filter matches everything).
# NOTE(review): the loop header, the statements adding items to `result`,
# the check on the number of id parts and the return statement are not
# visible in this extract.
875 result = unittest.suite.TestSuite()
877 if isinstance(t, unittest.suite.TestSuite):
878 # this is a bunch of tests, recursively filter...
879 x = self.filter_tests(t, filter_file, filter_class,
881 if x.countTestCases() > 0:
883 elif isinstance(t, unittest.TestCase):
884 # this is a single test
885 parts = t.id().split('.')
886 # t.id() for common cases like this:
887 # test_classifier.TestClassifier.test_acl_ip
888 # apply filtering only if it is so
890 if filter_file and filter_file != parts[0]:
892 if filter_class and filter_class != parts[1]:
894 if filter_func and filter_func != parts[2]:
898 # unexpected object, don't touch it
909 gc.disable() # disable garbage collection, we'll do that manually
910 print("Running tests using custom test runner") # debug message
911 filter_file, filter_class, filter_func = self.parse_test_option()
912 print("Active filters: file=%s, class=%s, function=%s" % (
913 filter_file, filter_class, filter_func))
914 filtered = self.filter_tests(test, filter_file, filter_class,
916 print("%s out of %s tests match specified filters" % (
917 filtered.countTestCases(), test.countTestCases()))
918 return super(VppTestRunner, self).run(filtered)