3 from __future__ import print_function
12 from collections import deque
13 from threading import Thread, Event
14 from inspect import getdoc
15 from traceback import format_exception
16 from logging import FileHandler, DEBUG, Formatter
17 from scapy.packet import Raw
18 from hook import StepHook, PollHook
19 from vpp_pg_interface import VppPGInterface
20 from vpp_sub_interface import VppSubInterface
21 from vpp_lo_interface import VppLoInterface
22 from vpp_papi_provider import VppPapiProvider
24 from vpp_object import VppObjectRegistry
25 if os.name == 'posix' and sys.version_info[0] < 3:
26 # using subprocess32 is recommended by python official documentation
27 # @ https://docs.python.org/2/library/subprocess.html
28 import subprocess32 as subprocess
33 Test framework module.
35 The module provides a set of tools for constructing and running tests and
36 representing the results.
40 class _PacketInfo(object):
41 """Private class to create packet info object.
43 Help process information about the next packet.
44 Set variables to default values.
46 #: Store the index of the packet.
48 #: Store the index of the source packet generator interface of the packet.
50 #: Store the index of the destination packet generator interface
53 #: Store expected ip version
55 #: Store expected upper protocol
57 #: Store the copy of the former packet.
60 def __eq__(self, other):
61 index = self.index == other.index
62 src = self.src == other.src
63 dst = self.dst == other.dst
64 data = self.data == other.data
65 return index and src and dst and data
68 def pump_output(testclass):
69 """ pump output from vpp stdout/stderr to proper queues """
70 while not testclass.pump_thread_stop_flag.wait(0):
71 readable = select.select([testclass.vpp.stdout.fileno(),
72 testclass.vpp.stderr.fileno(),
73 testclass.pump_thread_wakeup_pipe[0]],
75 if testclass.vpp.stdout.fileno() in readable:
76 read = os.read(testclass.vpp.stdout.fileno(), 1024)
77 testclass.vpp_stdout_deque.append(read)
78 if testclass.vpp.stderr.fileno() in readable:
79 read = os.read(testclass.vpp.stderr.fileno(), 1024)
80 testclass.vpp_stderr_deque.append(read)
81 # ignoring the dummy pipe here intentionally - the flag will take care
82 # of properly terminating the loop
85 def running_extended_tests():
87 s = os.getenv("EXTENDED_TESTS")
88 return True if s.lower() in ("y", "yes", "1") else False
94 class VppTestCase(unittest.TestCase):
95 """This subclass is a base class for VPP test cases that are implemented as
96 classes. It provides methods to create and run test case.
def packet_infos(self):
    """Return the packet info collection stored in ``_packet_infos``"""
    stored_infos = self._packet_infos
    return stored_infos
