3 from __future__ import print_function
12 from collections import deque
13 from threading import Thread, Event
14 from inspect import getdoc
15 from traceback import format_exception
16 from logging import FileHandler, DEBUG, Formatter
17 from scapy.packet import Raw
18 from hook import StepHook, PollHook
19 from vpp_pg_interface import VppPGInterface
20 from vpp_sub_interface import VppSubInterface
21 from vpp_lo_interface import VppLoInterface
22 from vpp_papi_provider import VppPapiProvider
24 from vpp_object import VppObjectRegistry
25 if os.name == 'posix' and sys.version_info[0] < 3:
26 # using subprocess32 is recommended by python official documentation
27 # @ https://docs.python.org/2/library/subprocess.html
28 import subprocess32 as subprocess
33 Test framework module.
35 The module provides a set of tools for constructing and running tests and
36 representing the results.
40 class _PacketInfo(object):
41 """Private class to create packet info object.
43 Help process information about the next packet.
44 Set variables to default values.
46 #: Store the index of the packet.
48 #: Store the index of the source packet generator interface of the packet.
50 #: Store the index of the destination packet generator interface
53 #: Store expected ip version
55 #: Store expected upper protocol
57 #: Store the copy of the former packet.
60 def __eq__(self, other):
61 index = self.index == other.index
62 src = self.src == other.src
63 dst = self.dst == other.dst
64 data = self.data == other.data
65 return index and src and dst and data
68 def pump_output(testclass):
69 """ pump output from vpp stdout/stderr to proper queues """
70 while not testclass.pump_thread_stop_flag.wait(0):
71 readable = select.select([testclass.vpp.stdout.fileno(),
72 testclass.vpp.stderr.fileno(),
73 testclass.pump_thread_wakeup_pipe[0]],
75 if testclass.vpp.stdout.fileno() in readable:
76 read = os.read(testclass.vpp.stdout.fileno(), 1024)
77 testclass.vpp_stdout_deque.append(read)
78 if testclass.vpp.stderr.fileno() in readable:
79 read = os.read(testclass.vpp.stderr.fileno(), 1024)
80 testclass.vpp_stderr_deque.append(read)
81 # ignoring the dummy pipe here intentionally - the flag will take care
82 # of properly terminating the loop
85 def running_extended_tests():
87 s = os.getenv("EXTENDED_TESTS")
88 return True if s.lower() in ("y", "yes", "1") else False
94 class VppTestCase(unittest.TestCase):
95 """This subclass is a base class for VPP test cases that are implemented as
96 classes. It provides methods to create and run test case.
100 def packet_infos(self):
101 """List of packet infos"""
102 return self._packet_infos
105 def get_packet_count_for_if_idx(cls, dst_if_index):
106 """Get the number of packet info for specified destination if index"""
107 if dst_if_index in cls._packet_count_for_dst_if_idx:
108 return cls._packet_count_for_dst_if_idx[dst_if_index]
114 """Return the instance of this testcase"""
115 return cls.test_instance
118 def set_debug_flags(cls, d):
119 cls.debug_core = False
120 cls.debug_gdb = False
121 cls.debug_gdbserver = False
126 cls.debug_core = True
129 elif dl == "gdbserver":
130 cls.debug_gdbserver = True
132 raise Exception("Unrecognized DEBUG option: '%s'" % d)
135 def setUpConstants(cls):
136 """ Set-up the test case class based on environment variables """
138 s = os.getenv("STEP")
139 cls.step = True if s.lower() in ("y", "yes", "1") else False
143 d = os.getenv("DEBUG")
146 cls.set_debug_flags(d)
147 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
148 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
149 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
151 if cls.plugin_path is not None:
152 if cls.extern_plugin_path is not None:
153 plugin_path = "%s:%s" % (
154 cls.plugin_path, cls.extern_plugin_path)
156 plugin_path = cls.plugin_path
157 elif cls.extern_plugin_path is not None:
158 plugin_path = cls.extern_plugin_path
160 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
161 debug_cli = "cli-listen localhost:5002"
164 size = os.getenv("COREDUMP_SIZE")
166 coredump_size = "coredump-size %s" % size
169 if coredump_size is None:
170 coredump_size = "coredump-size unlimited"
171 cls.vpp_cmdline = [cls.vpp_bin, "unix",
172 "{", "nodaemon", debug_cli, coredump_size, "}",
173 "api-trace", "{", "on", "}",
174 "api-segment", "{", "prefix", cls.shm_prefix, "}",
175 "plugins", "{", "plugin", "dpdk_plugin.so", "{",
177 if plugin_path is not None:
178 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
179 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
182 def wait_for_enter(cls):
183 if cls.debug_gdbserver:
184 print(double_line_delim)
185 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
187 print(double_line_delim)
188 print("Spawned VPP with PID: %d" % cls.vpp.pid)
190 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
192 print(single_line_delim)
193 print("You can debug the VPP using e.g.:")
194 if cls.debug_gdbserver:
195 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
196 print("Now is the time to attach a gdb by running the above "
197 "command, set up breakpoints etc. and then resume VPP from "
198 "within gdb by issuing the 'continue' command")
200 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
201 print("Now is the time to attach a gdb by running the above "
202 "command and set up breakpoints etc.")
203 print(single_line_delim)
204 raw_input("Press ENTER to continue running the testcase...")
208 cmdline = cls.vpp_cmdline
210 if cls.debug_gdbserver:
211 gdbserver = '/usr/bin/gdbserver'
212 if not os.path.isfile(gdbserver) or \
213 not os.access(gdbserver, os.X_OK):
214 raise Exception("gdbserver binary '%s' does not exist or is "
215 "not executable" % gdbserver)
217 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
218 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
221 cls.vpp = subprocess.Popen(cmdline,
222 stdout=subprocess.PIPE,
223 stderr=subprocess.PIPE,
225 except Exception as e:
226 cls.logger.critical("Couldn't start vpp: %s" % e)
234 Perform class setup before running the testcase
235 Remove shared memory files, start vpp and connect the vpp-api
237 gc.collect() # run garbage collection first
238 cls.logger = getLogger(cls.__name__)
239 cls.tempdir = tempfile.mkdtemp(
240 prefix='vpp-unittest-' + cls.__name__ + '-')
241 file_handler = FileHandler("%s/log.txt" % cls.tempdir)
242 file_handler.setFormatter(
243 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
245 file_handler.setLevel(DEBUG)
246 cls.logger.addHandler(file_handler)
247 cls.shm_prefix = cls.tempdir.split("/")[-1]
248 os.chdir(cls.tempdir)
249 cls.logger.info("Temporary dir is %s, shm prefix is %s",
250 cls.tempdir, cls.shm_prefix)
252 cls.reset_packet_infos()
254 cls._zombie_captures = []
257 cls.registry = VppObjectRegistry()
258 cls.vpp_startup_failed = False
259 # need to catch exceptions here because if we raise, then the cleanup
260 # doesn't get called and we might end with a zombie vpp
263 cls.vpp_stdout_deque = deque()
264 cls.vpp_stderr_deque = deque()
265 cls.pump_thread_stop_flag = Event()
266 cls.pump_thread_wakeup_pipe = os.pipe()
267 cls.pump_thread = Thread(target=pump_output, args=(cls,))
268 cls.pump_thread.daemon = True
269 cls.pump_thread.start()
270 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
275 cls.vapi.register_hook(hook)
276 cls.sleep(0.1, "after vpp startup, before initial poll")
280 cls.vpp_startup_failed = True
282 "VPP died shortly after startup, check the"
283 " output to standard error for possible cause")
288 if cls.debug_gdbserver:
289 print(colorize("You're running VPP inside gdbserver but "
290 "VPP-API connection failed, did you forget "
291 "to 'continue' VPP from within gdb?", RED))
294 t, v, tb = sys.exc_info()
304 Disconnect vpp-api, kill vpp and cleanup shared memory files
306 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
308 if cls.vpp.returncode is None:
309 print(double_line_delim)
310 print("VPP or GDB server is still running")
311 print(single_line_delim)
312 raw_input("When done debugging, press ENTER to kill the "
313 "process and finish running the testcase...")
315 os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
316 cls.pump_thread_stop_flag.set()
317 if hasattr(cls, 'pump_thread'):
318 cls.logger.debug("Waiting for pump thread to stop")
319 cls.pump_thread.join()
320 if hasattr(cls, 'vpp_stderr_reader_thread'):
321 cls.logger.debug("Waiting for stdderr pump to stop")
322 cls.vpp_stderr_reader_thread.join()
324 if hasattr(cls, 'vpp'):
325 if hasattr(cls, 'vapi'):
326 cls.vapi.disconnect()
329 if cls.vpp.returncode is None:
330 cls.logger.debug("Sending TERM to vpp")
332 cls.logger.debug("Waiting for vpp to die")
333 cls.vpp.communicate()
336 if cls.vpp_startup_failed:
337 stdout_log = cls.logger.info
338 stderr_log = cls.logger.critical
340 stdout_log = cls.logger.info
341 stderr_log = cls.logger.info
343 if hasattr(cls, 'vpp_stdout_deque'):
344 stdout_log(single_line_delim)
345 stdout_log('VPP output to stdout while running %s:', cls.__name__)
346 stdout_log(single_line_delim)
347 f = open(cls.tempdir + '/vpp_stdout.txt', 'w')
348 vpp_output = "".join(cls.vpp_stdout_deque)
350 stdout_log('\n%s', vpp_output)
351 stdout_log(single_line_delim)
353 if hasattr(cls, 'vpp_stderr_deque'):
354 stderr_log(single_line_delim)
355 stderr_log('VPP output to stderr while running %s:', cls.__name__)
356 stderr_log(single_line_delim)
357 f = open(cls.tempdir + '/vpp_stderr.txt', 'w')
358 vpp_output = "".join(cls.vpp_stderr_deque)
360 stderr_log('\n%s', vpp_output)
361 stderr_log(single_line_delim)
364 def tearDownClass(cls):
365 """ Perform final cleanup after running all tests in this test-case """
369 """ Show various debug prints after each test """
370 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
371 (self.__class__.__name__, self._testMethodName,
372 self._testMethodDoc))
373 if not self.vpp_dead:
374 self.logger.debug(self.vapi.cli("show trace"))
375 self.logger.info(self.vapi.ppcli("show interface"))
376 self.logger.info(self.vapi.ppcli("show hardware"))
377 self.logger.info(self.vapi.ppcli("show error"))
378 self.logger.info(self.vapi.ppcli("show run"))
379 self.registry.remove_vpp_config(self.logger)
380 # Save/Dump VPP api trace log
381 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
382 tmp_api_trace = "/tmp/%s" % api_trace
383 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
384 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
385 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
387 os.rename(tmp_api_trace, vpp_api_trace_log)
388 self.logger.info(self.vapi.ppcli("api trace dump %s" %
391 self.registry.unregister_all(self.logger)
394 """ Clear trace before running each test"""
395 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
396 (self.__class__.__name__, self._testMethodName,
397 self._testMethodDoc))
399 raise Exception("VPP is dead when setting up the test")
400 self.sleep(.1, "during setUp")
401 self.vpp_stdout_deque.append(
402 "--- test setUp() for %s.%s(%s) starts here ---\n" %
403 (self.__class__.__name__, self._testMethodName,
404 self._testMethodDoc))
405 self.vpp_stderr_deque.append(
406 "--- test setUp() for %s.%s(%s) starts here ---\n" %
407 (self.__class__.__name__, self._testMethodName,
408 self._testMethodDoc))
409 self.vapi.cli("clear trace")
410 # store the test instance inside the test class - so that objects
411 # holding the class can access instance methods (like assertEqual)
412 type(self).test_instance = self
415 def pg_enable_capture(cls, interfaces):
417 Enable capture on packet-generator interfaces
419 :param interfaces: iterable interface indexes
426 def register_capture(cls, cap_name):
427 """ Register a capture in the testclass """
428 # add to the list of captures with current timestamp
429 cls._captures.append((time.time(), cap_name))
430 # filter out from zombies
431 cls._zombie_captures = [(stamp, name)
432 for (stamp, name) in cls._zombie_captures
437 """ Remove any zombie captures and enable the packet generator """
438 # how long before capture is allowed to be deleted - otherwise vpp
439 # crashes - 100ms seems enough (this shouldn't be needed at all)
442 for stamp, cap_name in cls._zombie_captures:
443 wait = stamp + capture_ttl - now
445 cls.sleep(wait, "before deleting capture %s" % cap_name)
447 cls.logger.debug("Removing zombie capture %s" % cap_name)
448 cls.vapi.cli('packet-generator delete %s' % cap_name)
450 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
451 cls.vapi.cli('packet-generator enable')
452 cls._zombie_captures = cls._captures
456 def create_pg_interfaces(cls, interfaces):
458 Create packet-generator interfaces.
460 :param interfaces: iterable indexes of the interfaces.
461 :returns: List of created interfaces.
466 intf = VppPGInterface(cls, i)
467 setattr(cls, intf.name, intf)
469 cls.pg_interfaces = result
473 def create_loopback_interfaces(cls, interfaces):
475 Create loopback interfaces.
477 :param interfaces: iterable indexes of the interfaces.
478 :returns: List of created interfaces.
482 intf = VppLoInterface(cls, i)
483 setattr(cls, intf.name, intf)
485 cls.lo_interfaces = result
489 def extend_packet(packet, size):
491 Extend packet to given size by padding with spaces
492 NOTE: Currently works only when Raw layer is present.
494 :param packet: packet
495 :param size: target size
498 packet_len = len(packet) + 4
499 extend = size - packet_len
501 packet[Raw].load += ' ' * extend
504 def reset_packet_infos(cls):
505 """ Reset the list of packet info objects and packet counts to zero """
506 cls._packet_infos = {}
507 cls._packet_count_for_dst_if_idx = {}
510 def create_packet_info(cls, src_if, dst_if):
512 Create packet info object containing the source and destination indexes
513 and add it to the testcase's packet info list
515 :param VppInterface src_if: source interface
516 :param VppInterface dst_if: destination interface
518 :returns: _PacketInfo object
522 info.index = len(cls._packet_infos)
523 info.src = src_if.sw_if_index
524 info.dst = dst_if.sw_if_index
525 if isinstance(dst_if, VppSubInterface):
526 dst_idx = dst_if.parent.sw_if_index
528 dst_idx = dst_if.sw_if_index
529 if dst_idx in cls._packet_count_for_dst_if_idx:
530 cls._packet_count_for_dst_if_idx[dst_idx] += 1
532 cls._packet_count_for_dst_if_idx[dst_idx] = 1
533 cls._packet_infos[info.index] = info
537 def info_to_payload(info):
539 Convert _PacketInfo object to packet payload
541 :param info: _PacketInfo object
543 :returns: string containing serialized data from packet info
545 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
549 def payload_to_info(payload):
551 Convert packet payload to _PacketInfo object
553 :param payload: packet payload
555 :returns: _PacketInfo object containing de-serialized data from payload
558 numbers = payload.split()
560 info.index = int(numbers[0])
561 info.src = int(numbers[1])
562 info.dst = int(numbers[2])
563 info.ip = int(numbers[3])
564 info.proto = int(numbers[4])
567 def get_next_packet_info(self, info):
569 Iterate over the packet info list stored in the testcase
570 Start iteration with first element if info is None
571 Continue based on index in info if info is specified
573 :param info: info or None
574 :returns: next info in list or None if no more infos
579 next_index = info.index + 1
580 if next_index == len(self._packet_infos):
583 return self._packet_infos[next_index]
585 def get_next_packet_info_for_interface(self, src_index, info):
587 Search the packet info list for the next packet info with same source
590 :param src_index: source interface index to search for
591 :param info: packet info - where to start the search
592 :returns: packet info or None
596 info = self.get_next_packet_info(info)
599 if info.src == src_index:
602 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
604 Search the packet info list for the next packet info with same source
605 and destination interface indexes
607 :param src_index: source interface index to search for
608 :param dst_index: destination interface index to search for
609 :param info: packet info - where to start the search
610 :returns: packet info or None
614 info = self.get_next_packet_info_for_interface(src_index, info)
617 if info.dst == dst_index:
620 def assert_equal(self, real_value, expected_value, name_or_class=None):
621 if name_or_class is None:
622 self.assertEqual(real_value, expected_value)
625 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
626 msg = msg % (getdoc(name_or_class).strip(),
627 real_value, str(name_or_class(real_value)),
628 expected_value, str(name_or_class(expected_value)))
630 msg = "Invalid %s: %s does not match expected value %s" % (
631 name_or_class, real_value, expected_value)
633 self.assertEqual(real_value, expected_value, msg)
635 def assert_in_range(self,
643 msg = "Invalid %s: %s out of range <%s,%s>" % (
644 name, real_value, expected_min, expected_max)
645 self.assertTrue(expected_min <= real_value <= expected_max, msg)
648 def sleep(cls, timeout, remark=None):
649 if hasattr(cls, 'logger'):
650 cls.logger.debug("Sleeping for %ss (%s)" % (timeout, remark))
654 class TestCasePrinter(object):
658 self.__dict__ = self._shared_state
659 if not hasattr(self, "_test_case_set"):
660 self._test_case_set = set()
662 def print_test_case_heading_if_first_time(self, case):
663 if case.__class__ not in self._test_case_set:
664 print(double_line_delim)
665 print(colorize(getdoc(case.__class__).splitlines()[0], YELLOW))
666 print(double_line_delim)
667 self._test_case_set.add(case.__class__)
670 class VppTestResult(unittest.TestResult):
672 @property result_string
673 String variable to store the test case result string.
675 List variable containing 2-tuples of TestCase instances and strings
676 holding formatted tracebacks. Each tuple represents a test which
677 raised an unexpected exception.
679 List variable containing 2-tuples of TestCase instances and strings
680 holding formatted tracebacks. Each tuple represents a test where
681 a failure was explicitly signalled using the TestCase.assert*()
685 def __init__(self, stream, descriptions, verbosity):
687 :param stream File descriptor to store where to report test results.
688 Set to the standard error stream by default.
689 :param descriptions Boolean variable to store information if to use
690 test case descriptions.
691 :param verbosity Integer variable to store required verbosity level.
693 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
695 self.descriptions = descriptions
696 self.verbosity = verbosity
697 self.result_string = None
698 self.printer = TestCasePrinter()
700 def addSuccess(self, test):
702 Record a test succeeded result
707 if hasattr(test, 'logger'):
708 test.logger.debug("--- addSuccess() %s.%s(%s) called"
709 % (test.__class__.__name__,
710 test._testMethodName,
711 test._testMethodDoc))
712 unittest.TestResult.addSuccess(self, test)
713 self.result_string = colorize("OK", GREEN)
715 def addSkip(self, test, reason):
717 Record a test skipped.
723 if hasattr(test, 'logger'):
724 test.logger.debug("--- addSkip() %s.%s(%s) called, reason is %s"
725 % (test.__class__.__name__,
726 test._testMethodName,
729 unittest.TestResult.addSkip(self, test, reason)
730 self.result_string = colorize("SKIP", YELLOW)
732 def addFailure(self, test, err):
734 Record a test failed result
737 :param err: error message
740 if hasattr(test, 'logger'):
741 test.logger.debug("--- addFailure() %s.%s(%s) called, err is %s"
742 % (test.__class__.__name__,
743 test._testMethodName,
744 test._testMethodDoc, err))
745 test.logger.debug("formatted exception is:\n%s" %
746 "".join(format_exception(*err)))
747 unittest.TestResult.addFailure(self, test, err)
748 if hasattr(test, 'tempdir'):
749 self.result_string = colorize("FAIL", RED) + \
750 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
752 self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
754 def addError(self, test, err):
756 Record a test error result
759 :param err: error message
762 if hasattr(test, 'logger'):
763 test.logger.debug("--- addError() %s.%s(%s) called, err is %s"
764 % (test.__class__.__name__,
765 test._testMethodName,
766 test._testMethodDoc, err))
767 test.logger.debug("formatted exception is:\n%s" %
768 "".join(format_exception(*err)))
769 unittest.TestResult.addError(self, test, err)
770 if hasattr(test, 'tempdir'):
771 self.result_string = colorize("ERROR", RED) + \
772 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
774 self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
776 def getDescription(self, test):
781 :returns: test description
784 # TODO: if none print warning not raise exception
785 short_description = test.shortDescription()
786 if self.descriptions and short_description:
787 return short_description
791 def startTest(self, test):
798 self.printer.print_test_case_heading_if_first_time(test)
799 unittest.TestResult.startTest(self, test)
800 if self.verbosity > 0:
802 "Starting " + self.getDescription(test) + " ...")
803 self.stream.writeln(single_line_delim)
805 def stopTest(self, test):
812 unittest.TestResult.stopTest(self, test)
813 if self.verbosity > 0:
814 self.stream.writeln(single_line_delim)
815 self.stream.writeln("%-73s%s" % (self.getDescription(test),
817 self.stream.writeln(single_line_delim)
819 self.stream.writeln("%-73s%s" % (self.getDescription(test),
822 def printErrors(self):
824 Print errors from running the test case
826 self.stream.writeln()
827 self.printErrorList('ERROR', self.errors)
828 self.printErrorList('FAIL', self.failures)
830 def printErrorList(self, flavour, errors):
832 Print error list to the output stream together with error type
833 and test case description.
835 :param flavour: error type
836 :param errors: iterable errors
839 for test, err in errors:
840 self.stream.writeln(double_line_delim)
841 self.stream.writeln("%s: %s" %
842 (flavour, self.getDescription(test)))
843 self.stream.writeln(single_line_delim)
844 self.stream.writeln("%s" % err)
847 class VppTestRunner(unittest.TextTestRunner):
849 A basic test runner implementation which prints results to standard error.
852 def resultclass(self):
853 """Class maintaining the results of the tests"""
856 def __init__(self, stream=sys.stderr, descriptions=True, verbosity=1,
857 failfast=False, buffer=False, resultclass=None):
858 # ignore stream setting here, use hard-coded stdout to be in sync
859 # with prints from VppTestCase methods ...
860 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
861 verbosity, failfast, buffer,
866 def parse_test_option(self):
868 f = os.getenv(self.test_option)
871 filter_file_name = None
872 filter_class_name = None
873 filter_func_name = None
878 raise Exception("Unrecognized %s option: %s" %
879 (self.test_option, f))
881 if parts[2] not in ('*', ''):
882 filter_func_name = parts[2]
883 if parts[1] not in ('*', ''):
884 filter_class_name = parts[1]
885 if parts[0] not in ('*', ''):
886 if parts[0].startswith('test_'):
887 filter_file_name = parts[0]
889 filter_file_name = 'test_%s' % parts[0]
891 if f.startswith('test_'):
894 filter_file_name = 'test_%s' % f
895 return filter_file_name, filter_class_name, filter_func_name
897 def filter_tests(self, tests, filter_file, filter_class, filter_func):
898 result = unittest.suite.TestSuite()
900 if isinstance(t, unittest.suite.TestSuite):
901 # this is a bunch of tests, recursively filter...
902 x = self.filter_tests(t, filter_file, filter_class,
904 if x.countTestCases() > 0:
906 elif isinstance(t, unittest.TestCase):
907 # this is a single test
908 parts = t.id().split('.')
909 # t.id() for common cases like this:
910 # test_classifier.TestClassifier.test_acl_ip
911 # apply filtering only if it is so
913 if filter_file and filter_file != parts[0]:
915 if filter_class and filter_class != parts[1]:
917 if filter_func and filter_func != parts[2]:
921 # unexpected object, don't touch it
932 gc.disable() # disable garbage collection, we'll do that manually
933 print("Running tests using custom test runner") # debug message
934 filter_file, filter_class, filter_func = self.parse_test_option()
935 print("Active filters: file=%s, class=%s, function=%s" % (
936 filter_file, filter_class, filter_func))
937 filtered = self.filter_tests(test, filter_file, filter_class,
939 print("%s out of %s tests match specified filters" % (
940 filtered.countTestCases(), test.countTestCases()))
941 if not running_extended_tests():
942 print("Not running extended tests (some tests will be skipped)")
943 return super(VppTestRunner, self).run(filtered)