3 from __future__ import print_function
14 from collections import deque
15 from threading import Thread, Event
16 from inspect import getdoc, isclass
17 from traceback import format_exception
18 from logging import FileHandler, DEBUG, Formatter
19 from scapy.packet import Raw
20 from hook import StepHook, PollHook
21 from vpp_pg_interface import VppPGInterface
22 from vpp_sub_interface import VppSubInterface
23 from vpp_lo_interface import VppLoInterface
24 from vpp_papi_provider import VppPapiProvider
26 from vpp_object import VppObjectRegistry
27 if os.name == 'posix' and sys.version_info[0] < 3:
28 # using subprocess32 is recommended by python official documentation
29 # @ https://docs.python.org/2/library/subprocess.html
30 import subprocess32 as subprocess
35 Test framework module.
37 The module provides a set of tools for constructing and running tests and
38 representing the results.
42 class _PacketInfo(object):
43 """Private class to create packet info object.
45 Help process information about the next packet.
46 Set variables to default values.
48 #: Store the index of the packet.
50 #: Store the index of the source packet generator interface of the packet.
52 #: Store the index of the destination packet generator interface
55 #: Store expected ip version
57 #: Store expected upper protocol
59 #: Store the copy of the former packet.
62 def __eq__(self, other):
63 index = self.index == other.index
64 src = self.src == other.src
65 dst = self.dst == other.dst
66 data = self.data == other.data
67 return index and src and dst and data
70 def pump_output(testclass):
71 """ pump output from vpp stdout/stderr to proper queues """
74 while not testclass.pump_thread_stop_flag.wait(0):
75 readable = select.select([testclass.vpp.stdout.fileno(),
76 testclass.vpp.stderr.fileno(),
77 testclass.pump_thread_wakeup_pipe[0]],
79 if testclass.vpp.stdout.fileno() in readable:
80 read = os.read(testclass.vpp.stdout.fileno(), 102400)
82 split = read.splitlines(True)
83 if len(stdout_fragment) > 0:
84 split[0] = "%s%s" % (stdout_fragment, split[0])
85 if len(split) > 0 and split[-1].endswith("\n"):
89 stdout_fragment = split[-1]
90 testclass.vpp_stdout_deque.extend(split[:limit])
91 if not testclass.cache_vpp_output:
92 for line in split[:limit]:
93 testclass.logger.debug(
94 "VPP STDOUT: %s" % line.rstrip("\n"))
95 if testclass.vpp.stderr.fileno() in readable:
96 read = os.read(testclass.vpp.stderr.fileno(), 102400)
98 split = read.splitlines(True)
99 if len(stderr_fragment) > 0:
100 split[0] = "%s%s" % (stderr_fragment, split[0])
101 if len(split) > 0 and split[-1].endswith("\n"):
105 stderr_fragment = split[-1]
106 testclass.vpp_stderr_deque.extend(split[:limit])
107 if not testclass.cache_vpp_output:
108 for line in split[:limit]:
109 testclass.logger.debug(
110 "VPP STDERR: %s" % line.rstrip("\n"))
111 # ignoring the dummy pipe here intentionally - the flag will take care
112 # of properly terminating the loop
115 def running_extended_tests():
117 s = os.getenv("EXTENDED_TESTS")
118 return True if s.lower() in ("y", "yes", "1") else False
124 def running_on_centos():
126 os_id = os.getenv("OS_ID")
127 return True if "centos" in os_id.lower() else False
133 class KeepAliveReporter(object):
135 Singleton object which reports test start to parent process
140 self.__dict__ = self._shared_state
147 def pipe(self, pipe):
148 if hasattr(self, '_pipe'):
149 raise Exception("Internal error - pipe should only be set once.")
152 def send_keep_alive(self, test):
154 Write current test tmpdir & desc to keep-alive pipe to signal liveness
156 if self.pipe is None:
157 # if not running forked..
163 desc = test.shortDescription()
167 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
170 class VppTestCase(unittest.TestCase):
171 """This subclass is a base class for VPP test cases that are implemented as
172 classes. It provides methods to create and run test case.
176 def packet_infos(self):
177 """List of packet infos"""
178 return self._packet_infos
181 def get_packet_count_for_if_idx(cls, dst_if_index):
182 """Get the number of packet info for specified destination if index"""
183 if dst_if_index in cls._packet_count_for_dst_if_idx:
184 return cls._packet_count_for_dst_if_idx[dst_if_index]
190 """Return the instance of this testcase"""
191 return cls.test_instance
194 def set_debug_flags(cls, d):
195 cls.debug_core = False
196 cls.debug_gdb = False
197 cls.debug_gdbserver = False
202 cls.debug_core = True
205 elif dl == "gdbserver":
206 cls.debug_gdbserver = True
208 raise Exception("Unrecognized DEBUG option: '%s'" % d)
211 def setUpConstants(cls):
212 """ Set-up the test case class based on environment variables """
214 s = os.getenv("STEP")
215 cls.step = True if s.lower() in ("y", "yes", "1") else False
219 d = os.getenv("DEBUG")
223 c = os.getenv("CACHE_OUTPUT", "1")
224 cls.cache_vpp_output = \
225 False if c.lower() in ("n", "no", "0") else True
227 cls.cache_vpp_output = True
228 cls.set_debug_flags(d)
229 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
230 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
231 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
233 if cls.plugin_path is not None:
234 if cls.extern_plugin_path is not None:
235 plugin_path = "%s:%s" % (
236 cls.plugin_path, cls.extern_plugin_path)
238 plugin_path = cls.plugin_path
239 elif cls.extern_plugin_path is not None:
240 plugin_path = cls.extern_plugin_path
242 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
243 debug_cli = "cli-listen localhost:5002"
246 size = os.getenv("COREDUMP_SIZE")
248 coredump_size = "coredump-size %s" % size
251 if coredump_size is None:
252 coredump_size = "coredump-size unlimited"
253 cls.vpp_cmdline = [cls.vpp_bin, "unix",
254 "{", "nodaemon", debug_cli, "full-coredump",
255 coredump_size, "}", "api-trace", "{", "on", "}",
256 "api-segment", "{", "prefix", cls.shm_prefix, "}",
257 "plugins", "{", "plugin", "dpdk_plugin.so", "{",
259 if plugin_path is not None:
260 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
261 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
264 def wait_for_enter(cls):
265 if cls.debug_gdbserver:
266 print(double_line_delim)
267 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
269 print(double_line_delim)
270 print("Spawned VPP with PID: %d" % cls.vpp.pid)
272 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
274 print(single_line_delim)
275 print("You can debug the VPP using e.g.:")
276 if cls.debug_gdbserver:
277 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
278 print("Now is the time to attach a gdb by running the above "
279 "command, set up breakpoints etc. and then resume VPP from "
280 "within gdb by issuing the 'continue' command")
282 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
283 print("Now is the time to attach a gdb by running the above "
284 "command and set up breakpoints etc.")
285 print(single_line_delim)
286 raw_input("Press ENTER to continue running the testcase...")
290 cmdline = cls.vpp_cmdline
292 if cls.debug_gdbserver:
293 gdbserver = '/usr/bin/gdbserver'
294 if not os.path.isfile(gdbserver) or \
295 not os.access(gdbserver, os.X_OK):
296 raise Exception("gdbserver binary '%s' does not exist or is "
297 "not executable" % gdbserver)
299 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
300 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
303 cls.vpp = subprocess.Popen(cmdline,
304 stdout=subprocess.PIPE,
305 stderr=subprocess.PIPE,
307 except Exception as e:
308 cls.logger.critical("Couldn't start vpp: %s" % e)
316 Perform class setup before running the testcase
317 Remove shared memory files, start vpp and connect the vpp-api
319 gc.collect() # run garbage collection first
321 cls.logger = getLogger(cls.__name__)
322 cls.tempdir = tempfile.mkdtemp(
323 prefix='vpp-unittest-%s-' % cls.__name__)
324 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
325 cls.file_handler.setFormatter(
326 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
328 cls.file_handler.setLevel(DEBUG)
329 cls.logger.addHandler(cls.file_handler)
330 cls.shm_prefix = cls.tempdir.split("/")[-1]
331 os.chdir(cls.tempdir)
332 cls.logger.info("Temporary dir is %s, shm prefix is %s",
333 cls.tempdir, cls.shm_prefix)
335 cls.reset_packet_infos()
337 cls._zombie_captures = []
340 cls.registry = VppObjectRegistry()
341 cls.vpp_startup_failed = False
342 cls.reporter = KeepAliveReporter()
343 # need to catch exceptions here because if we raise, then the cleanup
344 # doesn't get called and we might end with a zombie vpp
347 cls.reporter.send_keep_alive(cls)
348 cls.vpp_stdout_deque = deque()
349 cls.vpp_stderr_deque = deque()
350 cls.pump_thread_stop_flag = Event()
351 cls.pump_thread_wakeup_pipe = os.pipe()
352 cls.pump_thread = Thread(target=pump_output, args=(cls,))
353 cls.pump_thread.daemon = True
354 cls.pump_thread.start()
355 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
360 cls.vapi.register_hook(hook)
361 cls.sleep(0.1, "after vpp startup, before initial poll")
365 cls.vpp_startup_failed = True
367 "VPP died shortly after startup, check the"
368 " output to standard error for possible cause")
373 if cls.debug_gdbserver:
374 print(colorize("You're running VPP inside gdbserver but "
375 "VPP-API connection failed, did you forget "
376 "to 'continue' VPP from within gdb?", RED))
379 t, v, tb = sys.exc_info()
389 Disconnect vpp-api, kill vpp and cleanup shared memory files
391 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
393 if cls.vpp.returncode is None:
394 print(double_line_delim)
395 print("VPP or GDB server is still running")
396 print(single_line_delim)
397 raw_input("When done debugging, press ENTER to kill the "
398 "process and finish running the testcase...")
400 os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
401 cls.pump_thread_stop_flag.set()
402 if hasattr(cls, 'pump_thread'):
403 cls.logger.debug("Waiting for pump thread to stop")
404 cls.pump_thread.join()
405 if hasattr(cls, 'vpp_stderr_reader_thread'):
406 cls.logger.debug("Waiting for stdderr pump to stop")
407 cls.vpp_stderr_reader_thread.join()
409 if hasattr(cls, 'vpp'):
410 if hasattr(cls, 'vapi'):
411 cls.vapi.disconnect()
414 if cls.vpp.returncode is None:
415 cls.logger.debug("Sending TERM to vpp")
417 cls.logger.debug("Waiting for vpp to die")
418 cls.vpp.communicate()
421 if cls.vpp_startup_failed:
422 stdout_log = cls.logger.info
423 stderr_log = cls.logger.critical
425 stdout_log = cls.logger.info
426 stderr_log = cls.logger.info
428 if hasattr(cls, 'vpp_stdout_deque'):
429 stdout_log(single_line_delim)
430 stdout_log('VPP output to stdout while running %s:', cls.__name__)
431 stdout_log(single_line_delim)
432 vpp_output = "".join(cls.vpp_stdout_deque)
433 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
435 stdout_log('\n%s', vpp_output)
436 stdout_log(single_line_delim)
438 if hasattr(cls, 'vpp_stderr_deque'):
439 stderr_log(single_line_delim)
440 stderr_log('VPP output to stderr while running %s:', cls.__name__)
441 stderr_log(single_line_delim)
442 vpp_output = "".join(cls.vpp_stderr_deque)
443 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
445 stderr_log('\n%s', vpp_output)
446 stderr_log(single_line_delim)
449 def tearDownClass(cls):
450 """ Perform final cleanup after running all tests in this test-case """
452 cls.file_handler.close()
455 """ Show various debug prints after each test """
456 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
457 (self.__class__.__name__, self._testMethodName,
458 self._testMethodDoc))
459 if not self.vpp_dead:
460 self.logger.debug(self.vapi.cli("show trace"))
461 self.logger.info(self.vapi.ppcli("show interface"))
462 self.logger.info(self.vapi.ppcli("show hardware"))
463 self.logger.info(self.vapi.ppcli("show error"))
464 self.logger.info(self.vapi.ppcli("show run"))
465 self.registry.remove_vpp_config(self.logger)
466 # Save/Dump VPP api trace log
467 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
468 tmp_api_trace = "/tmp/%s" % api_trace
469 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
470 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
471 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
473 os.rename(tmp_api_trace, vpp_api_trace_log)
474 self.logger.info(self.vapi.ppcli("api trace dump %s" %
477 self.registry.unregister_all(self.logger)
480 """ Clear trace before running each test"""
481 self.reporter.send_keep_alive(self)
482 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
483 (self.__class__.__name__, self._testMethodName,
484 self._testMethodDoc))
486 raise Exception("VPP is dead when setting up the test")
487 self.sleep(.1, "during setUp")
488 self.vpp_stdout_deque.append(
489 "--- test setUp() for %s.%s(%s) starts here ---\n" %
490 (self.__class__.__name__, self._testMethodName,
491 self._testMethodDoc))
492 self.vpp_stderr_deque.append(
493 "--- test setUp() for %s.%s(%s) starts here ---\n" %
494 (self.__class__.__name__, self._testMethodName,
495 self._testMethodDoc))
496 self.vapi.cli("clear trace")
497 # store the test instance inside the test class - so that objects
498 # holding the class can access instance methods (like assertEqual)
499 type(self).test_instance = self
502 def pg_enable_capture(cls, interfaces):
504 Enable capture on packet-generator interfaces
506 :param interfaces: iterable interface indexes
513 def register_capture(cls, cap_name):
514 """ Register a capture in the testclass """
515 # add to the list of captures with current timestamp
516 cls._captures.append((time.time(), cap_name))
517 # filter out from zombies
518 cls._zombie_captures = [(stamp, name)
519 for (stamp, name) in cls._zombie_captures
524 """ Remove any zombie captures and enable the packet generator """
525 # how long before capture is allowed to be deleted - otherwise vpp
526 # crashes - 100ms seems enough (this shouldn't be needed at all)
529 for stamp, cap_name in cls._zombie_captures:
530 wait = stamp + capture_ttl - now
532 cls.sleep(wait, "before deleting capture %s" % cap_name)
534 cls.logger.debug("Removing zombie capture %s" % cap_name)
535 cls.vapi.cli('packet-generator delete %s' % cap_name)
537 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
538 cls.vapi.cli('packet-generator enable')
539 cls._zombie_captures = cls._captures
543 def create_pg_interfaces(cls, interfaces):
545 Create packet-generator interfaces.
547 :param interfaces: iterable indexes of the interfaces.
548 :returns: List of created interfaces.
553 intf = VppPGInterface(cls, i)
554 setattr(cls, intf.name, intf)
556 cls.pg_interfaces = result
560 def create_loopback_interfaces(cls, interfaces):
562 Create loopback interfaces.
564 :param interfaces: iterable indexes of the interfaces.
565 :returns: List of created interfaces.
569 intf = VppLoInterface(cls, i)
570 setattr(cls, intf.name, intf)
572 cls.lo_interfaces = result
576 def extend_packet(packet, size):
578 Extend packet to given size by padding with spaces
579 NOTE: Currently works only when Raw layer is present.
581 :param packet: packet
582 :param size: target size
585 packet_len = len(packet) + 4
586 extend = size - packet_len
588 packet[Raw].load += ' ' * extend
591 def reset_packet_infos(cls):
592 """ Reset the list of packet info objects and packet counts to zero """
593 cls._packet_infos = {}
594 cls._packet_count_for_dst_if_idx = {}
597 def create_packet_info(cls, src_if, dst_if):
599 Create packet info object containing the source and destination indexes
600 and add it to the testcase's packet info list
602 :param VppInterface src_if: source interface
603 :param VppInterface dst_if: destination interface
605 :returns: _PacketInfo object
609 info.index = len(cls._packet_infos)
610 info.src = src_if.sw_if_index
611 info.dst = dst_if.sw_if_index
612 if isinstance(dst_if, VppSubInterface):
613 dst_idx = dst_if.parent.sw_if_index
615 dst_idx = dst_if.sw_if_index
616 if dst_idx in cls._packet_count_for_dst_if_idx:
617 cls._packet_count_for_dst_if_idx[dst_idx] += 1
619 cls._packet_count_for_dst_if_idx[dst_idx] = 1
620 cls._packet_infos[info.index] = info
624 def info_to_payload(info):
626 Convert _PacketInfo object to packet payload
628 :param info: _PacketInfo object
630 :returns: string containing serialized data from packet info
632 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
636 def payload_to_info(payload):
638 Convert packet payload to _PacketInfo object
640 :param payload: packet payload
642 :returns: _PacketInfo object containing de-serialized data from payload
645 numbers = payload.split()
647 info.index = int(numbers[0])
648 info.src = int(numbers[1])
649 info.dst = int(numbers[2])
650 info.ip = int(numbers[3])
651 info.proto = int(numbers[4])
654 def get_next_packet_info(self, info):
656 Iterate over the packet info list stored in the testcase
657 Start iteration with first element if info is None
658 Continue based on index in info if info is specified
660 :param info: info or None
661 :returns: next info in list or None if no more infos
666 next_index = info.index + 1
667 if next_index == len(self._packet_infos):
670 return self._packet_infos[next_index]
672 def get_next_packet_info_for_interface(self, src_index, info):
674 Search the packet info list for the next packet info with same source
677 :param src_index: source interface index to search for
678 :param info: packet info - where to start the search
679 :returns: packet info or None
683 info = self.get_next_packet_info(info)
686 if info.src == src_index:
689 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
691 Search the packet info list for the next packet info with same source
692 and destination interface indexes
694 :param src_index: source interface index to search for
695 :param dst_index: destination interface index to search for
696 :param info: packet info - where to start the search
697 :returns: packet info or None
701 info = self.get_next_packet_info_for_interface(src_index, info)
704 if info.dst == dst_index:
707 def assert_equal(self, real_value, expected_value, name_or_class=None):
708 if name_or_class is None:
709 self.assertEqual(real_value, expected_value)
712 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
713 msg = msg % (getdoc(name_or_class).strip(),
714 real_value, str(name_or_class(real_value)),
715 expected_value, str(name_or_class(expected_value)))
717 msg = "Invalid %s: %s does not match expected value %s" % (
718 name_or_class, real_value, expected_value)
720 self.assertEqual(real_value, expected_value, msg)
722 def assert_in_range(self,
730 msg = "Invalid %s: %s out of range <%s,%s>" % (
731 name, real_value, expected_min, expected_max)
732 self.assertTrue(expected_min <= real_value <= expected_max, msg)
735 def sleep(cls, timeout, remark=None):
736 if hasattr(cls, 'logger'):
737 cls.logger.debug("Starting sleep for %ss (%s)" % (timeout, remark))
741 if after - before > 2 * timeout:
742 cls.logger.error("unexpected time.sleep() result - "
743 "slept for %ss instead of ~%ss!" % (
744 after - before, timeout))
745 if hasattr(cls, 'logger'):
747 "Finished sleep (%s) - slept %ss (wanted %ss)" % (
748 remark, after - before, timeout))
750 def send_and_assert_no_replies(self, intf, pkts, remark=""):
751 self.vapi.cli("clear trace")
752 intf.add_stream(pkts)
753 self.pg_enable_capture(self.pg_interfaces)
756 for i in self.pg_interfaces:
757 i.get_capture(0, timeout=timeout)
758 i.assert_nothing_captured(remark=remark)
761 def send_and_expect(self, input, pkts, output):
762 self.vapi.cli("clear trace")
763 input.add_stream(pkts)
764 self.pg_enable_capture(self.pg_interfaces)
766 rx = output.get_capture(len(pkts))
770 class TestCasePrinter(object):
774 self.__dict__ = self._shared_state
775 if not hasattr(self, "_test_case_set"):
776 self._test_case_set = set()
778 def print_test_case_heading_if_first_time(self, case):
779 if case.__class__ not in self._test_case_set:
780 print(double_line_delim)
781 print(colorize(getdoc(case.__class__).splitlines()[0], YELLOW))
782 print(double_line_delim)
783 self._test_case_set.add(case.__class__)
786 class VppTestResult(unittest.TestResult):
788 @property result_string
789 String variable to store the test case result string.
791 List variable containing 2-tuples of TestCase instances and strings
792 holding formatted tracebacks. Each tuple represents a test which
793 raised an unexpected exception.
795 List variable containing 2-tuples of TestCase instances and strings
796 holding formatted tracebacks. Each tuple represents a test where
797 a failure was explicitly signalled using the TestCase.assert*()
801 def __init__(self, stream, descriptions, verbosity):
803 :param stream File descriptor to store where to report test results.
804 Set to the standard error stream by default.
805 :param descriptions Boolean variable to store information if to use
806 test case descriptions.
807 :param verbosity Integer variable to store required verbosity level.
809 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
811 self.descriptions = descriptions
812 self.verbosity = verbosity
813 self.result_string = None
814 self.printer = TestCasePrinter()
816 def addSuccess(self, test):
818 Record a test succeeded result
823 if hasattr(test, 'logger'):
824 test.logger.debug("--- addSuccess() %s.%s(%s) called"
825 % (test.__class__.__name__,
826 test._testMethodName,
827 test._testMethodDoc))
828 unittest.TestResult.addSuccess(self, test)
829 self.result_string = colorize("OK", GREEN)
831 def addSkip(self, test, reason):
833 Record a test skipped.
839 if hasattr(test, 'logger'):
840 test.logger.debug("--- addSkip() %s.%s(%s) called, reason is %s"
841 % (test.__class__.__name__,
842 test._testMethodName,
845 unittest.TestResult.addSkip(self, test, reason)
846 self.result_string = colorize("SKIP", YELLOW)
848 def symlink_failed(self, test):
850 if hasattr(test, 'logger'):
852 if hasattr(test, 'tempdir'):
854 failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
855 link_path = '%s/%s-FAILED' % (failed_dir,
856 test.tempdir.split("/")[-1])
858 logger.debug("creating a link to the failed test")
859 logger.debug("os.symlink(%s, %s)" %
860 (test.tempdir, link_path))
861 os.symlink(test.tempdir, link_path)
862 except Exception as e:
866 def send_failure_through_pipe(self, test):
867 if hasattr(self, 'test_framework_failed_pipe'):
868 pipe = self.test_framework_failed_pipe
870 pipe.send(test.__class__)
872 def addFailure(self, test, err):
874 Record a test failed result
877 :param err: error message
880 if hasattr(test, 'logger'):
881 test.logger.debug("--- addFailure() %s.%s(%s) called, err is %s"
882 % (test.__class__.__name__,
883 test._testMethodName,
884 test._testMethodDoc, err))
885 test.logger.debug("formatted exception is:\n%s" %
886 "".join(format_exception(*err)))
887 unittest.TestResult.addFailure(self, test, err)
888 if hasattr(test, 'tempdir'):
889 self.result_string = colorize("FAIL", RED) + \
890 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
891 self.symlink_failed(test)
893 self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
895 self.send_failure_through_pipe(test)
897 def addError(self, test, err):
899 Record a test error result
902 :param err: error message
905 if hasattr(test, 'logger'):
906 test.logger.debug("--- addError() %s.%s(%s) called, err is %s"
907 % (test.__class__.__name__,
908 test._testMethodName,
909 test._testMethodDoc, err))
910 test.logger.debug("formatted exception is:\n%s" %
911 "".join(format_exception(*err)))
912 unittest.TestResult.addError(self, test, err)
913 if hasattr(test, 'tempdir'):
914 self.result_string = colorize("ERROR", RED) + \
915 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
916 self.symlink_failed(test)
918 self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
920 self.send_failure_through_pipe(test)
922 def getDescription(self, test):
927 :returns: test description
930 # TODO: if none print warning not raise exception
931 short_description = test.shortDescription()
932 if self.descriptions and short_description:
933 return short_description
937 def startTest(self, test):
944 self.printer.print_test_case_heading_if_first_time(test)
945 unittest.TestResult.startTest(self, test)
946 if self.verbosity > 0:
948 "Starting " + self.getDescription(test) + " ...")
949 self.stream.writeln(single_line_delim)
951 def stopTest(self, test):
958 unittest.TestResult.stopTest(self, test)
959 if self.verbosity > 0:
960 self.stream.writeln(single_line_delim)
961 self.stream.writeln("%-73s%s" % (self.getDescription(test),
963 self.stream.writeln(single_line_delim)
965 self.stream.writeln("%-73s%s" % (self.getDescription(test),
968 def printErrors(self):
970 Print errors from running the test case
972 self.stream.writeln()
973 self.printErrorList('ERROR', self.errors)
974 self.printErrorList('FAIL', self.failures)
976 def printErrorList(self, flavour, errors):
978 Print error list to the output stream together with error type
979 and test case description.
981 :param flavour: error type
982 :param errors: iterable errors
985 for test, err in errors:
986 self.stream.writeln(double_line_delim)
987 self.stream.writeln("%s: %s" %
988 (flavour, self.getDescription(test)))
989 self.stream.writeln(single_line_delim)
990 self.stream.writeln("%s" % err)
993 class Filter_by_test_option:
994 def __init__(self, filter_file_name, filter_class_name, filter_func_name):
995 self.filter_file_name = filter_file_name
996 self.filter_class_name = filter_class_name
997 self.filter_func_name = filter_func_name
999 def __call__(self, file_name, class_name, func_name):
1000 if self.filter_file_name and file_name != self.filter_file_name:
1002 if self.filter_class_name and class_name != self.filter_class_name:
1004 if self.filter_func_name and func_name != self.filter_func_name:
1009 class VppTestRunner(unittest.TextTestRunner):
1011 A basic test runner implementation which prints results to standard error.
1014 def resultclass(self):
1015 """Class maintaining the results of the tests"""
1016 return VppTestResult
1018 def __init__(self, keep_alive_pipe=None, failed_pipe=None,
1019 stream=sys.stderr, descriptions=True,
1020 verbosity=1, failfast=False, buffer=False, resultclass=None):
1021 # ignore stream setting here, use hard-coded stdout to be in sync
1022 # with prints from VppTestCase methods ...
1023 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1024 verbosity, failfast, buffer,
1026 reporter = KeepAliveReporter()
1027 reporter.pipe = keep_alive_pipe
1028 # this is super-ugly, but very simple to implement and works as long
1029 # as we run only one test at the same time
1030 VppTestResult.test_framework_failed_pipe = failed_pipe
1032 test_option = "TEST"
1034 def parse_test_option(self):
1036 f = os.getenv(self.test_option)
1039 filter_file_name = None
1040 filter_class_name = None
1041 filter_func_name = None
1044 parts = f.split('.')
1046 raise Exception("Unrecognized %s option: %s" %
1047 (self.test_option, f))
1049 if parts[2] not in ('*', ''):
1050 filter_func_name = parts[2]
1051 if parts[1] not in ('*', ''):
1052 filter_class_name = parts[1]
1053 if parts[0] not in ('*', ''):
1054 if parts[0].startswith('test_'):
1055 filter_file_name = parts[0]
1057 filter_file_name = 'test_%s' % parts[0]
1059 if f.startswith('test_'):
1060 filter_file_name = f
1062 filter_file_name = 'test_%s' % f
1063 return filter_file_name, filter_class_name, filter_func_name
1066 def filter_tests(tests, filter_cb):
1067 result = unittest.suite.TestSuite()
1069 if isinstance(t, unittest.suite.TestSuite):
1070 # this is a bunch of tests, recursively filter...
1071 x = filter_tests(t, filter_cb)
1072 if x.countTestCases() > 0:
1074 elif isinstance(t, unittest.TestCase):
1075 # this is a single test
1076 parts = t.id().split('.')
1077 # t.id() for common cases like this:
1078 # test_classifier.TestClassifier.test_acl_ip
1079 # apply filtering only if it is so
1081 if not filter_cb(parts[0], parts[1], parts[2]):
1085 # unexpected object, don't touch it
1089 def run(self, test):
1096 faulthandler.enable() # emit stack trace to stderr if killed by signal
1097 print("Running tests using custom test runner") # debug message
1098 filter_file, filter_class, filter_func = self.parse_test_option()
1099 print("Active filters: file=%s, class=%s, function=%s" % (
1100 filter_file, filter_class, filter_func))
1101 filter_cb = Filter_by_test_option(
1102 filter_file, filter_class, filter_func)
1103 filtered = self.filter_tests(test, filter_cb)
1104 print("%s out of %s tests match specified filters" % (
1105 filtered.countTestCases(), test.countTestCases()))
1106 if not running_extended_tests():
1107 print("Not running extended tests (some tests will be skipped)")
1108 return super(VppTestRunner, self).run(filtered)
1111 class Worker(Thread):
1112 def __init__(self, args, logger):
1113 self.logger = logger
1116 super(Worker, self).__init__()
1119 executable = self.args[0]
1120 self.logger.debug("Running executable w/args `%s'" % self.args)
1121 env = os.environ.copy()
1122 env["CK_LOG_FILE_NAME"] = "-"
1123 self.process = subprocess.Popen(
1124 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1125 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1126 out, err = self.process.communicate()
1127 self.logger.debug("Finished running `%s'" % executable)
1128 self.logger.info("Return code is `%s'" % self.process.returncode)
1129 self.logger.info(single_line_delim)
1130 self.logger.info("Executable `%s' wrote to stdout:" % executable)
1131 self.logger.info(single_line_delim)
1132 self.logger.info(out)
1133 self.logger.info(single_line_delim)
1134 self.logger.info("Executable `%s' wrote to stderr:" % executable)
1135 self.logger.info(single_line_delim)
1136 self.logger.error(err)
1137 self.logger.info(single_line_delim)
1138 self.result = self.process.returncode