3 from __future__ import print_function
14 from collections import deque
15 from threading import Thread, Event
16 from inspect import getdoc, isclass
17 from traceback import format_exception
18 from logging import FileHandler, DEBUG, Formatter
19 from scapy.packet import Raw
20 from hook import StepHook, PollHook, VppDiedError
21 from vpp_pg_interface import VppPGInterface
22 from vpp_sub_interface import VppSubInterface
23 from vpp_lo_interface import VppLoInterface
24 from vpp_papi_provider import VppPapiProvider
25 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
27 from vpp_object import VppObjectRegistry
28 if os.name == 'posix' and sys.version_info[0] < 3:
29 # using subprocess32 is recommended by python official documentation
30 # @ https://docs.python.org/2/library/subprocess.html
31 import subprocess32 as subprocess
35 debug_framework = False
36 if os.getenv('TEST_DEBUG', "0") == "1":
37 debug_framework = True
42 Test framework module.
44 The module provides a set of tools for constructing and running tests and
45 representing the results.
49 class _PacketInfo(object):
50 """Private class to create packet info object.
52 Help process information about the next packet.
53 Set variables to default values.
55 #: Store the index of the packet.
57 #: Store the index of the source packet generator interface of the packet.
59 #: Store the index of the destination packet generator interface
62 #: Store expected ip version
64 #: Store expected upper protocol
66 #: Store the copy of the former packet.
69 def __eq__(self, other):
70 index = self.index == other.index
71 src = self.src == other.src
72 dst = self.dst == other.dst
73 data = self.data == other.data
74 return index and src and dst and data
77 def pump_output(testclass):
78 """ pump output from vpp stdout/stderr to proper queues """
81 while not testclass.pump_thread_stop_flag.wait(0):
82 readable = select.select([testclass.vpp.stdout.fileno(),
83 testclass.vpp.stderr.fileno(),
84 testclass.pump_thread_wakeup_pipe[0]],
86 if testclass.vpp.stdout.fileno() in readable:
87 read = os.read(testclass.vpp.stdout.fileno(), 102400)
89 split = read.splitlines(True)
90 if len(stdout_fragment) > 0:
91 split[0] = "%s%s" % (stdout_fragment, split[0])
92 if len(split) > 0 and split[-1].endswith("\n"):
96 stdout_fragment = split[-1]
97 testclass.vpp_stdout_deque.extend(split[:limit])
98 if not testclass.cache_vpp_output:
99 for line in split[:limit]:
100 testclass.logger.debug(
101 "VPP STDOUT: %s" % line.rstrip("\n"))
102 if testclass.vpp.stderr.fileno() in readable:
103 read = os.read(testclass.vpp.stderr.fileno(), 102400)
105 split = read.splitlines(True)
106 if len(stderr_fragment) > 0:
107 split[0] = "%s%s" % (stderr_fragment, split[0])
108 if len(split) > 0 and split[-1].endswith("\n"):
112 stderr_fragment = split[-1]
113 testclass.vpp_stderr_deque.extend(split[:limit])
114 if not testclass.cache_vpp_output:
115 for line in split[:limit]:
116 testclass.logger.debug(
117 "VPP STDERR: %s" % line.rstrip("\n"))
118 # ignoring the dummy pipe here intentionally - the flag will take care
119 # of properly terminating the loop
122 def running_extended_tests():
123 s = os.getenv("EXTENDED_TESTS", "n")
124 return True if s.lower() in ("y", "yes", "1") else False
127 def running_on_centos():
128 os_id = os.getenv("OS_ID", "")
129 return True if "centos" in os_id.lower() else False
132 class KeepAliveReporter(object):
134 Singleton object which reports test start to parent process
139 self.__dict__ = self._shared_state
146 def pipe(self, pipe):
147 if hasattr(self, '_pipe'):
148 raise Exception("Internal error - pipe should only be set once.")
151 def send_keep_alive(self, test):
153 Write current test tmpdir & desc to keep-alive pipe to signal liveness
155 if self.pipe is None:
156 # if not running forked..
162 desc = test.shortDescription()
166 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
169 class VppTestCase(unittest.TestCase):
170 """This subclass is a base class for VPP test cases that are implemented as
171 classes. It provides methods to create and run test case.
175 def packet_infos(self):
176 """List of packet infos"""
177 return self._packet_infos
180 def get_packet_count_for_if_idx(cls, dst_if_index):
181 """Get the number of packet info for specified destination if index"""
182 if dst_if_index in cls._packet_count_for_dst_if_idx:
183 return cls._packet_count_for_dst_if_idx[dst_if_index]
189 """Return the instance of this testcase"""
190 return cls.test_instance
193 def set_debug_flags(cls, d):
194 cls.debug_core = False
195 cls.debug_gdb = False
196 cls.debug_gdbserver = False
201 cls.debug_core = True
204 elif dl == "gdbserver":
205 cls.debug_gdbserver = True
207 raise Exception("Unrecognized DEBUG option: '%s'" % d)
210 def setUpConstants(cls):
211 """ Set-up the test case class based on environment variables """
212 s = os.getenv("STEP", "n")
213 cls.step = True if s.lower() in ("y", "yes", "1") else False
214 d = os.getenv("DEBUG", None)
215 c = os.getenv("CACHE_OUTPUT", "1")
216 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
217 cls.set_debug_flags(d)
218 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
219 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
220 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
222 if cls.plugin_path is not None:
223 if cls.extern_plugin_path is not None:
224 plugin_path = "%s:%s" % (
225 cls.plugin_path, cls.extern_plugin_path)
227 plugin_path = cls.plugin_path
228 elif cls.extern_plugin_path is not None:
229 plugin_path = cls.extern_plugin_path
231 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
232 debug_cli = "cli-listen localhost:5002"
234 size = os.getenv("COREDUMP_SIZE")
236 coredump_size = "coredump-size %s" % size
237 if coredump_size is None:
238 coredump_size = "coredump-size unlimited"
239 cls.vpp_cmdline = [cls.vpp_bin, "unix",
240 "{", "nodaemon", debug_cli, "full-coredump",
241 coredump_size, "}", "api-trace", "{", "on", "}",
242 "api-segment", "{", "prefix", cls.shm_prefix, "}",
243 "plugins", "{", "plugin", "dpdk_plugin.so", "{",
244 "disable", "}", "}", ]
245 if plugin_path is not None:
246 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
247 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
250 def wait_for_enter(cls):
251 if cls.debug_gdbserver:
252 print(double_line_delim)
253 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
255 print(double_line_delim)
256 print("Spawned VPP with PID: %d" % cls.vpp.pid)
258 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
260 print(single_line_delim)
261 print("You can debug the VPP using e.g.:")
262 if cls.debug_gdbserver:
263 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
264 print("Now is the time to attach a gdb by running the above "
265 "command, set up breakpoints etc. and then resume VPP from "
266 "within gdb by issuing the 'continue' command")
268 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
269 print("Now is the time to attach a gdb by running the above "
270 "command and set up breakpoints etc.")
271 print(single_line_delim)
272 sys.stdin.readline("Press ENTER to continue running the testcase...")
276 cmdline = cls.vpp_cmdline
278 if cls.debug_gdbserver:
279 gdbserver = '/usr/bin/gdbserver'
280 if not os.path.isfile(gdbserver) or \
281 not os.access(gdbserver, os.X_OK):
282 raise Exception("gdbserver binary '%s' does not exist or is "
283 "not executable" % gdbserver)
285 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
286 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
289 cls.vpp = subprocess.Popen(cmdline,
290 stdout=subprocess.PIPE,
291 stderr=subprocess.PIPE,
293 except Exception as e:
294 cls.logger.critical("Couldn't start vpp: %s" % e)
302 Perform class setup before running the testcase
303 Remove shared memory files, start vpp and connect the vpp-api
305 gc.collect() # run garbage collection first
307 cls.logger = getLogger(cls.__name__)
308 cls.tempdir = tempfile.mkdtemp(
309 prefix='vpp-unittest-%s-' % cls.__name__)
310 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
311 cls.file_handler.setFormatter(
312 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
314 cls.file_handler.setLevel(DEBUG)
315 cls.logger.addHandler(cls.file_handler)
316 cls.shm_prefix = cls.tempdir.split("/")[-1]
317 os.chdir(cls.tempdir)
318 cls.logger.info("Temporary dir is %s, shm prefix is %s",
319 cls.tempdir, cls.shm_prefix)
321 cls.reset_packet_infos()
323 cls._zombie_captures = []
326 cls.registry = VppObjectRegistry()
327 cls.vpp_startup_failed = False
328 cls.reporter = KeepAliveReporter()
329 # need to catch exceptions here because if we raise, then the cleanup
330 # doesn't get called and we might end with a zombie vpp
333 cls.reporter.send_keep_alive(cls)
334 cls.vpp_stdout_deque = deque()
335 cls.vpp_stderr_deque = deque()
336 cls.pump_thread_stop_flag = Event()
337 cls.pump_thread_wakeup_pipe = os.pipe()
338 cls.pump_thread = Thread(target=pump_output, args=(cls,))
339 cls.pump_thread.daemon = True
340 cls.pump_thread.start()
341 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
346 cls.vapi.register_hook(hook)
347 cls.sleep(0.1, "after vpp startup, before initial poll")
351 cls.vpp_startup_failed = True
353 "VPP died shortly after startup, check the"
354 " output to standard error for possible cause")
360 cls.vapi.disconnect()
363 if cls.debug_gdbserver:
364 print(colorize("You're running VPP inside gdbserver but "
365 "VPP-API connection failed, did you forget "
366 "to 'continue' VPP from within gdb?", RED))
378 Disconnect vpp-api, kill vpp and cleanup shared memory files
380 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
382 if cls.vpp.returncode is None:
383 print(double_line_delim)
384 print("VPP or GDB server is still running")
385 print(single_line_delim)
387 "When done debugging, press ENTER to kill the process and "
388 "finish running the testcase...")
390 os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
391 cls.pump_thread_stop_flag.set()
392 if hasattr(cls, 'pump_thread'):
393 cls.logger.debug("Waiting for pump thread to stop")
394 cls.pump_thread.join()
395 if hasattr(cls, 'vpp_stderr_reader_thread'):
396 cls.logger.debug("Waiting for stdderr pump to stop")
397 cls.vpp_stderr_reader_thread.join()
399 if hasattr(cls, 'vpp'):
400 if hasattr(cls, 'vapi'):
401 cls.vapi.disconnect()
404 if cls.vpp.returncode is None:
405 cls.logger.debug("Sending TERM to vpp")
407 cls.logger.debug("Waiting for vpp to die")
408 cls.vpp.communicate()
411 if cls.vpp_startup_failed:
412 stdout_log = cls.logger.info
413 stderr_log = cls.logger.critical
415 stdout_log = cls.logger.info
416 stderr_log = cls.logger.info
418 if hasattr(cls, 'vpp_stdout_deque'):
419 stdout_log(single_line_delim)
420 stdout_log('VPP output to stdout while running %s:', cls.__name__)
421 stdout_log(single_line_delim)
422 vpp_output = "".join(cls.vpp_stdout_deque)
423 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
425 stdout_log('\n%s', vpp_output)
426 stdout_log(single_line_delim)
428 if hasattr(cls, 'vpp_stderr_deque'):
429 stderr_log(single_line_delim)
430 stderr_log('VPP output to stderr while running %s:', cls.__name__)
431 stderr_log(single_line_delim)
432 vpp_output = "".join(cls.vpp_stderr_deque)
433 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
435 stderr_log('\n%s', vpp_output)
436 stderr_log(single_line_delim)
439 def tearDownClass(cls):
440 """ Perform final cleanup after running all tests in this test-case """
442 cls.file_handler.close()
443 cls.reset_packet_infos()
445 debug_internal.on_tear_down_class(cls)
448 """ Show various debug prints after each test """
449 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
450 (self.__class__.__name__, self._testMethodName,
451 self._testMethodDoc))
452 if not self.vpp_dead:
453 self.logger.debug(self.vapi.cli("show trace"))
454 self.logger.info(self.vapi.ppcli("show interface"))
455 self.logger.info(self.vapi.ppcli("show hardware"))
456 self.logger.info(self.vapi.ppcli("show error"))
457 self.logger.info(self.vapi.ppcli("show run"))
458 self.registry.remove_vpp_config(self.logger)
459 # Save/Dump VPP api trace log
460 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
461 tmp_api_trace = "/tmp/%s" % api_trace
462 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
463 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
464 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
466 os.rename(tmp_api_trace, vpp_api_trace_log)
467 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
470 self.registry.unregister_all(self.logger)
473 """ Clear trace before running each test"""
474 self.reporter.send_keep_alive(self)
475 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
476 (self.__class__.__name__, self._testMethodName,
477 self._testMethodDoc))
479 raise Exception("VPP is dead when setting up the test")
480 self.sleep(.1, "during setUp")
481 self.vpp_stdout_deque.append(
482 "--- test setUp() for %s.%s(%s) starts here ---\n" %
483 (self.__class__.__name__, self._testMethodName,
484 self._testMethodDoc))
485 self.vpp_stderr_deque.append(
486 "--- test setUp() for %s.%s(%s) starts here ---\n" %
487 (self.__class__.__name__, self._testMethodName,
488 self._testMethodDoc))
489 self.vapi.cli("clear trace")
490 # store the test instance inside the test class - so that objects
491 # holding the class can access instance methods (like assertEqual)
492 type(self).test_instance = self
495 def pg_enable_capture(cls, interfaces=None):
497 Enable capture on packet-generator interfaces
499 :param interfaces: iterable interface indexes (if None,
500 use self.pg_interfaces)
503 if interfaces is None:
504 interfaces = cls.pg_interfaces
509 def register_capture(cls, cap_name):
510 """ Register a capture in the testclass """
511 # add to the list of captures with current timestamp
512 cls._captures.append((time.time(), cap_name))
513 # filter out from zombies
514 cls._zombie_captures = [(stamp, name)
515 for (stamp, name) in cls._zombie_captures
520 """ Remove any zombie captures and enable the packet generator """
521 # how long before capture is allowed to be deleted - otherwise vpp
522 # crashes - 100ms seems enough (this shouldn't be needed at all)
525 for stamp, cap_name in cls._zombie_captures:
526 wait = stamp + capture_ttl - now
528 cls.sleep(wait, "before deleting capture %s" % cap_name)
530 cls.logger.debug("Removing zombie capture %s" % cap_name)
531 cls.vapi.cli('packet-generator delete %s' % cap_name)
533 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
534 cls.vapi.cli('packet-generator enable')
535 cls._zombie_captures = cls._captures
539 def create_pg_interfaces(cls, interfaces):
541 Create packet-generator interfaces.
543 :param interfaces: iterable indexes of the interfaces.
544 :returns: List of created interfaces.
549 intf = VppPGInterface(cls, i)
550 setattr(cls, intf.name, intf)
552 cls.pg_interfaces = result
556 def create_loopback_interfaces(cls, interfaces):
558 Create loopback interfaces.
560 :param interfaces: iterable indexes of the interfaces.
561 :returns: List of created interfaces.
565 intf = VppLoInterface(cls, i)
566 setattr(cls, intf.name, intf)
568 cls.lo_interfaces = result
572 def extend_packet(packet, size, padding=' '):
574 Extend packet to given size by padding with spaces or custom padding
575 NOTE: Currently works only when Raw layer is present.
577 :param packet: packet
578 :param size: target size
579 :param padding: padding used to extend the payload
582 packet_len = len(packet) + 4
583 extend = size - packet_len
585 num = (extend / len(padding)) + 1
586 packet[Raw].load += (padding * num)[:extend]
589 def reset_packet_infos(cls):
590 """ Reset the list of packet info objects and packet counts to zero """
591 cls._packet_infos = {}
592 cls._packet_count_for_dst_if_idx = {}
595 def create_packet_info(cls, src_if, dst_if):
597 Create packet info object containing the source and destination indexes
598 and add it to the testcase's packet info list
600 :param VppInterface src_if: source interface
601 :param VppInterface dst_if: destination interface
603 :returns: _PacketInfo object
607 info.index = len(cls._packet_infos)
608 info.src = src_if.sw_if_index
609 info.dst = dst_if.sw_if_index
610 if isinstance(dst_if, VppSubInterface):
611 dst_idx = dst_if.parent.sw_if_index
613 dst_idx = dst_if.sw_if_index
614 if dst_idx in cls._packet_count_for_dst_if_idx:
615 cls._packet_count_for_dst_if_idx[dst_idx] += 1
617 cls._packet_count_for_dst_if_idx[dst_idx] = 1
618 cls._packet_infos[info.index] = info
622 def info_to_payload(info):
624 Convert _PacketInfo object to packet payload
626 :param info: _PacketInfo object
628 :returns: string containing serialized data from packet info
630 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
634 def payload_to_info(payload):
636 Convert packet payload to _PacketInfo object
638 :param payload: packet payload
640 :returns: _PacketInfo object containing de-serialized data from payload
643 numbers = payload.split()
645 info.index = int(numbers[0])
646 info.src = int(numbers[1])
647 info.dst = int(numbers[2])
648 info.ip = int(numbers[3])
649 info.proto = int(numbers[4])
652 def get_next_packet_info(self, info):
654 Iterate over the packet info list stored in the testcase
655 Start iteration with first element if info is None
656 Continue based on index in info if info is specified
658 :param info: info or None
659 :returns: next info in list or None if no more infos
664 next_index = info.index + 1
665 if next_index == len(self._packet_infos):
668 return self._packet_infos[next_index]
670 def get_next_packet_info_for_interface(self, src_index, info):
672 Search the packet info list for the next packet info with same source
675 :param src_index: source interface index to search for
676 :param info: packet info - where to start the search
677 :returns: packet info or None
681 info = self.get_next_packet_info(info)
684 if info.src == src_index:
687 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
689 Search the packet info list for the next packet info with same source
690 and destination interface indexes
692 :param src_index: source interface index to search for
693 :param dst_index: destination interface index to search for
694 :param info: packet info - where to start the search
695 :returns: packet info or None
699 info = self.get_next_packet_info_for_interface(src_index, info)
702 if info.dst == dst_index:
705 def assert_equal(self, real_value, expected_value, name_or_class=None):
706 if name_or_class is None:
707 self.assertEqual(real_value, expected_value)
710 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
711 msg = msg % (getdoc(name_or_class).strip(),
712 real_value, str(name_or_class(real_value)),
713 expected_value, str(name_or_class(expected_value)))
715 msg = "Invalid %s: %s does not match expected value %s" % (
716 name_or_class, real_value, expected_value)
718 self.assertEqual(real_value, expected_value, msg)
720 def assert_in_range(self,
728 msg = "Invalid %s: %s out of range <%s,%s>" % (
729 name, real_value, expected_min, expected_max)
730 self.assertTrue(expected_min <= real_value <= expected_max, msg)
733 def sleep(cls, timeout, remark=None):
734 if hasattr(cls, 'logger'):
735 cls.logger.debug("Starting sleep for %ss (%s)" % (timeout, remark))
739 if after - before > 2 * timeout:
740 cls.logger.error("unexpected time.sleep() result - "
741 "slept for %ss instead of ~%ss!" % (
742 after - before, timeout))
743 if hasattr(cls, 'logger'):
745 "Finished sleep (%s) - slept %ss (wanted %ss)" % (
746 remark, after - before, timeout))
748 def send_and_assert_no_replies(self, intf, pkts, remark=""):
749 self.vapi.cli("clear trace")
750 intf.add_stream(pkts)
751 self.pg_enable_capture(self.pg_interfaces)
754 for i in self.pg_interfaces:
755 i.get_capture(0, timeout=timeout)
756 i.assert_nothing_captured(remark=remark)
759 def send_and_expect(self, input, pkts, output):
760 self.vapi.cli("clear trace")
761 input.add_stream(pkts)
762 self.pg_enable_capture(self.pg_interfaces)
764 rx = output.get_capture(len(pkts))
768 class TestCasePrinter(object):
772 self.__dict__ = self._shared_state
773 if not hasattr(self, "_test_case_set"):
774 self._test_case_set = set()
776 def print_test_case_heading_if_first_time(self, case):
777 if case.__class__ not in self._test_case_set:
778 print(double_line_delim)
779 print(colorize(getdoc(case.__class__).splitlines()[0], YELLOW))
780 print(double_line_delim)
781 self._test_case_set.add(case.__class__)
784 class VppTestResult(unittest.TestResult):
786 @property result_string
787 String variable to store the test case result string.
789 List variable containing 2-tuples of TestCase instances and strings
790 holding formatted tracebacks. Each tuple represents a test which
791 raised an unexpected exception.
793 List variable containing 2-tuples of TestCase instances and strings
794 holding formatted tracebacks. Each tuple represents a test where
795 a failure was explicitly signalled using the TestCase.assert*()
799 def __init__(self, stream, descriptions, verbosity):
801 :param stream File descriptor to store where to report test results.
802 Set to the standard error stream by default.
803 :param descriptions Boolean variable to store information if to use
804 test case descriptions.
805 :param verbosity Integer variable to store required verbosity level.
807 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
809 self.descriptions = descriptions
810 self.verbosity = verbosity
811 self.result_string = None
812 self.printer = TestCasePrinter()
814 def addSuccess(self, test):
816 Record a test succeeded result
821 if hasattr(test, 'logger'):
822 test.logger.debug("--- addSuccess() %s.%s(%s) called"
823 % (test.__class__.__name__,
824 test._testMethodName,
825 test._testMethodDoc))
826 unittest.TestResult.addSuccess(self, test)
827 self.result_string = colorize("OK", GREEN)
829 def addSkip(self, test, reason):
831 Record a test skipped.
837 if hasattr(test, 'logger'):
838 test.logger.debug("--- addSkip() %s.%s(%s) called, reason is %s"
839 % (test.__class__.__name__,
840 test._testMethodName,
843 unittest.TestResult.addSkip(self, test, reason)
844 self.result_string = colorize("SKIP", YELLOW)
846 def symlink_failed(self, test):
848 if hasattr(test, 'logger'):
850 if hasattr(test, 'tempdir'):
852 failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
853 link_path = '%s/%s-FAILED' % (failed_dir,
854 test.tempdir.split("/")[-1])
856 logger.debug("creating a link to the failed test")
857 logger.debug("os.symlink(%s, %s)" %
858 (test.tempdir, link_path))
859 os.symlink(test.tempdir, link_path)
860 except Exception as e:
864 def send_failure_through_pipe(self, test):
865 if hasattr(self, 'test_framework_failed_pipe'):
866 pipe = self.test_framework_failed_pipe
868 pipe.send(test.__class__)
870 def addFailure(self, test, err):
872 Record a test failed result
875 :param err: error message
878 if hasattr(test, 'logger'):
879 test.logger.debug("--- addFailure() %s.%s(%s) called, err is %s"
880 % (test.__class__.__name__,
881 test._testMethodName,
882 test._testMethodDoc, err))
883 test.logger.debug("formatted exception is:\n%s" %
884 "".join(format_exception(*err)))
885 unittest.TestResult.addFailure(self, test, err)
886 if hasattr(test, 'tempdir'):
887 self.result_string = colorize("FAIL", RED) + \
888 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
889 self.symlink_failed(test)
891 self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
893 self.send_failure_through_pipe(test)
895 def addError(self, test, err):
897 Record a test error result
900 :param err: error message
903 if hasattr(test, 'logger'):
904 test.logger.debug("--- addError() %s.%s(%s) called, err is %s"
905 % (test.__class__.__name__,
906 test._testMethodName,
907 test._testMethodDoc, err))
908 test.logger.debug("formatted exception is:\n%s" %
909 "".join(format_exception(*err)))
910 unittest.TestResult.addError(self, test, err)
911 if hasattr(test, 'tempdir'):
912 self.result_string = colorize("ERROR", RED) + \
913 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
914 self.symlink_failed(test)
916 self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
918 self.send_failure_through_pipe(test)
920 def getDescription(self, test):
925 :returns: test description
928 # TODO: if none print warning not raise exception
929 short_description = test.shortDescription()
930 if self.descriptions and short_description:
931 return short_description
935 def startTest(self, test):
942 self.printer.print_test_case_heading_if_first_time(test)
943 unittest.TestResult.startTest(self, test)
944 if self.verbosity > 0:
946 "Starting " + self.getDescription(test) + " ...")
947 self.stream.writeln(single_line_delim)
949 def stopTest(self, test):
956 unittest.TestResult.stopTest(self, test)
957 if self.verbosity > 0:
958 self.stream.writeln(single_line_delim)
959 self.stream.writeln("%-73s%s" % (self.getDescription(test),
961 self.stream.writeln(single_line_delim)
963 self.stream.writeln("%-73s%s" % (self.getDescription(test),
966 def printErrors(self):
968 Print errors from running the test case
970 self.stream.writeln()
971 self.printErrorList('ERROR', self.errors)
972 self.printErrorList('FAIL', self.failures)
974 def printErrorList(self, flavour, errors):
976 Print error list to the output stream together with error type
977 and test case description.
979 :param flavour: error type
980 :param errors: iterable errors
983 for test, err in errors:
984 self.stream.writeln(double_line_delim)
985 self.stream.writeln("%s: %s" %
986 (flavour, self.getDescription(test)))
987 self.stream.writeln(single_line_delim)
988 self.stream.writeln("%s" % err)
991 class Filter_by_test_option:
992 def __init__(self, filter_file_name, filter_class_name, filter_func_name):
993 self.filter_file_name = filter_file_name
994 self.filter_class_name = filter_class_name
995 self.filter_func_name = filter_func_name
997 def __call__(self, file_name, class_name, func_name):
998 if self.filter_file_name and file_name != self.filter_file_name:
1000 if self.filter_class_name and class_name != self.filter_class_name:
1002 if self.filter_func_name and func_name != self.filter_func_name:
1007 class VppTestRunner(unittest.TextTestRunner):
1009 A basic test runner implementation which prints results to standard error.
1012 def resultclass(self):
1013 """Class maintaining the results of the tests"""
1014 return VppTestResult
1016 def __init__(self, keep_alive_pipe=None, failed_pipe=None,
1017 stream=sys.stderr, descriptions=True,
1018 verbosity=1, failfast=False, buffer=False, resultclass=None):
1019 # ignore stream setting here, use hard-coded stdout to be in sync
1020 # with prints from VppTestCase methods ...
1021 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1022 verbosity, failfast, buffer,
1024 reporter = KeepAliveReporter()
1025 reporter.pipe = keep_alive_pipe
1026 # this is super-ugly, but very simple to implement and works as long
1027 # as we run only one test at the same time
1028 VppTestResult.test_framework_failed_pipe = failed_pipe
1030 test_option = "TEST"
1032 def parse_test_option(self):
1033 f = os.getenv(self.test_option, None)
1034 filter_file_name = None
1035 filter_class_name = None
1036 filter_func_name = None
1039 parts = f.split('.')
1041 raise Exception("Unrecognized %s option: %s" %
1042 (self.test_option, f))
1044 if parts[2] not in ('*', ''):
1045 filter_func_name = parts[2]
1046 if parts[1] not in ('*', ''):
1047 filter_class_name = parts[1]
1048 if parts[0] not in ('*', ''):
1049 if parts[0].startswith('test_'):
1050 filter_file_name = parts[0]
1052 filter_file_name = 'test_%s' % parts[0]
1054 if f.startswith('test_'):
1055 filter_file_name = f
1057 filter_file_name = 'test_%s' % f
1058 return filter_file_name, filter_class_name, filter_func_name
1061 def filter_tests(tests, filter_cb):
1062 result = unittest.suite.TestSuite()
1064 if isinstance(t, unittest.suite.TestSuite):
1065 # this is a bunch of tests, recursively filter...
1066 x = VppTestRunner.filter_tests(t, filter_cb)
1067 if x.countTestCases() > 0:
1069 elif isinstance(t, unittest.TestCase):
1070 # this is a single test
1071 parts = t.id().split('.')
1072 # t.id() for common cases like this:
1073 # test_classifier.TestClassifier.test_acl_ip
1074 # apply filtering only if it is so
1076 if not filter_cb(parts[0], parts[1], parts[2]):
1080 # unexpected object, don't touch it
1084 def run(self, test):
1091 faulthandler.enable() # emit stack trace to stderr if killed by signal
1092 print("Running tests using custom test runner") # debug message
1093 filter_file, filter_class, filter_func = self.parse_test_option()
1094 print("Active filters: file=%s, class=%s, function=%s" % (
1095 filter_file, filter_class, filter_func))
1096 filter_cb = Filter_by_test_option(
1097 filter_file, filter_class, filter_func)
1098 filtered = self.filter_tests(test, filter_cb)
1099 print("%s out of %s tests match specified filters" % (
1100 filtered.countTestCases(), test.countTestCases()))
1101 if not running_extended_tests():
1102 print("Not running extended tests (some tests will be skipped)")
1103 return super(VppTestRunner, self).run(filtered)
1106 class Worker(Thread):
1107 def __init__(self, args, logger, env={}):
1108 self.logger = logger
1111 self.env = copy.deepcopy(env)
1112 super(Worker, self).__init__()
1115 executable = self.args[0]
1116 self.logger.debug("Running executable w/args `%s'" % self.args)
1117 env = os.environ.copy()
1118 env.update(self.env)
1119 env["CK_LOG_FILE_NAME"] = "-"
1120 self.process = subprocess.Popen(
1121 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1122 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1123 out, err = self.process.communicate()
1124 self.logger.debug("Finished running `%s'" % executable)
1125 self.logger.info("Return code is `%s'" % self.process.returncode)
1126 self.logger.info(single_line_delim)
1127 self.logger.info("Executable `%s' wrote to stdout:" % executable)
1128 self.logger.info(single_line_delim)
1129 self.logger.info(out)
1130 self.logger.info(single_line_delim)
1131 self.logger.info("Executable `%s' wrote to stderr:" % executable)
1132 self.logger.info(single_line_delim)
1133 self.logger.info(err)
1134 self.logger.info(single_line_delim)
1135 self.result = self.process.returncode