105 def get_packet_count_for_if_idx(cls, dst_if_index):
106 """Get the number of packet info for specified destination if index"""
107 if dst_if_index in cls._packet_count_for_dst_if_idx:
108 return cls._packet_count_for_dst_if_idx[dst_if_index]
114 """Return the instance of this testcase"""
115 return cls.test_instance
118 def set_debug_flags(cls, d):
119 cls.debug_core = False
120 cls.debug_gdb = False
121 cls.debug_gdbserver = False
126 cls.debug_core = True
129 elif dl == "gdbserver":
130 cls.debug_gdbserver = True
132 raise Exception("Unrecognized DEBUG option: '%s'" % d)
135 def setUpConstants(cls):
136 """ Set-up the test case class based on environment variables """
138 s = os.getenv("STEP")
139 cls.step = True if s.lower() in ("y", "yes", "1") else False
143 d = os.getenv("DEBUG")
146 cls.set_debug_flags(d)
147 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
148 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
149 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
151 if cls.plugin_path is not None:
152 if cls.extern_plugin_path is not None:
153 plugin_path = "%s:%s" % (
154 cls.plugin_path, cls.extern_plugin_path)
156 plugin_path = cls.plugin_path
157 elif cls.extern_plugin_path is not None:
158 plugin_path = cls.extern_plugin_path
160 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
161 debug_cli = "cli-listen localhost:5002"
164 size = os.getenv("COREDUMP_SIZE")
166 coredump_size = "coredump-size %s" % size
169 if coredump_size is None:
170 coredump_size = "coredump-size unlimited"
171 cls.vpp_cmdline = [cls.vpp_bin, "unix",
172 "{", "nodaemon", debug_cli, coredump_size, "}",
173 "api-trace", "{", "on", "}",
174 "api-segment", "{", "prefix", cls.shm_prefix, "}",
175 "plugins", "{", "plugin", "dpdk_plugin.so", "{",
177 if plugin_path is not None:
178 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
179 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
182 def wait_for_enter(cls):
183 if cls.debug_gdbserver:
184 print(double_line_delim)
185 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
187 print(double_line_delim)
188 print("Spawned VPP with PID: %d" % cls.vpp.pid)
190 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
192 print(single_line_delim)
193 print("You can debug the VPP using e.g.:")
194 if cls.debug_gdbserver:
195 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
196 print("Now is the time to attach a gdb by running the above "
197 "command, set up breakpoints etc. and then resume VPP from "
198 "within gdb by issuing the 'continue' command")
200 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
201 print("Now is the time to attach a gdb by running the above "
202 "command and set up breakpoints etc.")
203 print(single_line_delim)
204 raw_input("Press ENTER to continue running the testcase...")
208 cmdline = cls.vpp_cmdline
210 if cls.debug_gdbserver:
211 gdbserver = '/usr/bin/gdbserver'
212 if not os.path.isfile(gdbserver) or \
213 not os.access(gdbserver, os.X_OK):
214 raise Exception("gdbserver binary '%s' does not exist or is "
215 "not executable" % gdbserver)
217 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
218 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
221 cls.vpp = subprocess.Popen(cmdline,
222 stdout=subprocess.PIPE,
223 stderr=subprocess.PIPE,
225 except Exception as e:
226 cls.logger.critical("Couldn't start vpp: %s" % e)
234 Perform class setup before running the testcase
235 Remove shared memory files, start vpp and connect the vpp-api
237 gc.collect() # run garbage collection first
238 cls.logger = getLogger(cls.__name__)
239 cls.tempdir = tempfile.mkdtemp(
240 prefix='vpp-unittest-' + cls.__name__ + '-')
241 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
242 cls.file_handler.setFormatter(
243 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
245 cls.file_handler.setLevel(DEBUG)
246 cls.logger.addHandler(cls.file_handler)
247 cls.shm_prefix = cls.tempdir.split("/")[-1]
248 os.chdir(cls.tempdir)
249 cls.logger.info("Temporary dir is %s, shm prefix is %s",
250 cls.tempdir, cls.shm_prefix)
252 cls.reset_packet_infos()
254 cls._zombie_captures = []
257 cls.registry = VppObjectRegistry()
258 cls.vpp_startup_failed = False
259 # need to catch exceptions here because if we raise, then the cleanup
260 # doesn't get called and we might end with a zombie vpp
263 cls.vpp_stdout_deque = deque()
264 cls.vpp_stderr_deque = deque()
265 cls.pump_thread_stop_flag = Event()
266 cls.pump_thread_wakeup_pipe = os.pipe()
267 cls.pump_thread = Thread(target=pump_output, args=(cls,))
268 cls.pump_thread.daemon = True
269 cls.pump_thread.start()
270 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
275 cls.vapi.register_hook(hook)
276 cls.sleep(0.1, "after vpp startup, before initial poll")
280 cls.vpp_startup_failed = True
282 "VPP died shortly after startup, check the"
283 " output to standard error for possible cause")
288 if cls.debug_gdbserver:
289 print(colorize("You're running VPP inside gdbserver but "
290 "VPP-API connection failed, did you forget "
291 "to 'continue' VPP from within gdb?", RED))
294 t, v, tb = sys.exc_info()
304 Disconnect vpp-api, kill vpp and cleanup shared memory files
306 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
308 if cls.vpp.returncode is None:
309 print(double_line_delim)
310 print("VPP or GDB server is still running")
311 print(single_line_delim)
312 raw_input("When done debugging, press ENTER to kill the "
313 "process and finish running the testcase...")
315 os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
316 cls.pump_thread_stop_flag.set()
317 if hasattr(cls, 'pump_thread'):
318 cls.logger.debug("Waiting for pump thread to stop")
319 cls.pump_thread.join()
320 if hasattr(cls, 'vpp_stderr_reader_thread'):
321 cls.logger.debug("Waiting for stdderr pump to stop")
322 cls.vpp_stderr_reader_thread.join()
324 if hasattr(cls, 'vpp'):
325 if hasattr(cls, 'vapi'):
326 cls.vapi.disconnect()
329 if cls.vpp.returncode is None:
330 cls.logger.debug("Sending TERM to vpp")
332 cls.logger.debug("Waiting for vpp to die")
333 cls.vpp.communicate()
336 if cls.vpp_startup_failed:
337 stdout_log = cls.logger.info
338 stderr_log = cls.logger.critical
340 stdout_log = cls.logger.info
341 stderr_log = cls.logger.info
343 if hasattr(cls, 'vpp_stdout_deque'):
344 stdout_log(single_line_delim)
345 stdout_log('VPP output to stdout while running %s:', cls.__name__)
346 stdout_log(single_line_delim)
347 vpp_output = "".join(cls.vpp_stdout_deque)
348 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
350 stdout_log('\n%s', vpp_output)
351 stdout_log(single_line_delim)
353 if hasattr(cls, 'vpp_stderr_deque'):
354 stderr_log(single_line_delim)
355 stderr_log('VPP output to stderr while running %s:', cls.__name__)
356 stderr_log(single_line_delim)
357 vpp_output = "".join(cls.vpp_stderr_deque)
358 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
360 stderr_log('\n%s', vpp_output)
361 stderr_log(single_line_delim)
364 def tearDownClass(cls):
365 """ Perform final cleanup after running all tests in this test-case """
367 cls.file_handler.close()
370 """ Show various debug prints after each test """
371 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
372 (self.__class__.__name__, self._testMethodName,
373 self._testMethodDoc))
374 if not self.vpp_dead:
375 self.logger.debug(self.vapi.cli("show trace"))
376 self.logger.info(self.vapi.ppcli("show interface"))
377 self.logger.info(self.vapi.ppcli("show hardware"))
378 self.logger.info(self.vapi.ppcli("show error"))
379 self.logger.info(self.vapi.ppcli("show run"))
380 self.registry.remove_vpp_config(self.logger)
381 # Save/Dump VPP api trace log
382 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
383 tmp_api_trace = "/tmp/%s" % api_trace
384 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
385 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
386 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
388 os.rename(tmp_api_trace, vpp_api_trace_log)
389 self.logger.info(self.vapi.ppcli("api trace dump %s" %
392 self.registry.unregister_all(self.logger)
395 """ Clear trace before running each test"""
396 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
397 (self.__class__.__name__, self._testMethodName,
398 self._testMethodDoc))
400 raise Exception("VPP is dead when setting up the test")
401 self.sleep(.1, "during setUp")
402 self.vpp_stdout_deque.append(
403 "--- test setUp() for %s.%s(%s) starts here ---\n" %
404 (self.__class__.__name__, self._testMethodName,
405 self._testMethodDoc))
406 self.vpp_stderr_deque.append(
407 "--- test setUp() for %s.%s(%s) starts here ---\n" %
408 (self.__class__.__name__, self._testMethodName,
409 self._testMethodDoc))
410 self.vapi.cli("clear trace")
411 # store the test instance inside the test class - so that objects
412 # holding the class can access instance methods (like assertEqual)
413 type(self).test_instance = self
416 def pg_enable_capture(cls, interfaces):
418 Enable capture on packet-generator interfaces
420 :param interfaces: iterable interface indexes
427 def register_capture(cls, cap_name):
428 """ Register a capture in the testclass """
429 # add to the list of captures with current timestamp
430 cls._captures.append((time.time(), cap_name))
431 # filter out from zombies
432 cls._zombie_captures = [(stamp, name)
433 for (stamp, name) in cls._zombie_captures
438 """ Remove any zombie captures and enable the packet generator """
439 # how long before capture is allowed to be deleted - otherwise vpp
440 # crashes - 100ms seems enough (this shouldn't be needed at all)
443 for stamp, cap_name in cls._zombie_captures:
444 wait = stamp + capture_ttl - now
446 cls.sleep(wait, "before deleting capture %s" % cap_name)
448 cls.logger.debug("Removing zombie capture %s" % cap_name)
449 cls.vapi.cli('packet-generator delete %s' % cap_name)
451 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
452 cls.vapi.cli('packet-generator enable')
453 cls._zombie_captures = cls._captures
457 def create_pg_interfaces(cls, interfaces):
459 Create packet-generator interfaces.
461 :param interfaces: iterable indexes of the interfaces.
462 :returns: List of created interfaces.
467 intf = VppPGInterface(cls, i)
468 setattr(cls, intf.name, intf)
470 cls.pg_interfaces = result
474 def create_loopback_interfaces(cls, interfaces):
476 Create loopback interfaces.
478 :param interfaces: iterable indexes of the interfaces.
479 :returns: List of created interfaces.
483 intf = VppLoInterface(cls, i)
484 setattr(cls, intf.name, intf)
486 cls.lo_interfaces = result
490 def extend_packet(packet, size):
492 Extend packet to given size by padding with spaces
493 NOTE: Currently works only when Raw layer is present.
495 :param packet: packet
496 :param size: target size
499 packet_len = len(packet) + 4
500 extend = size - packet_len
502 packet[Raw].load += ' ' * extend
def reset_packet_infos(cls):
    """ Drop all stored packet infos and all per-destination packet counts

    Both ``_packet_infos`` and ``_packet_count_for_dst_if_idx`` are rebound
    to fresh, independent empty dictionaries.
    """
    cls._packet_infos = dict()
    cls._packet_count_for_dst_if_idx = dict()
511 def create_packet_info(cls, src_if, dst_if):
513 Create packet info object containing the source and destination indexes
514 and add it to the testcase's packet info list
516 :param VppInterface src_if: source interface
517 :param VppInterface dst_if: destination interface
519 :returns: _PacketInfo object
523 info.index = len(cls._packet_infos)
524 info.src = src_if.sw_if_index
525 info.dst = dst_if.sw_if_index
526 if isinstance(dst_if, VppSubInterface):
527 dst_idx = dst_if.parent.sw_if_index
529 dst_idx = dst_if.sw_if_index
530 if dst_idx in cls._packet_count_for_dst_if_idx:
531 cls._packet_count_for_dst_if_idx[dst_idx] += 1
533 cls._packet_count_for_dst_if_idx[dst_idx] = 1
534 cls._packet_infos[info.index] = info
538 def info_to_payload(info):
540 Convert _PacketInfo object to packet payload
542 :param info: _PacketInfo object
544 :returns: string containing serialized data from packet info
546 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
550 def payload_to_info(payload):
552 Convert packet payload to _PacketInfo object
554 :param payload: packet payload
556 :returns: _PacketInfo object containing de-serialized data from payload
559 numbers = payload.split()
561 info.index = int(numbers[0])
562 info.src = int(numbers[1])
563 info.dst = int(numbers[2])
564 info.ip = int(numbers[3])
565 info.proto = int(numbers[4])
568 def get_next_packet_info(self, info):
570 Iterate over the packet info list stored in the testcase
571 Start iteration with first element if info is None
572 Continue based on index in info if info is specified
574 :param info: info or None
575 :returns: next info in list or None if no more infos
580 next_index = info.index + 1
581 if next_index == len(self._packet_infos):
584 return self._packet_infos[next_index]
586 def get_next_packet_info_for_interface(self, src_index, info):
588 Search the packet info list for the next packet info with same source
591 :param src_index: source interface index to search for
592 :param info: packet info - where to start the search
593 :returns: packet info or None
597 info = self.get_next_packet_info(info)
600 if info.src == src_index:
603 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
605 Search the packet info list for the next packet info with same source
606 and destination interface indexes
608 :param src_index: source interface index to search for
609 :param dst_index: destination interface index to search for
610 :param info: packet info - where to start the search
611 :returns: packet info or None
615 info = self.get_next_packet_info_for_interface(src_index, info)
618 if info.dst == dst_index:
621 def assert_equal(self, real_value, expected_value, name_or_class=None):
622 if name_or_class is None:
623 self.assertEqual(real_value, expected_value)
626 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
627 msg = msg % (getdoc(name_or_class).strip(),
628 real_value, str(name_or_class(real_value)),
629 expected_value, str(name_or_class(expected_value)))
631 msg = "Invalid %s: %s does not match expected value %s" % (
632 name_or_class, real_value, expected_value)
634 self.assertEqual(real_value, expected_value, msg)
636 def assert_in_range(self,
644 msg = "Invalid %s: %s out of range <%s,%s>" % (
645 name, real_value, expected_min, expected_max)
646 self.assertTrue(expected_min <= real_value <= expected_max, msg)
649 def sleep(cls, timeout, remark=None):
650 if hasattr(cls, 'logger'):
651 cls.logger.debug("Starting sleep for %ss (%s)" % (timeout, remark))
655 if after - before > 2 * timeout:
657 "time.sleep() derp! slept for %ss instead of ~%ss!" % (
658 after - before, timeout))
659 if hasattr(cls, 'logger'):
661 "Finished sleep (%s) - slept %ss (wanted %ss)" % (
662 remark, after - before, timeout))
665 class TestCasePrinter(object):
669 self.__dict__ = self._shared_state
670 if not hasattr(self, "_test_case_set"):
671 self._test_case_set = set()
def print_test_case_heading_if_first_time(self, case):
    """Print a highlighted heading the first time a test case class is seen.

    The heading is the first line of the class docstring, framed by
    double-line delimiters. Classes already recorded in
    ``self._test_case_set`` produce no output.

    :param case: test case instance whose class heading should be printed
    """
    case_class = case.__class__
    if case_class in self._test_case_set:
        return
    # getdoc() gives None for an undocumented class and an empty docstring
    # has no lines - fall back to the class name instead of crashing the run
    doc_lines = (getdoc(case_class) or "").splitlines()
    heading = doc_lines[0] if doc_lines else case_class.__name__
    print(double_line_delim)
    print(colorize(heading, YELLOW))
    print(double_line_delim)
    self._test_case_set.add(case_class)
681 class VppTestResult(unittest.TestResult):
683 @property result_string
684 String variable to store the test case result string.
686 List variable containing 2-tuples of TestCase instances and strings
687 holding formatted tracebacks. Each tuple represents a test which
688 raised an unexpected exception.
690 List variable containing 2-tuples of TestCase instances and strings
691 holding formatted tracebacks. Each tuple represents a test where
692 a failure was explicitly signalled using the TestCase.assert*()
696 def __init__(self, stream, descriptions, verbosity):
698 :param stream File descriptor to store where to report test results.
699 Set to the standard error stream by default.
700 :param descriptions Boolean variable to store information if to use
701 test case descriptions.
702 :param verbosity Integer variable to store required verbosity level.
704 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
706 self.descriptions = descriptions
707 self.verbosity = verbosity
708 self.result_string = None
709 self.printer = TestCasePrinter()
def addSuccess(self, test):
    """
    Record that a test passed

    Logs the call to the test's own logger when it has one, forwards the
    result to unittest.TestResult and stores a green "OK" result string.

    :param test: test case that succeeded

    """
    if hasattr(test, 'logger'):
        identity = (test.__class__.__name__,
                    test._testMethodName,
                    test._testMethodDoc)
        test.logger.debug("--- addSuccess() %s.%s(%s) called" % identity)
    unittest.TestResult.addSuccess(self, test)
    self.result_string = colorize("OK", GREEN)
726 def addSkip(self, test, reason):
728 Record a test skipped.
734 if hasattr(test, 'logger'):
735 test.logger.debug("--- addSkip() %s.%s(%s) called, reason is %s"
736 % (test.__class__.__name__,
737 test._testMethodName,
740 unittest.TestResult.addSkip(self, test, reason)
741 self.result_string = colorize("SKIP", YELLOW)
743 def addFailure(self, test, err):
745 Record a test failed result
748 :param err: error message
751 if hasattr(test, 'logger'):
752 test.logger.debug("--- addFailure() %s.%s(%s) called, err is %s"
753 % (test.__class__.__name__,
754 test._testMethodName,
755 test._testMethodDoc, err))
756 test.logger.debug("formatted exception is:\n%s" %
757 "".join(format_exception(*err)))
758 unittest.TestResult.addFailure(self, test, err)
759 if hasattr(test, 'tempdir'):
760 self.result_string = colorize("FAIL", RED) + \
761 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
763 self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
765 def addError(self, test, err):
767 Record a test error result
770 :param err: error message
773 if hasattr(test, 'logger'):
774 test.logger.debug("--- addError() %s.%s(%s) called, err is %s"
775 % (test.__class__.__name__,
776 test._testMethodName,
777 test._testMethodDoc, err))
778 test.logger.debug("formatted exception is:\n%s" %
779 "".join(format_exception(*err)))
780 unittest.TestResult.addError(self, test, err)
781 if hasattr(test, 'tempdir'):
782 self.result_string = colorize("ERROR", RED) + \
783 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
785 self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
787 def getDescription(self, test):
792 :returns: test description
795 # TODO: if none print warning not raise exception
796 short_description = test.shortDescription()
797 if self.descriptions and short_description:
798 return short_description
802 def startTest(self, test):
809 self.printer.print_test_case_heading_if_first_time(test)
810 unittest.TestResult.startTest(self, test)
811 if self.verbosity > 0:
813 "Starting " + self.getDescription(test) + " ...")
814 self.stream.writeln(single_line_delim)
816 def stopTest(self, test):
823 unittest.TestResult.stopTest(self, test)
824 if self.verbosity > 0:
825 self.stream.writeln(single_line_delim)
826 self.stream.writeln("%-73s%s" % (self.getDescription(test),
828 self.stream.writeln(single_line_delim)
830 self.stream.writeln("%-73s%s" % (self.getDescription(test),
def printErrors(self):
    """
    Write an empty line to the stream, then list the collected errors
    followed by the collected failures
    """
    self.stream.writeln()
    # attribute is looked up just before each listing, errors first
    for flavour, attribute in (('ERROR', 'errors'), ('FAIL', 'failures')):
        self.printErrorList(flavour, getattr(self, attribute))
def printErrorList(self, flavour, errors):
    """
    Write every recorded problem to the output stream, each one framed by
    delimiters and labelled with its type and the test description.

    :param flavour: error type label printed before the description
    :param errors: iterable of (test, error text) pairs

    """
    for failed_test, details in errors:
        title = "%s: %s" % (flavour, self.getDescription(failed_test))
        self.stream.writeln(double_line_delim)
        self.stream.writeln(title)
        self.stream.writeln(single_line_delim)
        self.stream.writeln("%s" % details)
858 class VppTestRunner(unittest.TextTestRunner):
860 A basic test runner implementation which prints results to standard error.
863 def resultclass(self):
864 """Class maintaining the results of the tests"""
867 def __init__(self, stream=sys.stderr, descriptions=True, verbosity=1,
868 failfast=False, buffer=False, resultclass=None):
869 # ignore stream setting here, use hard-coded stdout to be in sync
870 # with prints from VppTestCase methods ...
871 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
872 verbosity, failfast, buffer,
877 def parse_test_option(self):
879 f = os.getenv(self.test_option)
882 filter_file_name = None
883 filter_class_name = None
884 filter_func_name = None
889 raise Exception("Unrecognized %s option: %s" %
890 (self.test_option, f))
892 if parts[2] not in ('*', ''):
893 filter_func_name = parts[2]
894 if parts[1] not in ('*', ''):
895 filter_class_name = parts[1]
896 if parts[0] not in ('*', ''):
897 if parts[0].startswith('test_'):
898 filter_file_name = parts[0]
900 filter_file_name = 'test_%s' % parts[0]
902 if f.startswith('test_'):
905 filter_file_name = 'test_%s' % f
906 return filter_file_name, filter_class_name, filter_func_name
908 def filter_tests(self, tests, filter_file, filter_class, filter_func):
909 result = unittest.suite.TestSuite()
911 if isinstance(t, unittest.suite.TestSuite):
912 # this is a bunch of tests, recursively filter...
913 x = self.filter_tests(t, filter_file, filter_class,
915 if x.countTestCases() > 0:
917 elif isinstance(t, unittest.TestCase):
918 # this is a single test
919 parts = t.id().split('.')
920 # t.id() for common cases like this:
921 # test_classifier.TestClassifier.test_acl_ip
922 # apply filtering only if it is so
924 if filter_file and filter_file != parts[0]:
926 if filter_class and filter_class != parts[1]:
928 if filter_func and filter_func != parts[2]:
932 # unexpected object, don't touch it
943 gc.disable() # disable garbage collection, we'll do that manually
944 print("Running tests using custom test runner") # debug message
945 filter_file, filter_class, filter_func = self.parse_test_option()
946 print("Active filters: file=%s, class=%s, function=%s" % (
947 filter_file, filter_class, filter_func))
948 filtered = self.filter_tests(test, filter_file, filter_class,
950 print("%s out of %s tests match specified filters" % (
951 filtered.countTestCases(), test.countTestCases()))
952 if not running_extended_tests():
953 print("Not running extended tests (some tests will be skipped)")
954 return super(VppTestRunner, self).run(filtered)