3 from __future__ import print_function
14 from collections import deque
15 from threading import Thread, Event
16 from inspect import getdoc, isclass
17 from traceback import format_exception
18 from logging import FileHandler, DEBUG, Formatter
19 from scapy.packet import Raw
20 from hook import StepHook, PollHook, VppDiedError
21 from vpp_pg_interface import VppPGInterface
22 from vpp_sub_interface import VppSubInterface
23 from vpp_lo_interface import VppLoInterface
24 from vpp_papi_provider import VppPapiProvider
25 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
27 from vpp_object import VppObjectRegistry
28 if os.name == 'posix' and sys.version_info[0] < 3:
29 # using subprocess32 is recommended by python official documentation
30 # @ https://docs.python.org/2/library/subprocess.html
31 import subprocess32 as subprocess
35 debug_framework = False
36 if os.getenv('TEST_DEBUG', "0") == "1":
37 debug_framework = True
42 Test framework module.
44 The module provides a set of tools for constructing and running tests and
45 representing the results.
49 class _PacketInfo(object):
50 """Private class to create packet info object.
52 Help process information about the next packet.
53 Set variables to default values.
55 #: Store the index of the packet.
57 #: Store the index of the source packet generator interface of the packet.
59 #: Store the index of the destination packet generator interface
62 #: Store expected ip version
64 #: Store expected upper protocol
66 #: Store the copy of the former packet.
69 def __eq__(self, other):
70 index = self.index == other.index
71 src = self.src == other.src
72 dst = self.dst == other.dst
73 data = self.data == other.data
74 return index and src and dst and data
77 def pump_output(testclass):
78 """ pump output from vpp stdout/stderr to proper queues """
81 while not testclass.pump_thread_stop_flag.wait(0):
82 readable = select.select([testclass.vpp.stdout.fileno(),
83 testclass.vpp.stderr.fileno(),
84 testclass.pump_thread_wakeup_pipe[0]],
86 if testclass.vpp.stdout.fileno() in readable:
87 read = os.read(testclass.vpp.stdout.fileno(), 102400)
89 split = read.splitlines(True)
90 if len(stdout_fragment) > 0:
91 split[0] = "%s%s" % (stdout_fragment, split[0])
92 if len(split) > 0 and split[-1].endswith("\n"):
96 stdout_fragment = split[-1]
97 testclass.vpp_stdout_deque.extend(split[:limit])
98 if not testclass.cache_vpp_output:
99 for line in split[:limit]:
100 testclass.logger.debug(
101 "VPP STDOUT: %s" % line.rstrip("\n"))
102 if testclass.vpp.stderr.fileno() in readable:
103 read = os.read(testclass.vpp.stderr.fileno(), 102400)
105 split = read.splitlines(True)
106 if len(stderr_fragment) > 0:
107 split[0] = "%s%s" % (stderr_fragment, split[0])
108 if len(split) > 0 and split[-1].endswith("\n"):
112 stderr_fragment = split[-1]
113 testclass.vpp_stderr_deque.extend(split[:limit])
114 if not testclass.cache_vpp_output:
115 for line in split[:limit]:
116 testclass.logger.debug(
117 "VPP STDERR: %s" % line.rstrip("\n"))
118 # ignoring the dummy pipe here intentionally - the flag will take care
119 # of properly terminating the loop
122 def running_extended_tests():
123 s = os.getenv("EXTENDED_TESTS", "n")
124 return True if s.lower() in ("y", "yes", "1") else False
127 def running_on_centos():
128 os_id = os.getenv("OS_ID", "")
129 return True if "centos" in os_id.lower() else False
132 class KeepAliveReporter(object):
134 Singleton object which reports test start to parent process
139 self.__dict__ = self._shared_state
146 def pipe(self, pipe):
147 if hasattr(self, '_pipe'):
148 raise Exception("Internal error - pipe should only be set once.")
151 def send_keep_alive(self, test):
153 Write current test tmpdir & desc to keep-alive pipe to signal liveness
155 if self.pipe is None:
156 # if not running forked..
162 desc = test.shortDescription()
166 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
169 class VppTestCase(unittest.TestCase):
170 """This subclass is a base class for VPP test cases that are implemented as
171 classes. It provides methods to create and run test case.
175 def packet_infos(self):
176 """List of packet infos"""
177 return self._packet_infos
180 def get_packet_count_for_if_idx(cls, dst_if_index):
181 """Get the number of packet info for specified destination if index"""
182 if dst_if_index in cls._packet_count_for_dst_if_idx:
183 return cls._packet_count_for_dst_if_idx[dst_if_index]
189 """Return the instance of this testcase"""
190 return cls.test_instance
193 def set_debug_flags(cls, d):
194 cls.debug_core = False
195 cls.debug_gdb = False
196 cls.debug_gdbserver = False
201 cls.debug_core = True
204 elif dl == "gdbserver":
205 cls.debug_gdbserver = True
207 raise Exception("Unrecognized DEBUG option: '%s'" % d)
210 def setUpConstants(cls):
211 """ Set-up the test case class based on environment variables """
212 s = os.getenv("STEP", "n")
213 cls.step = True if s.lower() in ("y", "yes", "1") else False
214 d = os.getenv("DEBUG", None)
215 c = os.getenv("CACHE_OUTPUT", "1")
216 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
217 cls.set_debug_flags(d)
218 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
219 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
220 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
222 if cls.plugin_path is not None:
223 if cls.extern_plugin_path is not None:
224 plugin_path = "%s:%s" % (
225 cls.plugin_path, cls.extern_plugin_path)
227 plugin_path = cls.plugin_path
228 elif cls.extern_plugin_path is not None:
229 plugin_path = cls.extern_plugin_path
231 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
232 debug_cli = "cli-listen localhost:5002"
234 size = os.getenv("COREDUMP_SIZE")
236 coredump_size = "coredump-size %s" % size
237 if coredump_size is None:
238 coredump_size = "coredump-size unlimited"
239 cls.vpp_cmdline = [cls.vpp_bin, "unix",
240 "{", "nodaemon", debug_cli, "full-coredump",
241 coredump_size, "}", "api-trace", "{", "on", "}",
242 "api-segment", "{", "prefix", cls.shm_prefix, "}",
243 "plugins", "{", "plugin", "dpdk_plugin.so", "{",
244 "disable", "}", "}", ]
245 if plugin_path is not None:
246 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
247 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
250 def wait_for_enter(cls):
251 if cls.debug_gdbserver:
252 print(double_line_delim)
253 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
255 print(double_line_delim)
256 print("Spawned VPP with PID: %d" % cls.vpp.pid)
258 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
260 print(single_line_delim)
261 print("You can debug the VPP using e.g.:")
262 if cls.debug_gdbserver:
263 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
264 print("Now is the time to attach a gdb by running the above "
265 "command, set up breakpoints etc. and then resume VPP from "
266 "within gdb by issuing the 'continue' command")
268 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
269 print("Now is the time to attach a gdb by running the above "
270 "command and set up breakpoints etc.")
271 print(single_line_delim)
272 raw_input("Press ENTER to continue running the testcase...")
276 cmdline = cls.vpp_cmdline
278 if cls.debug_gdbserver:
279 gdbserver = '/usr/bin/gdbserver'
280 if not os.path.isfile(gdbserver) or \
281 not os.access(gdbserver, os.X_OK):
282 raise Exception("gdbserver binary '%s' does not exist or is "
283 "not executable" % gdbserver)
285 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
286 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
289 cls.vpp = subprocess.Popen(cmdline,
290 stdout=subprocess.PIPE,
291 stderr=subprocess.PIPE,
293 except Exception as e:
294 cls.logger.critical("Couldn't start vpp: %s" % e)
302 Perform class setup before running the testcase
303 Remove shared memory files, start vpp and connect the vpp-api
305 gc.collect() # run garbage collection first
307 cls.logger = getLogger(cls.__name__)
308 cls.tempdir = tempfile.mkdtemp(
309 prefix='vpp-unittest-%s-' % cls.__name__)
310 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
311 cls.file_handler.setFormatter(
312 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
314 cls.file_handler.setLevel(DEBUG)
315 cls.logger.addHandler(cls.file_handler)
316 cls.shm_prefix = cls.tempdir.split("/")[-1]
317 os.chdir(cls.tempdir)
318 cls.logger.info("Temporary dir is %s, shm prefix is %s",
319 cls.tempdir, cls.shm_prefix)
321 cls.reset_packet_infos()
323 cls._zombie_captures = []
326 cls.registry = VppObjectRegistry()
327 cls.vpp_startup_failed = False
328 cls.reporter = KeepAliveReporter()
329 # need to catch exceptions here because if we raise, then the cleanup
330 # doesn't get called and we might end with a zombie vpp
333 cls.reporter.send_keep_alive(cls)
334 cls.vpp_stdout_deque = deque()
335 cls.vpp_stderr_deque = deque()
336 cls.pump_thread_stop_flag = Event()
337 cls.pump_thread_wakeup_pipe = os.pipe()
338 cls.pump_thread = Thread(target=pump_output, args=(cls,))
339 cls.pump_thread.daemon = True
340 cls.pump_thread.start()
341 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
346 cls.vapi.register_hook(hook)
347 cls.sleep(0.1, "after vpp startup, before initial poll")
351 cls.vpp_startup_failed = True
353 "VPP died shortly after startup, check the"
354 " output to standard error for possible cause")
360 cls.vapi.disconnect()
363 if cls.debug_gdbserver:
364 print(colorize("You're running VPP inside gdbserver but "
365 "VPP-API connection failed, did you forget "
366 "to 'continue' VPP from within gdb?", RED))
378 Disconnect vpp-api, kill vpp and cleanup shared memory files
380 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
382 if cls.vpp.returncode is None:
383 print(double_line_delim)
384 print("VPP or GDB server is still running")
385 print(single_line_delim)
386 raw_input("When done debugging, press ENTER to kill the "
387 "process and finish running the testcase...")
389 os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
390 cls.pump_thread_stop_flag.set()
391 if hasattr(cls, 'pump_thread'):
392 cls.logger.debug("Waiting for pump thread to stop")
393 cls.pump_thread.join()
394 if hasattr(cls, 'vpp_stderr_reader_thread'):
395 cls.logger.debug("Waiting for stdderr pump to stop")
396 cls.vpp_stderr_reader_thread.join()
398 if hasattr(cls, 'vpp'):
399 if hasattr(cls, 'vapi'):
400 cls.vapi.disconnect()
403 if cls.vpp.returncode is None:
404 cls.logger.debug("Sending TERM to vpp")
406 cls.logger.debug("Waiting for vpp to die")
407 cls.vpp.communicate()
410 if cls.vpp_startup_failed:
411 stdout_log = cls.logger.info
412 stderr_log = cls.logger.critical
414 stdout_log = cls.logger.info
415 stderr_log = cls.logger.info
417 if hasattr(cls, 'vpp_stdout_deque'):
418 stdout_log(single_line_delim)
419 stdout_log('VPP output to stdout while running %s:', cls.__name__)
420 stdout_log(single_line_delim)
421 vpp_output = "".join(cls.vpp_stdout_deque)
422 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
424 stdout_log('\n%s', vpp_output)
425 stdout_log(single_line_delim)
427 if hasattr(cls, 'vpp_stderr_deque'):
428 stderr_log(single_line_delim)
429 stderr_log('VPP output to stderr while running %s:', cls.__name__)
430 stderr_log(single_line_delim)
431 vpp_output = "".join(cls.vpp_stderr_deque)
432 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
434 stderr_log('\n%s', vpp_output)
435 stderr_log(single_line_delim)
438 def tearDownClass(cls):
439 """ Perform final cleanup after running all tests in this test-case """
441 cls.file_handler.close()
442 cls.reset_packet_infos()
444 debug_internal.on_tear_down_class(cls)
447 """ Show various debug prints after each test """
448 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
449 (self.__class__.__name__, self._testMethodName,
450 self._testMethodDoc))
451 if not self.vpp_dead:
452 self.logger.debug(self.vapi.cli("show trace"))
453 self.logger.info(self.vapi.ppcli("show interface"))
454 self.logger.info(self.vapi.ppcli("show hardware"))
455 self.logger.info(self.vapi.ppcli("show error"))
456 self.logger.info(self.vapi.ppcli("show run"))
457 self.registry.remove_vpp_config(self.logger)
458 # Save/Dump VPP api trace log
459 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
460 tmp_api_trace = "/tmp/%s" % api_trace
461 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
462 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
463 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
465 os.rename(tmp_api_trace, vpp_api_trace_log)
466 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
469 self.registry.unregister_all(self.logger)
472 """ Clear trace before running each test"""
473 self.reporter.send_keep_alive(self)
474 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
475 (self.__class__.__name__, self._testMethodName,
476 self._testMethodDoc))
478 raise Exception("VPP is dead when setting up the test")
479 self.sleep(.1, "during setUp")
480 self.vpp_stdout_deque.append(
481 "--- test setUp() for %s.%s(%s) starts here ---\n" %
482 (self.__class__.__name__, self._testMethodName,
483 self._testMethodDoc))
484 self.vpp_stderr_deque.append(
485 "--- test setUp() for %s.%s(%s) starts here ---\n" %
486 (self.__class__.__name__, self._testMethodName,
487 self._testMethodDoc))
488 self.vapi.cli("clear trace")
489 # store the test instance inside the test class - so that objects
490 # holding the class can access instance methods (like assertEqual)
491 type(self).test_instance = self
494 def pg_enable_capture(cls, interfaces=None):
496 Enable capture on packet-generator interfaces
498 :param interfaces: iterable interface indexes (if None,
499 use self.pg_interfaces)
502 if interfaces is None:
503 interfaces = cls.pg_interfaces
508 def register_capture(cls, cap_name):
509 """ Register a capture in the testclass """
510 # add to the list of captures with current timestamp
511 cls._captures.append((time.time(), cap_name))
512 # filter out from zombies
513 cls._zombie_captures = [(stamp, name)
514 for (stamp, name) in cls._zombie_captures
519 """ Remove any zombie captures and enable the packet generator """
520 # how long before capture is allowed to be deleted - otherwise vpp
521 # crashes - 100ms seems enough (this shouldn't be needed at all)
524 for stamp, cap_name in cls._zombie_captures:
525 wait = stamp + capture_ttl - now
527 cls.sleep(wait, "before deleting capture %s" % cap_name)
529 cls.logger.debug("Removing zombie capture %s" % cap_name)
530 cls.vapi.cli('packet-generator delete %s' % cap_name)
532 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
533 cls.vapi.cli('packet-generator enable')
534 cls._zombie_captures = cls._captures
538 def create_pg_interfaces(cls, interfaces):
540 Create packet-generator interfaces.
542 :param interfaces: iterable indexes of the interfaces.
543 :returns: List of created interfaces.
548 intf = VppPGInterface(cls, i)
549 setattr(cls, intf.name, intf)
551 cls.pg_interfaces = result
555 def create_loopback_interfaces(cls, interfaces):
557 Create loopback interfaces.
559 :param interfaces: iterable indexes of the interfaces.
560 :returns: List of created interfaces.
564 intf = VppLoInterface(cls, i)
565 setattr(cls, intf.name, intf)
567 cls.lo_interfaces = result
571 def extend_packet(packet, size, padding=' '):
573 Extend packet to given size by padding with spaces or custom padding
574 NOTE: Currently works only when Raw layer is present.
576 :param packet: packet
577 :param size: target size
578 :param padding: padding used to extend the payload
581 packet_len = len(packet) + 4
582 extend = size - packet_len
584 num = (extend / len(padding)) + 1
585 packet[Raw].load += (padding * num)[:extend]
588 def reset_packet_infos(cls):
589 """ Reset the list of packet info objects and packet counts to zero """
590 cls._packet_infos = {}
591 cls._packet_count_for_dst_if_idx = {}
594 def create_packet_info(cls, src_if, dst_if):
596 Create packet info object containing the source and destination indexes
597 and add it to the testcase's packet info list
599 :param VppInterface src_if: source interface
600 :param VppInterface dst_if: destination interface
602 :returns: _PacketInfo object
606 info.index = len(cls._packet_infos)
607 info.src = src_if.sw_if_index
608 info.dst = dst_if.sw_if_index
609 if isinstance(dst_if, VppSubInterface):
610 dst_idx = dst_if.parent.sw_if_index
612 dst_idx = dst_if.sw_if_index
613 if dst_idx in cls._packet_count_for_dst_if_idx:
614 cls._packet_count_for_dst_if_idx[dst_idx] += 1
616 cls._packet_count_for_dst_if_idx[dst_idx] = 1
617 cls._packet_infos[info.index] = info
621 def info_to_payload(info):
623 Convert _PacketInfo object to packet payload
625 :param info: _PacketInfo object
627 :returns: string containing serialized data from packet info
629 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
633 def payload_to_info(payload):
635 Convert packet payload to _PacketInfo object
637 :param payload: packet payload
639 :returns: _PacketInfo object containing de-serialized data from payload
642 numbers = payload.split()
644 info.index = int(numbers[0])
645 info.src = int(numbers[1])
646 info.dst = int(numbers[2])
647 info.ip = int(numbers[3])
648 info.proto = int(numbers[4])
651 def get_next_packet_info(self, info):
653 Iterate over the packet info list stored in the testcase
654 Start iteration with first element if info is None
655 Continue based on index in info if info is specified
657 :param info: info or None
658 :returns: next info in list or None if no more infos
663 next_index = info.index + 1
664 if next_index == len(self._packet_infos):
667 return self._packet_infos[next_index]
669 def get_next_packet_info_for_interface(self, src_index, info):
671 Search the packet info list for the next packet info with same source
674 :param src_index: source interface index to search for
675 :param info: packet info - where to start the search
676 :returns: packet info or None
680 info = self.get_next_packet_info(info)
683 if info.src == src_index:
686 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
688 Search the packet info list for the next packet info with same source
689 and destination interface indexes
691 :param src_index: source interface index to search for
692 :param dst_index: destination interface index to search for
693 :param info: packet info - where to start the search
694 :returns: packet info or None
698 info = self.get_next_packet_info_for_interface(src_index, info)
701 if info.dst == dst_index:
704 def assert_equal(self, real_value, expected_value, name_or_class=None):
705 if name_or_class is None:
706 self.assertEqual(real_value, expected_value)
709 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
710 msg = msg % (getdoc(name_or_class).strip(),
711 real_value, str(name_or_class(real_value)),
712 expected_value, str(name_or_class(expected_value)))
714 msg = "Invalid %s: %s does not match expected value %s" % (
715 name_or_class, real_value, expected_value)
717 self.assertEqual(real_value, expected_value, msg)
719 def assert_in_range(self,
727 msg = "Invalid %s: %s out of range <%s,%s>" % (
728 name, real_value, expected_min, expected_max)
729 self.assertTrue(expected_min <= real_value <= expected_max, msg)
732 def sleep(cls, timeout, remark=None):
733 if hasattr(cls, 'logger'):
734 cls.logger.debug("Starting sleep for %ss (%s)" % (timeout, remark))
738 if after - before > 2 * timeout:
739 cls.logger.error("unexpected time.sleep() result - "
740 "slept for %ss instead of ~%ss!" % (
741 after - before, timeout))
742 if hasattr(cls, 'logger'):
744 "Finished sleep (%s) - slept %ss (wanted %ss)" % (
745 remark, after - before, timeout))
747 def send_and_assert_no_replies(self, intf, pkts, remark=""):
748 self.vapi.cli("clear trace")
749 intf.add_stream(pkts)
750 self.pg_enable_capture(self.pg_interfaces)
753 for i in self.pg_interfaces:
754 i.get_capture(0, timeout=timeout)
755 i.assert_nothing_captured(remark=remark)
758 def send_and_expect(self, input, pkts, output):
759 self.vapi.cli("clear trace")
760 input.add_stream(pkts)
761 self.pg_enable_capture(self.pg_interfaces)
763 rx = output.get_capture(len(pkts))
767 class TestCasePrinter(object):
771 self.__dict__ = self._shared_state
772 if not hasattr(self, "_test_case_set"):
773 self._test_case_set = set()
775 def print_test_case_heading_if_first_time(self, case):
776 if case.__class__ not in self._test_case_set:
777 print(double_line_delim)
778 print(colorize(getdoc(case.__class__).splitlines()[0], YELLOW))
779 print(double_line_delim)
780 self._test_case_set.add(case.__class__)
783 class VppTestResult(unittest.TestResult):
785 @property result_string
786 String variable to store the test case result string.
788 List variable containing 2-tuples of TestCase instances and strings
789 holding formatted tracebacks. Each tuple represents a test which
790 raised an unexpected exception.
792 List variable containing 2-tuples of TestCase instances and strings
793 holding formatted tracebacks. Each tuple represents a test where
794 a failure was explicitly signalled using the TestCase.assert*()
798 def __init__(self, stream, descriptions, verbosity):
800 :param stream File descriptor to store where to report test results.
801 Set to the standard error stream by default.
802 :param descriptions Boolean variable to store information if to use
803 test case descriptions.
804 :param verbosity Integer variable to store required verbosity level.
806 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
808 self.descriptions = descriptions
809 self.verbosity = verbosity
810 self.result_string = None
811 self.printer = TestCasePrinter()
813 def addSuccess(self, test):
815 Record a test succeeded result
820 if hasattr(test, 'logger'):
821 test.logger.debug("--- addSuccess() %s.%s(%s) called"
822 % (test.__class__.__name__,
823 test._testMethodName,
824 test._testMethodDoc))
825 unittest.TestResult.addSuccess(self, test)
826 self.result_string = colorize("OK", GREEN)
828 def addSkip(self, test, reason):
830 Record a test skipped.
836 if hasattr(test, 'logger'):
837 test.logger.debug("--- addSkip() %s.%s(%s) called, reason is %s"
838 % (test.__class__.__name__,
839 test._testMethodName,
842 unittest.TestResult.addSkip(self, test, reason)
843 self.result_string = colorize("SKIP", YELLOW)
845 def symlink_failed(self, test):
847 if hasattr(test, 'logger'):
849 if hasattr(test, 'tempdir'):
851 failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
852 link_path = '%s/%s-FAILED' % (failed_dir,
853 test.tempdir.split("/")[-1])
855 logger.debug("creating a link to the failed test")
856 logger.debug("os.symlink(%s, %s)" %
857 (test.tempdir, link_path))
858 os.symlink(test.tempdir, link_path)
859 except Exception as e:
863 def send_failure_through_pipe(self, test):
864 if hasattr(self, 'test_framework_failed_pipe'):
865 pipe = self.test_framework_failed_pipe
867 pipe.send(test.__class__)
869 def addFailure(self, test, err):
871 Record a test failed result
874 :param err: error message
877 if hasattr(test, 'logger'):
878 test.logger.debug("--- addFailure() %s.%s(%s) called, err is %s"
879 % (test.__class__.__name__,
880 test._testMethodName,
881 test._testMethodDoc, err))
882 test.logger.debug("formatted exception is:\n%s" %
883 "".join(format_exception(*err)))
884 unittest.TestResult.addFailure(self, test, err)
885 if hasattr(test, 'tempdir'):
886 self.result_string = colorize("FAIL", RED) + \
887 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
888 self.symlink_failed(test)
890 self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
892 self.send_failure_through_pipe(test)
894 def addError(self, test, err):
896 Record a test error result
899 :param err: error message
902 if hasattr(test, 'logger'):
903 test.logger.debug("--- addError() %s.%s(%s) called, err is %s"
904 % (test.__class__.__name__,
905 test._testMethodName,
906 test._testMethodDoc, err))
907 test.logger.debug("formatted exception is:\n%s" %
908 "".join(format_exception(*err)))
909 unittest.TestResult.addError(self, test, err)
910 if hasattr(test, 'tempdir'):
911 self.result_string = colorize("ERROR", RED) + \
912 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
913 self.symlink_failed(test)
915 self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
917 self.send_failure_through_pipe(test)
919 def getDescription(self, test):
924 :returns: test description
927 # TODO: if none print warning not raise exception
928 short_description = test.shortDescription()
929 if self.descriptions and short_description:
930 return short_description
934 def startTest(self, test):
941 self.printer.print_test_case_heading_if_first_time(test)
942 unittest.TestResult.startTest(self, test)
943 if self.verbosity > 0:
945 "Starting " + self.getDescription(test) + " ...")
946 self.stream.writeln(single_line_delim)
948 def stopTest(self, test):
955 unittest.TestResult.stopTest(self, test)
956 if self.verbosity > 0:
957 self.stream.writeln(single_line_delim)
958 self.stream.writeln("%-73s%s" % (self.getDescription(test),
960 self.stream.writeln(single_line_delim)
962 self.stream.writeln("%-73s%s" % (self.getDescription(test),
965 def printErrors(self):
967 Print errors from running the test case
969 self.stream.writeln()
970 self.printErrorList('ERROR', self.errors)
971 self.printErrorList('FAIL', self.failures)
973 def printErrorList(self, flavour, errors):
975 Print error list to the output stream together with error type
976 and test case description.
978 :param flavour: error type
979 :param errors: iterable errors
982 for test, err in errors:
983 self.stream.writeln(double_line_delim)
984 self.stream.writeln("%s: %s" %
985 (flavour, self.getDescription(test)))
986 self.stream.writeln(single_line_delim)
987 self.stream.writeln("%s" % err)
990 class Filter_by_test_option:
991 def __init__(self, filter_file_name, filter_class_name, filter_func_name):
992 self.filter_file_name = filter_file_name
993 self.filter_class_name = filter_class_name
994 self.filter_func_name = filter_func_name
996 def __call__(self, file_name, class_name, func_name):
997 if self.filter_file_name and file_name != self.filter_file_name:
999 if self.filter_class_name and class_name != self.filter_class_name:
1001 if self.filter_func_name and func_name != self.filter_func_name:
1006 class VppTestRunner(unittest.TextTestRunner):
1008 A basic test runner implementation which prints results to standard error.
1011 def resultclass(self):
1012 """Class maintaining the results of the tests"""
1013 return VppTestResult
1015 def __init__(self, keep_alive_pipe=None, failed_pipe=None,
1016 stream=sys.stderr, descriptions=True,
1017 verbosity=1, failfast=False, buffer=False, resultclass=None):
1018 # ignore stream setting here, use hard-coded stdout to be in sync
1019 # with prints from VppTestCase methods ...
1020 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1021 verbosity, failfast, buffer,
1023 reporter = KeepAliveReporter()
1024 reporter.pipe = keep_alive_pipe
1025 # this is super-ugly, but very simple to implement and works as long
1026 # as we run only one test at the same time
1027 VppTestResult.test_framework_failed_pipe = failed_pipe
1029 test_option = "TEST"
1031 def parse_test_option(self):
1032 f = os.getenv(self.test_option, None)
1033 filter_file_name = None
1034 filter_class_name = None
1035 filter_func_name = None
1038 parts = f.split('.')
1040 raise Exception("Unrecognized %s option: %s" %
1041 (self.test_option, f))
1043 if parts[2] not in ('*', ''):
1044 filter_func_name = parts[2]
1045 if parts[1] not in ('*', ''):
1046 filter_class_name = parts[1]
1047 if parts[0] not in ('*', ''):
1048 if parts[0].startswith('test_'):
1049 filter_file_name = parts[0]
1051 filter_file_name = 'test_%s' % parts[0]
1053 if f.startswith('test_'):
1054 filter_file_name = f
1056 filter_file_name = 'test_%s' % f
1057 return filter_file_name, filter_class_name, filter_func_name
1060 def filter_tests(tests, filter_cb):
1061 result = unittest.suite.TestSuite()
1063 if isinstance(t, unittest.suite.TestSuite):
1064 # this is a bunch of tests, recursively filter...
1065 x = VppTestRunner.filter_tests(t, filter_cb)
1066 if x.countTestCases() > 0:
1068 elif isinstance(t, unittest.TestCase):
1069 # this is a single test
1070 parts = t.id().split('.')
1071 # t.id() for common cases like this:
1072 # test_classifier.TestClassifier.test_acl_ip
1073 # apply filtering only if it is so
1075 if not filter_cb(parts[0], parts[1], parts[2]):
1079 # unexpected object, don't touch it
1083 def run(self, test):
1090 faulthandler.enable() # emit stack trace to stderr if killed by signal
1091 print("Running tests using custom test runner") # debug message
1092 filter_file, filter_class, filter_func = self.parse_test_option()
1093 print("Active filters: file=%s, class=%s, function=%s" % (
1094 filter_file, filter_class, filter_func))
1095 filter_cb = Filter_by_test_option(
1096 filter_file, filter_class, filter_func)
1097 filtered = self.filter_tests(test, filter_cb)
1098 print("%s out of %s tests match specified filters" % (
1099 filtered.countTestCases(), test.countTestCases()))
1100 if not running_extended_tests():
1101 print("Not running extended tests (some tests will be skipped)")
1102 return super(VppTestRunner, self).run(filtered)
1105 class Worker(Thread):
1106 def __init__(self, args, logger, env={}):
1107 self.logger = logger
1110 self.env = copy.deepcopy(env)
1111 super(Worker, self).__init__()
1114 executable = self.args[0]
1115 self.logger.debug("Running executable w/args `%s'" % self.args)
1116 env = os.environ.copy()
1117 env.update(self.env)
1118 env["CK_LOG_FILE_NAME"] = "-"
1119 self.process = subprocess.Popen(
1120 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1121 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1122 out, err = self.process.communicate()
1123 self.logger.debug("Finished running `%s'" % executable)
1124 self.logger.info("Return code is `%s'" % self.process.returncode)
1125 self.logger.info(single_line_delim)
1126 self.logger.info("Executable `%s' wrote to stdout:" % executable)
1127 self.logger.info(single_line_delim)
1128 self.logger.info(out)
1129 self.logger.info(single_line_delim)
1130 self.logger.info("Executable `%s' wrote to stderr:" % executable)
1131 self.logger.info(single_line_delim)
1132 self.logger.info(err)
1133 self.logger.info(single_line_delim)
1134 self.result = self.process.returncode