3 from __future__ import print_function
13 from collections import deque
14 from threading import Thread, Event
15 from inspect import getdoc
16 from traceback import format_exception
17 from logging import FileHandler, DEBUG, Formatter
18 from scapy.packet import Raw
19 from hook import StepHook, PollHook
20 from vpp_pg_interface import VppPGInterface
21 from vpp_sub_interface import VppSubInterface
22 from vpp_lo_interface import VppLoInterface
23 from vpp_papi_provider import VppPapiProvider
25 from vpp_object import VppObjectRegistry
26 if os.name == 'posix' and sys.version_info[0] < 3:
27 # using subprocess32 is recommended by python official documentation
28 # @ https://docs.python.org/2/library/subprocess.html
29 import subprocess32 as subprocess
34 Test framework module.
36 The module provides a set of tools for constructing and running tests and
37 representing the results.
41 class _PacketInfo(object):
42 """Private class to create packet info object.
44 Help process information about the next packet.
45 Set variables to default values.
47 #: Store the index of the packet.
49 #: Store the index of the source packet generator interface of the packet.
51 #: Store the index of the destination packet generator interface
54 #: Store expected ip version
56 #: Store expected upper protocol
58 #: Store the copy of the former packet.
61 def __eq__(self, other):
62 index = self.index == other.index
63 src = self.src == other.src
64 dst = self.dst == other.dst
65 data = self.data == other.data
66 return index and src and dst and data
69 def pump_output(testclass):
70 """ pump output from vpp stdout/stderr to proper queues """
71 while not testclass.pump_thread_stop_flag.wait(0):
72 readable = select.select([testclass.vpp.stdout.fileno(),
73 testclass.vpp.stderr.fileno(),
74 testclass.pump_thread_wakeup_pipe[0]],
76 if testclass.vpp.stdout.fileno() in readable:
77 read = os.read(testclass.vpp.stdout.fileno(), 1024)
78 testclass.vpp_stdout_deque.append(read)
79 if testclass.vpp.stderr.fileno() in readable:
80 read = os.read(testclass.vpp.stderr.fileno(), 1024)
81 testclass.vpp_stderr_deque.append(read)
82 # ignoring the dummy pipe here intentionally - the flag will take care
83 # of properly terminating the loop
86 def running_extended_tests():
88 s = os.getenv("EXTENDED_TESTS")
89 return True if s.lower() in ("y", "yes", "1") else False
95 class VppTestCase(unittest.TestCase):
96 """This subclass is a base class for VPP test cases that are implemented as
97 classes. It provides methods to create and run test case.
101 def packet_infos(self):
102 """List of packet infos"""
103 return self._packet_infos
106 def get_packet_count_for_if_idx(cls, dst_if_index):
107 """Get the number of packet info for specified destination if index"""
108 if dst_if_index in cls._packet_count_for_dst_if_idx:
109 return cls._packet_count_for_dst_if_idx[dst_if_index]
115 """Return the instance of this testcase"""
116 return cls.test_instance
119 def set_debug_flags(cls, d):
120 cls.debug_core = False
121 cls.debug_gdb = False
122 cls.debug_gdbserver = False
127 cls.debug_core = True
130 elif dl == "gdbserver":
131 cls.debug_gdbserver = True
133 raise Exception("Unrecognized DEBUG option: '%s'" % d)
136 def setUpConstants(cls):
137 """ Set-up the test case class based on environment variables """
139 s = os.getenv("STEP")
140 cls.step = True if s.lower() in ("y", "yes", "1") else False
144 d = os.getenv("DEBUG")
147 cls.set_debug_flags(d)
148 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
149 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
150 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
152 if cls.plugin_path is not None:
153 if cls.extern_plugin_path is not None:
154 plugin_path = "%s:%s" % (
155 cls.plugin_path, cls.extern_plugin_path)
157 plugin_path = cls.plugin_path
158 elif cls.extern_plugin_path is not None:
159 plugin_path = cls.extern_plugin_path
161 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
162 debug_cli = "cli-listen localhost:5002"
165 size = os.getenv("COREDUMP_SIZE")
167 coredump_size = "coredump-size %s" % size
170 if coredump_size is None:
171 coredump_size = "coredump-size unlimited"
172 cls.vpp_cmdline = [cls.vpp_bin, "unix",
173 "{", "nodaemon", debug_cli, coredump_size, "}",
174 "api-trace", "{", "on", "}",
175 "api-segment", "{", "prefix", cls.shm_prefix, "}",
176 "plugins", "{", "plugin", "dpdk_plugin.so", "{",
178 if plugin_path is not None:
179 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
180 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
183 def wait_for_enter(cls):
184 if cls.debug_gdbserver:
185 print(double_line_delim)
186 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
188 print(double_line_delim)
189 print("Spawned VPP with PID: %d" % cls.vpp.pid)
191 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
193 print(single_line_delim)
194 print("You can debug the VPP using e.g.:")
195 if cls.debug_gdbserver:
196 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
197 print("Now is the time to attach a gdb by running the above "
198 "command, set up breakpoints etc. and then resume VPP from "
199 "within gdb by issuing the 'continue' command")
201 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
202 print("Now is the time to attach a gdb by running the above "
203 "command and set up breakpoints etc.")
204 print(single_line_delim)
205 raw_input("Press ENTER to continue running the testcase...")
209 cmdline = cls.vpp_cmdline
211 if cls.debug_gdbserver:
212 gdbserver = '/usr/bin/gdbserver'
213 if not os.path.isfile(gdbserver) or \
214 not os.access(gdbserver, os.X_OK):
215 raise Exception("gdbserver binary '%s' does not exist or is "
216 "not executable" % gdbserver)
218 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
219 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
222 cls.vpp = subprocess.Popen(cmdline,
223 stdout=subprocess.PIPE,
224 stderr=subprocess.PIPE,
226 except Exception as e:
227 cls.logger.critical("Couldn't start vpp: %s" % e)
235 Perform class setup before running the testcase
236 Remove shared memory files, start vpp and connect the vpp-api
238 gc.collect() # run garbage collection first
239 cls.logger = getLogger(cls.__name__)
240 cls.tempdir = tempfile.mkdtemp(
241 prefix='vpp-unittest-' + cls.__name__ + '-')
242 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
243 cls.file_handler.setFormatter(
244 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
246 cls.file_handler.setLevel(DEBUG)
247 cls.logger.addHandler(cls.file_handler)
248 cls.shm_prefix = cls.tempdir.split("/")[-1]
249 os.chdir(cls.tempdir)
250 cls.logger.info("Temporary dir is %s, shm prefix is %s",
251 cls.tempdir, cls.shm_prefix)
253 cls.reset_packet_infos()
255 cls._zombie_captures = []
258 cls.registry = VppObjectRegistry()
259 cls.vpp_startup_failed = False
260 # need to catch exceptions here because if we raise, then the cleanup
261 # doesn't get called and we might end with a zombie vpp
264 cls.vpp_stdout_deque = deque()
265 cls.vpp_stderr_deque = deque()
266 cls.pump_thread_stop_flag = Event()
267 cls.pump_thread_wakeup_pipe = os.pipe()
268 cls.pump_thread = Thread(target=pump_output, args=(cls,))
269 cls.pump_thread.daemon = True
270 cls.pump_thread.start()
271 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
276 cls.vapi.register_hook(hook)
277 cls.sleep(0.1, "after vpp startup, before initial poll")
281 cls.vpp_startup_failed = True
283 "VPP died shortly after startup, check the"
284 " output to standard error for possible cause")
289 if cls.debug_gdbserver:
290 print(colorize("You're running VPP inside gdbserver but "
291 "VPP-API connection failed, did you forget "
292 "to 'continue' VPP from within gdb?", RED))
295 t, v, tb = sys.exc_info()
305 Disconnect vpp-api, kill vpp and cleanup shared memory files
307 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
309 if cls.vpp.returncode is None:
310 print(double_line_delim)
311 print("VPP or GDB server is still running")
312 print(single_line_delim)
313 raw_input("When done debugging, press ENTER to kill the "
314 "process and finish running the testcase...")
316 os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
317 cls.pump_thread_stop_flag.set()
318 if hasattr(cls, 'pump_thread'):
319 cls.logger.debug("Waiting for pump thread to stop")
320 cls.pump_thread.join()
321 if hasattr(cls, 'vpp_stderr_reader_thread'):
322 cls.logger.debug("Waiting for stdderr pump to stop")
323 cls.vpp_stderr_reader_thread.join()
325 if hasattr(cls, 'vpp'):
326 if hasattr(cls, 'vapi'):
327 cls.vapi.disconnect()
330 if cls.vpp.returncode is None:
331 cls.logger.debug("Sending TERM to vpp")
333 cls.logger.debug("Waiting for vpp to die")
334 cls.vpp.communicate()
337 if cls.vpp_startup_failed:
338 stdout_log = cls.logger.info
339 stderr_log = cls.logger.critical
341 stdout_log = cls.logger.info
342 stderr_log = cls.logger.info
344 if hasattr(cls, 'vpp_stdout_deque'):
345 stdout_log(single_line_delim)
346 stdout_log('VPP output to stdout while running %s:', cls.__name__)
347 stdout_log(single_line_delim)
348 vpp_output = "".join(cls.vpp_stdout_deque)
349 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
351 stdout_log('\n%s', vpp_output)
352 stdout_log(single_line_delim)
354 if hasattr(cls, 'vpp_stderr_deque'):
355 stderr_log(single_line_delim)
356 stderr_log('VPP output to stderr while running %s:', cls.__name__)
357 stderr_log(single_line_delim)
358 vpp_output = "".join(cls.vpp_stderr_deque)
359 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
361 stderr_log('\n%s', vpp_output)
362 stderr_log(single_line_delim)
365 def tearDownClass(cls):
366 """ Perform final cleanup after running all tests in this test-case """
368 cls.file_handler.close()
371 """ Show various debug prints after each test """
372 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
373 (self.__class__.__name__, self._testMethodName,
374 self._testMethodDoc))
375 if not self.vpp_dead:
376 self.logger.debug(self.vapi.cli("show trace"))
377 self.logger.info(self.vapi.ppcli("show interface"))
378 self.logger.info(self.vapi.ppcli("show hardware"))
379 self.logger.info(self.vapi.ppcli("show error"))
380 self.logger.info(self.vapi.ppcli("show run"))
381 self.registry.remove_vpp_config(self.logger)
382 # Save/Dump VPP api trace log
383 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
384 tmp_api_trace = "/tmp/%s" % api_trace
385 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
386 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
387 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
389 os.rename(tmp_api_trace, vpp_api_trace_log)
390 self.logger.info(self.vapi.ppcli("api trace dump %s" %
393 self.registry.unregister_all(self.logger)
396 """ Clear trace before running each test"""
397 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
398 (self.__class__.__name__, self._testMethodName,
399 self._testMethodDoc))
401 raise Exception("VPP is dead when setting up the test")
402 self.sleep(.1, "during setUp")
403 self.vpp_stdout_deque.append(
404 "--- test setUp() for %s.%s(%s) starts here ---\n" %
405 (self.__class__.__name__, self._testMethodName,
406 self._testMethodDoc))
407 self.vpp_stderr_deque.append(
408 "--- test setUp() for %s.%s(%s) starts here ---\n" %
409 (self.__class__.__name__, self._testMethodName,
410 self._testMethodDoc))
411 self.vapi.cli("clear trace")
412 # store the test instance inside the test class - so that objects
413 # holding the class can access instance methods (like assertEqual)
414 type(self).test_instance = self
417 def pg_enable_capture(cls, interfaces):
419 Enable capture on packet-generator interfaces
421 :param interfaces: iterable interface indexes
428 def register_capture(cls, cap_name):
429 """ Register a capture in the testclass """
430 # add to the list of captures with current timestamp
431 cls._captures.append((time.time(), cap_name))
432 # filter out from zombies
433 cls._zombie_captures = [(stamp, name)
434 for (stamp, name) in cls._zombie_captures
439 """ Remove any zombie captures and enable the packet generator """
440 # how long before capture is allowed to be deleted - otherwise vpp
441 # crashes - 100ms seems enough (this shouldn't be needed at all)
444 for stamp, cap_name in cls._zombie_captures:
445 wait = stamp + capture_ttl - now
447 cls.sleep(wait, "before deleting capture %s" % cap_name)
449 cls.logger.debug("Removing zombie capture %s" % cap_name)
450 cls.vapi.cli('packet-generator delete %s' % cap_name)
452 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
453 cls.vapi.cli('packet-generator enable')
454 cls._zombie_captures = cls._captures
458 def create_pg_interfaces(cls, interfaces):
460 Create packet-generator interfaces.
462 :param interfaces: iterable indexes of the interfaces.
463 :returns: List of created interfaces.
468 intf = VppPGInterface(cls, i)
469 setattr(cls, intf.name, intf)
471 cls.pg_interfaces = result
475 def create_loopback_interfaces(cls, interfaces):
477 Create loopback interfaces.
479 :param interfaces: iterable indexes of the interfaces.
480 :returns: List of created interfaces.
484 intf = VppLoInterface(cls, i)
485 setattr(cls, intf.name, intf)
487 cls.lo_interfaces = result
491 def extend_packet(packet, size):
493 Extend packet to given size by padding with spaces
494 NOTE: Currently works only when Raw layer is present.
496 :param packet: packet
497 :param size: target size
500 packet_len = len(packet) + 4
501 extend = size - packet_len
503 packet[Raw].load += ' ' * extend
506 def reset_packet_infos(cls):
507 """ Reset the list of packet info objects and packet counts to zero """
508 cls._packet_infos = {}
509 cls._packet_count_for_dst_if_idx = {}
512 def create_packet_info(cls, src_if, dst_if):
514 Create packet info object containing the source and destination indexes
515 and add it to the testcase's packet info list
517 :param VppInterface src_if: source interface
518 :param VppInterface dst_if: destination interface
520 :returns: _PacketInfo object
524 info.index = len(cls._packet_infos)
525 info.src = src_if.sw_if_index
526 info.dst = dst_if.sw_if_index
527 if isinstance(dst_if, VppSubInterface):
528 dst_idx = dst_if.parent.sw_if_index
530 dst_idx = dst_if.sw_if_index
531 if dst_idx in cls._packet_count_for_dst_if_idx:
532 cls._packet_count_for_dst_if_idx[dst_idx] += 1
534 cls._packet_count_for_dst_if_idx[dst_idx] = 1
535 cls._packet_infos[info.index] = info
539 def info_to_payload(info):
541 Convert _PacketInfo object to packet payload
543 :param info: _PacketInfo object
545 :returns: string containing serialized data from packet info
547 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
551 def payload_to_info(payload):
553 Convert packet payload to _PacketInfo object
555 :param payload: packet payload
557 :returns: _PacketInfo object containing de-serialized data from payload
560 numbers = payload.split()
562 info.index = int(numbers[0])
563 info.src = int(numbers[1])
564 info.dst = int(numbers[2])
565 info.ip = int(numbers[3])
566 info.proto = int(numbers[4])
569 def get_next_packet_info(self, info):
571 Iterate over the packet info list stored in the testcase
572 Start iteration with first element if info is None
573 Continue based on index in info if info is specified
575 :param info: info or None
576 :returns: next info in list or None if no more infos
581 next_index = info.index + 1
582 if next_index == len(self._packet_infos):
585 return self._packet_infos[next_index]
587 def get_next_packet_info_for_interface(self, src_index, info):
589 Search the packet info list for the next packet info with same source
592 :param src_index: source interface index to search for
593 :param info: packet info - where to start the search
594 :returns: packet info or None
598 info = self.get_next_packet_info(info)
601 if info.src == src_index:
604 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
606 Search the packet info list for the next packet info with same source
607 and destination interface indexes
609 :param src_index: source interface index to search for
610 :param dst_index: destination interface index to search for
611 :param info: packet info - where to start the search
612 :returns: packet info or None
616 info = self.get_next_packet_info_for_interface(src_index, info)
619 if info.dst == dst_index:
622 def assert_equal(self, real_value, expected_value, name_or_class=None):
623 if name_or_class is None:
624 self.assertEqual(real_value, expected_value)
627 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
628 msg = msg % (getdoc(name_or_class).strip(),
629 real_value, str(name_or_class(real_value)),
630 expected_value, str(name_or_class(expected_value)))
632 msg = "Invalid %s: %s does not match expected value %s" % (
633 name_or_class, real_value, expected_value)
635 self.assertEqual(real_value, expected_value, msg)
637 def assert_in_range(self,
645 msg = "Invalid %s: %s out of range <%s,%s>" % (
646 name, real_value, expected_min, expected_max)
647 self.assertTrue(expected_min <= real_value <= expected_max, msg)
650 def sleep(cls, timeout, remark=None):
651 if hasattr(cls, 'logger'):
652 cls.logger.debug("Starting sleep for %ss (%s)" % (timeout, remark))
656 if after - before > 2 * timeout:
657 cls.logger.error("unexpected time.sleep() result - "
658 "slept for %ss instead of ~%ss!" % (
659 after - before, timeout))
660 if hasattr(cls, 'logger'):
662 "Finished sleep (%s) - slept %ss (wanted %ss)" % (
663 remark, after - before, timeout))
666 class TestCasePrinter(object):
670 self.__dict__ = self._shared_state
671 if not hasattr(self, "_test_case_set"):
672 self._test_case_set = set()
674 def print_test_case_heading_if_first_time(self, case):
675 if case.__class__ not in self._test_case_set:
676 print(double_line_delim)
677 print(colorize(getdoc(case.__class__).splitlines()[0], YELLOW))
678 print(double_line_delim)
679 self._test_case_set.add(case.__class__)
682 class VppTestResult(unittest.TestResult):
684 @property result_string
685 String variable to store the test case result string.
687 List variable containing 2-tuples of TestCase instances and strings
688 holding formatted tracebacks. Each tuple represents a test which
689 raised an unexpected exception.
691 List variable containing 2-tuples of TestCase instances and strings
692 holding formatted tracebacks. Each tuple represents a test where
693 a failure was explicitly signalled using the TestCase.assert*()
697 def __init__(self, stream, descriptions, verbosity):
699 :param stream File descriptor to store where to report test results.
700 Set to the standard error stream by default.
701 :param descriptions Boolean variable to store information if to use
702 test case descriptions.
703 :param verbosity Integer variable to store required verbosity level.
705 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
707 self.descriptions = descriptions
708 self.verbosity = verbosity
709 self.result_string = None
710 self.printer = TestCasePrinter()
712 def addSuccess(self, test):
714 Record a test succeeded result
719 if hasattr(test, 'logger'):
720 test.logger.debug("--- addSuccess() %s.%s(%s) called"
721 % (test.__class__.__name__,
722 test._testMethodName,
723 test._testMethodDoc))
724 unittest.TestResult.addSuccess(self, test)
725 self.result_string = colorize("OK", GREEN)
727 def addSkip(self, test, reason):
729 Record a test skipped.
735 if hasattr(test, 'logger'):
736 test.logger.debug("--- addSkip() %s.%s(%s) called, reason is %s"
737 % (test.__class__.__name__,
738 test._testMethodName,
741 unittest.TestResult.addSkip(self, test, reason)
742 self.result_string = colorize("SKIP", YELLOW)
744 def addFailure(self, test, err):
746 Record a test failed result
749 :param err: error message
752 if hasattr(test, 'logger'):
753 test.logger.debug("--- addFailure() %s.%s(%s) called, err is %s"
754 % (test.__class__.__name__,
755 test._testMethodName,
756 test._testMethodDoc, err))
757 test.logger.debug("formatted exception is:\n%s" %
758 "".join(format_exception(*err)))
759 unittest.TestResult.addFailure(self, test, err)
760 if hasattr(test, 'tempdir'):
761 self.result_string = colorize("FAIL", RED) + \
762 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
764 self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
766 def addError(self, test, err):
768 Record a test error result
771 :param err: error message
774 if hasattr(test, 'logger'):
775 test.logger.debug("--- addError() %s.%s(%s) called, err is %s"
776 % (test.__class__.__name__,
777 test._testMethodName,
778 test._testMethodDoc, err))
779 test.logger.debug("formatted exception is:\n%s" %
780 "".join(format_exception(*err)))
781 unittest.TestResult.addError(self, test, err)
782 if hasattr(test, 'tempdir'):
783 self.result_string = colorize("ERROR", RED) + \
784 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
786 self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
788 def getDescription(self, test):
793 :returns: test description
796 # TODO: if none print warning not raise exception
797 short_description = test.shortDescription()
798 if self.descriptions and short_description:
799 return short_description
803 def startTest(self, test):
810 self.printer.print_test_case_heading_if_first_time(test)
811 unittest.TestResult.startTest(self, test)
812 if self.verbosity > 0:
814 "Starting " + self.getDescription(test) + " ...")
815 self.stream.writeln(single_line_delim)
817 def stopTest(self, test):
824 unittest.TestResult.stopTest(self, test)
825 if self.verbosity > 0:
826 self.stream.writeln(single_line_delim)
827 self.stream.writeln("%-73s%s" % (self.getDescription(test),
829 self.stream.writeln(single_line_delim)
831 self.stream.writeln("%-73s%s" % (self.getDescription(test),
834 def printErrors(self):
836 Print errors from running the test case
838 self.stream.writeln()
839 self.printErrorList('ERROR', self.errors)
840 self.printErrorList('FAIL', self.failures)
842 def printErrorList(self, flavour, errors):
844 Print error list to the output stream together with error type
845 and test case description.
847 :param flavour: error type
848 :param errors: iterable errors
851 for test, err in errors:
852 self.stream.writeln(double_line_delim)
853 self.stream.writeln("%s: %s" %
854 (flavour, self.getDescription(test)))
855 self.stream.writeln(single_line_delim)
856 self.stream.writeln("%s" % err)
859 class VppTestRunner(unittest.TextTestRunner):
861 A basic test runner implementation which prints results to standard error.
864 def resultclass(self):
865 """Class maintaining the results of the tests"""
868 def __init__(self, stream=sys.stderr, descriptions=True, verbosity=1,
869 failfast=False, buffer=False, resultclass=None):
870 # ignore stream setting here, use hard-coded stdout to be in sync
871 # with prints from VppTestCase methods ...
872 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
873 verbosity, failfast, buffer,
878 def parse_test_option(self):
880 f = os.getenv(self.test_option)
883 filter_file_name = None
884 filter_class_name = None
885 filter_func_name = None
890 raise Exception("Unrecognized %s option: %s" %
891 (self.test_option, f))
893 if parts[2] not in ('*', ''):
894 filter_func_name = parts[2]
895 if parts[1] not in ('*', ''):
896 filter_class_name = parts[1]
897 if parts[0] not in ('*', ''):
898 if parts[0].startswith('test_'):
899 filter_file_name = parts[0]
901 filter_file_name = 'test_%s' % parts[0]
903 if f.startswith('test_'):
906 filter_file_name = 'test_%s' % f
907 return filter_file_name, filter_class_name, filter_func_name
909 def filter_tests(self, tests, filter_file, filter_class, filter_func):
910 result = unittest.suite.TestSuite()
912 if isinstance(t, unittest.suite.TestSuite):
913 # this is a bunch of tests, recursively filter...
914 x = self.filter_tests(t, filter_file, filter_class,
916 if x.countTestCases() > 0:
918 elif isinstance(t, unittest.TestCase):
919 # this is a single test
920 parts = t.id().split('.')
921 # t.id() for common cases like this:
922 # test_classifier.TestClassifier.test_acl_ip
923 # apply filtering only if it is so
925 if filter_file and filter_file != parts[0]:
927 if filter_class and filter_class != parts[1]:
929 if filter_func and filter_func != parts[2]:
933 # unexpected object, don't touch it
944 faulthandler.enable() # emit stack trace to stderr if killed by signal
945 print("Running tests using custom test runner") # debug message
946 filter_file, filter_class, filter_func = self.parse_test_option()
947 print("Active filters: file=%s, class=%s, function=%s" % (
948 filter_file, filter_class, filter_func))
949 filtered = self.filter_tests(test, filter_file, filter_class,
951 print("%s out of %s tests match specified filters" % (
952 filtered.countTestCases(), test.countTestCases()))
953 if not running_extended_tests():
954 print("Not running extended tests (some tests will be skipped)")
955 return super(VppTestRunner, self).run(filtered)