3 from __future__ import print_function
15 from collections import deque
16 from threading import Thread, Event
17 from inspect import getdoc, isclass
18 from traceback import format_exception
19 from logging import FileHandler, DEBUG, Formatter
20 from scapy.packet import Raw
21 from hook import StepHook, PollHook
22 from vpp_pg_interface import VppPGInterface
23 from vpp_sub_interface import VppSubInterface
24 from vpp_lo_interface import VppLoInterface
25 from vpp_papi_provider import VppPapiProvider
27 from vpp_object import VppObjectRegistry
28 from vpp_punt_socket import vpp_uds_socket_name
29 if os.name == 'posix' and sys.version_info[0] < 3:
30 # using subprocess32 is recommended by python official documentation
31 # @ https://docs.python.org/2/library/subprocess.html
32 import subprocess32 as subprocess
36 debug_framework = False
37 if os.getenv('TEST_DEBUG', "0") == "1":
38 debug_framework = True
43 Test framework module.
45 The module provides a set of tools for constructing and running tests and
46 representing the results.
50 class _PacketInfo(object):
51 """Private class to create packet info object.
53 Help process information about the next packet.
54 Set variables to default values.
56 #: Store the index of the packet.
58 #: Store the index of the source packet generator interface of the packet.
60 #: Store the index of the destination packet generator interface
63 #: Store expected ip version
65 #: Store expected upper protocol
67 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare two packet infos by index, src, dst and data.

    Every field comparison is evaluated up front; the results are then
    combined the way a chained ``and`` would combine them, i.e. the first
    falsy comparison result is returned, otherwise the result of the
    data comparison.

    :param other: object exposing index, src, dst and data attributes
    :returns: combined result of the four field comparisons
    """
    outcomes = [
        self.index == other.index,
        self.src == other.src,
        self.dst == other.dst,
        self.data == other.data,
    ]
    for outcome in outcomes[:-1]:
        if not outcome:
            return outcome
    return outcomes[-1]
78 def pump_output(testclass):
79 """ pump output from vpp stdout/stderr to proper queues """
82 while not testclass.pump_thread_stop_flag.wait(0):
83 readable = select.select([testclass.vpp.stdout.fileno(),
84 testclass.vpp.stderr.fileno(),
85 testclass.pump_thread_wakeup_pipe[0]],
87 if testclass.vpp.stdout.fileno() in readable:
88 read = os.read(testclass.vpp.stdout.fileno(), 102400)
90 split = read.splitlines(True)
91 if len(stdout_fragment) > 0:
92 split[0] = "%s%s" % (stdout_fragment, split[0])
93 if len(split) > 0 and split[-1].endswith("\n"):
97 stdout_fragment = split[-1]
98 testclass.vpp_stdout_deque.extend(split[:limit])
99 if not testclass.cache_vpp_output:
100 for line in split[:limit]:
101 testclass.logger.debug(
102 "VPP STDOUT: %s" % line.rstrip("\n"))
103 if testclass.vpp.stderr.fileno() in readable:
104 read = os.read(testclass.vpp.stderr.fileno(), 102400)
106 split = read.splitlines(True)
107 if len(stderr_fragment) > 0:
108 split[0] = "%s%s" % (stderr_fragment, split[0])
109 if len(split) > 0 and split[-1].endswith("\n"):
113 stderr_fragment = split[-1]
114 testclass.vpp_stderr_deque.extend(split[:limit])
115 if not testclass.cache_vpp_output:
116 for line in split[:limit]:
117 testclass.logger.debug(
118 "VPP STDERR: %s" % line.rstrip("\n"))
119 # ignoring the dummy pipe here intentionally - the flag will take care
120 # of properly terminating the loop
def running_extended_tests():
    """Tell whether the EXTENDED_TESTS environment variable enables extended tests.

    :returns: True when EXTENDED_TESTS is set to "y", "yes" or "1"
              (compared case-insensitively), False otherwise - including
              when the variable is not set at all.
    """
    # default to "" so that an unset variable cannot fail on .lower()
    s = os.getenv("EXTENDED_TESTS", "")
    return s.lower() in ("y", "yes", "1")
def running_on_centos():
    """Tell whether the OS_ID environment variable identifies CentOS.

    :returns: True when the lower-cased OS_ID value contains "centos",
              False otherwise - including when the variable is not set
              at all.
    """
    # default to "" so that an unset variable cannot fail on .lower()
    os_id = os.getenv("OS_ID", "")
    return "centos" in os_id.lower()
141 class KeepAliveReporter(object):
143 Singleton object which reports test start to parent process
148 self.__dict__ = self._shared_state
155 def pipe(self, pipe):
156 if hasattr(self, '_pipe'):
157 raise Exception("Internal error - pipe should only be set once.")
160 def send_keep_alive(self, test):
162 Write current test tmpdir & desc to keep-alive pipe to signal liveness
164 if self.pipe is None:
165 # if not running forked..
171 desc = test.shortDescription()
175 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
178 class VppTestCase(unittest.TestCase):
179 """This subclass is a base class for VPP test cases that are implemented as
180 classes. It provides methods to create and run test case.
def packet_infos(self):
    """Packet infos recorded for this test case

    :returns: the object stored in ``_packet_infos`` (not a copy)
    """
    recorded = self._packet_infos
    return recorded
189 def get_packet_count_for_if_idx(cls, dst_if_index):
190 """Get the number of packet info for specified destination if index"""
191 if dst_if_index in cls._packet_count_for_dst_if_idx:
192 return cls._packet_count_for_dst_if_idx[dst_if_index]
198 """Return the instance of this testcase"""
199 return cls.test_instance
202 def set_debug_flags(cls, d):
203 cls.debug_core = False
204 cls.debug_gdb = False
205 cls.debug_gdbserver = False
210 cls.debug_core = True
213 elif dl == "gdbserver":
214 cls.debug_gdbserver = True
216 raise Exception("Unrecognized DEBUG option: '%s'" % d)
219 def setUpConstants(cls):
220 """ Set-up the test case class based on environment variables """
222 s = os.getenv("STEP")
223 cls.step = True if s.lower() in ("y", "yes", "1") else False
227 d = os.getenv("DEBUG")
231 c = os.getenv("CACHE_OUTPUT", "1")
232 cls.cache_vpp_output = \
233 False if c.lower() in ("n", "no", "0") else True
235 cls.cache_vpp_output = True
236 cls.set_debug_flags(d)
237 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
238 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
239 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
241 if cls.plugin_path is not None:
242 if cls.extern_plugin_path is not None:
243 plugin_path = "%s:%s" % (
244 cls.plugin_path, cls.extern_plugin_path)
246 plugin_path = cls.plugin_path
247 elif cls.extern_plugin_path is not None:
248 plugin_path = cls.extern_plugin_path
250 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
251 debug_cli = "cli-listen localhost:5002"
254 size = os.getenv("COREDUMP_SIZE")
256 coredump_size = "coredump-size %s" % size
259 if coredump_size is None:
260 coredump_size = "coredump-size unlimited"
261 cls.vpp_cmdline = [cls.vpp_bin, "unix",
262 "{", "nodaemon", debug_cli, "full-coredump",
263 coredump_size, "}", "api-trace", "{", "on", "}",
264 "api-segment", "{", "prefix", cls.shm_prefix, "}",
265 "plugins", "{", "plugin", "dpdk_plugin.so", "{",
267 "punt", "{", "socket", cls.punt_socket_path, "}"]
268 if plugin_path is not None:
269 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
270 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
273 def wait_for_enter(cls):
274 if cls.debug_gdbserver:
275 print(double_line_delim)
276 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
278 print(double_line_delim)
279 print("Spawned VPP with PID: %d" % cls.vpp.pid)
281 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
283 print(single_line_delim)
284 print("You can debug the VPP using e.g.:")
285 if cls.debug_gdbserver:
286 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
287 print("Now is the time to attach a gdb by running the above "
288 "command, set up breakpoints etc. and then resume VPP from "
289 "within gdb by issuing the 'continue' command")
291 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
292 print("Now is the time to attach a gdb by running the above "
293 "command and set up breakpoints etc.")
294 print(single_line_delim)
295 raw_input("Press ENTER to continue running the testcase...")
299 cmdline = cls.vpp_cmdline
301 if cls.debug_gdbserver:
302 gdbserver = '/usr/bin/gdbserver'
303 if not os.path.isfile(gdbserver) or \
304 not os.access(gdbserver, os.X_OK):
305 raise Exception("gdbserver binary '%s' does not exist or is "
306 "not executable" % gdbserver)
308 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
309 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
312 cls.vpp = subprocess.Popen(cmdline,
313 stdout=subprocess.PIPE,
314 stderr=subprocess.PIPE,
316 except Exception as e:
317 cls.logger.critical("Couldn't start vpp: %s" % e)
325 Perform class setup before running the testcase
326 Remove shared memory files, start vpp and connect the vpp-api
328 gc.collect() # run garbage collection first
330 cls.logger = getLogger(cls.__name__)
331 cls.tempdir = tempfile.mkdtemp(
332 prefix='vpp-unittest-%s-' % cls.__name__)
333 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
334 cls.file_handler.setFormatter(
335 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
337 cls.file_handler.setLevel(DEBUG)
338 cls.logger.addHandler(cls.file_handler)
339 cls.shm_prefix = cls.tempdir.split("/")[-1]
340 cls.punt_socket_path = '%s/%s' % (cls.tempdir, vpp_uds_socket_name)
341 os.chdir(cls.tempdir)
342 cls.logger.info("Temporary dir is %s, shm prefix is %s",
343 cls.tempdir, cls.shm_prefix)
345 cls.reset_packet_infos()
347 cls._zombie_captures = []
350 cls.registry = VppObjectRegistry()
351 cls.vpp_startup_failed = False
352 cls.reporter = KeepAliveReporter()
353 # need to catch exceptions here because if we raise, then the cleanup
354 # doesn't get called and we might end with a zombie vpp
357 cls.reporter.send_keep_alive(cls)
358 cls.vpp_stdout_deque = deque()
359 cls.vpp_stderr_deque = deque()
360 cls.pump_thread_stop_flag = Event()
361 cls.pump_thread_wakeup_pipe = os.pipe()
362 cls.pump_thread = Thread(target=pump_output, args=(cls,))
363 cls.pump_thread.daemon = True
364 cls.pump_thread.start()
365 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
370 cls.vapi.register_hook(hook)
371 cls.sleep(0.1, "after vpp startup, before initial poll")
375 cls.vpp_startup_failed = True
377 "VPP died shortly after startup, check the"
378 " output to standard error for possible cause")
383 if cls.debug_gdbserver:
384 print(colorize("You're running VPP inside gdbserver but "
385 "VPP-API connection failed, did you forget "
386 "to 'continue' VPP from within gdb?", RED))
389 t, v, tb = sys.exc_info()
399 Disconnect vpp-api, kill vpp and cleanup shared memory files
401 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
403 if cls.vpp.returncode is None:
404 print(double_line_delim)
405 print("VPP or GDB server is still running")
406 print(single_line_delim)
407 raw_input("When done debugging, press ENTER to kill the "
408 "process and finish running the testcase...")
410 os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
411 cls.pump_thread_stop_flag.set()
412 if hasattr(cls, 'pump_thread'):
413 cls.logger.debug("Waiting for pump thread to stop")
414 cls.pump_thread.join()
415 if hasattr(cls, 'vpp_stderr_reader_thread'):
416 cls.logger.debug("Waiting for stdderr pump to stop")
417 cls.vpp_stderr_reader_thread.join()
419 if hasattr(cls, 'vpp'):
420 if hasattr(cls, 'vapi'):
421 cls.vapi.disconnect()
424 if cls.vpp.returncode is None:
425 cls.logger.debug("Sending TERM to vpp")
427 cls.logger.debug("Waiting for vpp to die")
428 cls.vpp.communicate()
431 if cls.vpp_startup_failed:
432 stdout_log = cls.logger.info
433 stderr_log = cls.logger.critical
435 stdout_log = cls.logger.info
436 stderr_log = cls.logger.info
438 if hasattr(cls, 'vpp_stdout_deque'):
439 stdout_log(single_line_delim)
440 stdout_log('VPP output to stdout while running %s:', cls.__name__)
441 stdout_log(single_line_delim)
442 vpp_output = "".join(cls.vpp_stdout_deque)
443 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
445 stdout_log('\n%s', vpp_output)
446 stdout_log(single_line_delim)
448 if hasattr(cls, 'vpp_stderr_deque'):
449 stderr_log(single_line_delim)
450 stderr_log('VPP output to stderr while running %s:', cls.__name__)
451 stderr_log(single_line_delim)
452 vpp_output = "".join(cls.vpp_stderr_deque)
453 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
455 stderr_log('\n%s', vpp_output)
456 stderr_log(single_line_delim)
459 def tearDownClass(cls):
460 """ Perform final cleanup after running all tests in this test-case """
462 cls.file_handler.close()
463 cls.reset_packet_infos()
465 debug_internal.on_tear_down_class(cls)
468 """ Show various debug prints after each test """
469 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
470 (self.__class__.__name__, self._testMethodName,
471 self._testMethodDoc))
472 if not self.vpp_dead:
473 self.logger.debug(self.vapi.cli("show trace"))
474 self.logger.info(self.vapi.ppcli("show interface"))
475 self.logger.info(self.vapi.ppcli("show hardware"))
476 self.logger.info(self.vapi.ppcli("show error"))
477 self.logger.info(self.vapi.ppcli("show run"))
478 self.registry.remove_vpp_config(self.logger)
479 # Save/Dump VPP api trace log
480 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
481 tmp_api_trace = "/tmp/%s" % api_trace
482 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
483 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
484 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
486 os.rename(tmp_api_trace, vpp_api_trace_log)
487 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
490 self.registry.unregister_all(self.logger)
493 """ Clear trace before running each test"""
494 self.reporter.send_keep_alive(self)
495 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
496 (self.__class__.__name__, self._testMethodName,
497 self._testMethodDoc))
499 raise Exception("VPP is dead when setting up the test")
500 self.sleep(.1, "during setUp")
501 self.vpp_stdout_deque.append(
502 "--- test setUp() for %s.%s(%s) starts here ---\n" %
503 (self.__class__.__name__, self._testMethodName,
504 self._testMethodDoc))
505 self.vpp_stderr_deque.append(
506 "--- test setUp() for %s.%s(%s) starts here ---\n" %
507 (self.__class__.__name__, self._testMethodName,
508 self._testMethodDoc))
509 self.vapi.cli("clear trace")
510 # store the test instance inside the test class - so that objects
511 # holding the class can access instance methods (like assertEqual)
512 type(self).test_instance = self
515 def pg_enable_capture(cls, interfaces=None):
517 Enable capture on packet-generator interfaces
519 :param interfaces: iterable interface indexes (if None,
520 use self.pg_interfaces)
523 if interfaces is None:
524 interfaces = cls.pg_interfaces
529 def register_capture(cls, cap_name):
530 """ Register a capture in the testclass """
531 # add to the list of captures with current timestamp
532 cls._captures.append((time.time(), cap_name))
533 # filter out from zombies
534 cls._zombie_captures = [(stamp, name)
535 for (stamp, name) in cls._zombie_captures
540 """ Remove any zombie captures and enable the packet generator """
541 # how long before capture is allowed to be deleted - otherwise vpp
542 # crashes - 100ms seems enough (this shouldn't be needed at all)
545 for stamp, cap_name in cls._zombie_captures:
546 wait = stamp + capture_ttl - now
548 cls.sleep(wait, "before deleting capture %s" % cap_name)
550 cls.logger.debug("Removing zombie capture %s" % cap_name)
551 cls.vapi.cli('packet-generator delete %s' % cap_name)
553 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
554 cls.vapi.cli('packet-generator enable')
555 cls._zombie_captures = cls._captures
559 def create_pg_interfaces(cls, interfaces):
561 Create packet-generator interfaces.
563 :param interfaces: iterable indexes of the interfaces.
564 :returns: List of created interfaces.
569 intf = VppPGInterface(cls, i)
570 setattr(cls, intf.name, intf)
572 cls.pg_interfaces = result
576 def create_loopback_interfaces(cls, interfaces):
578 Create loopback interfaces.
580 :param interfaces: iterable indexes of the interfaces.
581 :returns: List of created interfaces.
585 intf = VppLoInterface(cls, i)
586 setattr(cls, intf.name, intf)
588 cls.lo_interfaces = result
592 def extend_packet(packet, size, padding=' '):
594 Extend packet to given size by padding with spaces or custom padding
595 NOTE: Currently works only when Raw layer is present.
597 :param packet: packet
598 :param size: target size
599 :param padding: padding used to extend the payload
602 packet_len = len(packet) + 4
603 extend = size - packet_len
605 num = (extend / len(padding)) + 1
606 packet[Raw].load += (padding * num)[:extend]
def reset_packet_infos(cls):
    """ Forget all packet info objects and all per-destination counts """
    # each attribute gets its own fresh, empty dict
    for attribute in ("_packet_infos", "_packet_count_for_dst_if_idx"):
        setattr(cls, attribute, {})
615 def create_packet_info(cls, src_if, dst_if):
617 Create packet info object containing the source and destination indexes
618 and add it to the testcase's packet info list
620 :param VppInterface src_if: source interface
621 :param VppInterface dst_if: destination interface
623 :returns: _PacketInfo object
627 info.index = len(cls._packet_infos)
628 info.src = src_if.sw_if_index
629 info.dst = dst_if.sw_if_index
630 if isinstance(dst_if, VppSubInterface):
631 dst_idx = dst_if.parent.sw_if_index
633 dst_idx = dst_if.sw_if_index
634 if dst_idx in cls._packet_count_for_dst_if_idx:
635 cls._packet_count_for_dst_if_idx[dst_idx] += 1
637 cls._packet_count_for_dst_if_idx[dst_idx] = 1
638 cls._packet_infos[info.index] = info
642 def info_to_payload(info):
644 Convert _PacketInfo object to packet payload
646 :param info: _PacketInfo object
648 :returns: string containing serialized data from packet info
650 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
654 def payload_to_info(payload):
656 Convert packet payload to _PacketInfo object
658 :param payload: packet payload
660 :returns: _PacketInfo object containing de-serialized data from payload
663 numbers = payload.split()
665 info.index = int(numbers[0])
666 info.src = int(numbers[1])
667 info.dst = int(numbers[2])
668 info.ip = int(numbers[3])
669 info.proto = int(numbers[4])
672 def get_next_packet_info(self, info):
674 Iterate over the packet info list stored in the testcase
675 Start iteration with first element if info is None
676 Continue based on index in info if info is specified
678 :param info: info or None
679 :returns: next info in list or None if no more infos
684 next_index = info.index + 1
685 if next_index == len(self._packet_infos):
688 return self._packet_infos[next_index]
690 def get_next_packet_info_for_interface(self, src_index, info):
692 Search the packet info list for the next packet info with same source
695 :param src_index: source interface index to search for
696 :param info: packet info - where to start the search
697 :returns: packet info or None
701 info = self.get_next_packet_info(info)
704 if info.src == src_index:
707 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
709 Search the packet info list for the next packet info with same source
710 and destination interface indexes
712 :param src_index: source interface index to search for
713 :param dst_index: destination interface index to search for
714 :param info: packet info - where to start the search
715 :returns: packet info or None
719 info = self.get_next_packet_info_for_interface(src_index, info)
722 if info.dst == dst_index:
725 def assert_equal(self, real_value, expected_value, name_or_class=None):
726 if name_or_class is None:
727 self.assertEqual(real_value, expected_value)
730 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
731 msg = msg % (getdoc(name_or_class).strip(),
732 real_value, str(name_or_class(real_value)),
733 expected_value, str(name_or_class(expected_value)))
735 msg = "Invalid %s: %s does not match expected value %s" % (
736 name_or_class, real_value, expected_value)
738 self.assertEqual(real_value, expected_value, msg)
740 def assert_in_range(self,
748 msg = "Invalid %s: %s out of range <%s,%s>" % (
749 name, real_value, expected_min, expected_max)
750 self.assertTrue(expected_min <= real_value <= expected_max, msg)
753 def sleep(cls, timeout, remark=None):
754 if hasattr(cls, 'logger'):
755 cls.logger.debug("Starting sleep for %ss (%s)" % (timeout, remark))
759 if after - before > 2 * timeout:
760 cls.logger.error("unexpected time.sleep() result - "
761 "slept for %ss instead of ~%ss!" % (
762 after - before, timeout))
763 if hasattr(cls, 'logger'):
765 "Finished sleep (%s) - slept %ss (wanted %ss)" % (
766 remark, after - before, timeout))
768 def send_and_assert_no_replies(self, intf, pkts, remark=""):
769 self.vapi.cli("clear trace")
770 intf.add_stream(pkts)
771 self.pg_enable_capture(self.pg_interfaces)
774 for i in self.pg_interfaces:
775 i.get_capture(0, timeout=timeout)
776 i.assert_nothing_captured(remark=remark)
779 def send_and_expect(self, input, pkts, output):
780 self.vapi.cli("clear trace")
781 input.add_stream(pkts)
782 self.pg_enable_capture(self.pg_interfaces)
784 rx = output.get_capture(len(pkts))
788 class TestCasePrinter(object):
792 self.__dict__ = self._shared_state
793 if not hasattr(self, "_test_case_set"):
794 self._test_case_set = set()
def print_test_case_heading_if_first_time(self, case):
    """Print a heading for the class of ``case`` the first time it is seen.

    The heading is the first line of the class documentation; when the
    class has no (or an empty) docstring, the class name is used instead.
    Classes already recorded in ``_test_case_set`` print nothing.

    :param case: test case instance whose class identifies the heading
    """
    case_class = case.__class__
    if case_class in self._test_case_set:
        return
    # getdoc() returns None for an undocumented class and "" for an empty
    # docstring - neither has a first line, so fall back to the class name
    doc_lines = (getdoc(case_class) or "").splitlines()
    heading = doc_lines[0] if doc_lines else case_class.__name__
    print(double_line_delim)
    print(colorize(heading, YELLOW))
    print(double_line_delim)
    self._test_case_set.add(case_class)
804 class VppTestResult(unittest.TestResult):
806 @property result_string
807 String variable to store the test case result string.
809 List variable containing 2-tuples of TestCase instances and strings
810 holding formatted tracebacks. Each tuple represents a test which
811 raised an unexpected exception.
813 List variable containing 2-tuples of TestCase instances and strings
814 holding formatted tracebacks. Each tuple represents a test where
815 a failure was explicitly signalled using the TestCase.assert*()
819 def __init__(self, stream, descriptions, verbosity):
821 :param stream File descriptor to store where to report test results.
822 Set to the standard error stream by default.
823 :param descriptions Boolean variable to store information if to use
824 test case descriptions.
825 :param verbosity Integer variable to store required verbosity level.
827 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
829 self.descriptions = descriptions
830 self.verbosity = verbosity
831 self.result_string = None
832 self.printer = TestCasePrinter()
def addSuccess(self, test):
    """
    Record a test succeeded result

    Logs the call through the test's own logger when it has one, hands
    the result to unittest.TestResult and stores a green "OK" as the
    result string.

    :param test: test that succeeded
    """
    if hasattr(test, 'logger'):
        identity = (test.__class__.__name__,
                    test._testMethodName,
                    test._testMethodDoc)
        test.logger.debug("--- addSuccess() %s.%s(%s) called" % identity)
    unittest.TestResult.addSuccess(self, test)
    self.result_string = colorize("OK", GREEN)
849 def addSkip(self, test, reason):
851 Record a test skipped.
857 if hasattr(test, 'logger'):
858 test.logger.debug("--- addSkip() %s.%s(%s) called, reason is %s"
859 % (test.__class__.__name__,
860 test._testMethodName,
863 unittest.TestResult.addSkip(self, test, reason)
864 self.result_string = colorize("SKIP", YELLOW)
866 def symlink_failed(self, test):
868 if hasattr(test, 'logger'):
870 if hasattr(test, 'tempdir'):
872 failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
873 link_path = '%s/%s-FAILED' % (failed_dir,
874 test.tempdir.split("/")[-1])
876 logger.debug("creating a link to the failed test")
877 logger.debug("os.symlink(%s, %s)" %
878 (test.tempdir, link_path))
879 os.symlink(test.tempdir, link_path)
880 except Exception as e:
884 def send_failure_through_pipe(self, test):
885 if hasattr(self, 'test_framework_failed_pipe'):
886 pipe = self.test_framework_failed_pipe
888 pipe.send(test.__class__)
890 def addFailure(self, test, err):
892 Record a test failed result
895 :param err: error message
898 if hasattr(test, 'logger'):
899 test.logger.debug("--- addFailure() %s.%s(%s) called, err is %s"
900 % (test.__class__.__name__,
901 test._testMethodName,
902 test._testMethodDoc, err))
903 test.logger.debug("formatted exception is:\n%s" %
904 "".join(format_exception(*err)))
905 unittest.TestResult.addFailure(self, test, err)
906 if hasattr(test, 'tempdir'):
907 self.result_string = colorize("FAIL", RED) + \
908 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
909 self.symlink_failed(test)
911 self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
913 self.send_failure_through_pipe(test)
915 def addError(self, test, err):
917 Record a test error result
920 :param err: error message
923 if hasattr(test, 'logger'):
924 test.logger.debug("--- addError() %s.%s(%s) called, err is %s"
925 % (test.__class__.__name__,
926 test._testMethodName,
927 test._testMethodDoc, err))
928 test.logger.debug("formatted exception is:\n%s" %
929 "".join(format_exception(*err)))
930 unittest.TestResult.addError(self, test, err)
931 if hasattr(test, 'tempdir'):
932 self.result_string = colorize("ERROR", RED) + \
933 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
934 self.symlink_failed(test)
936 self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
938 self.send_failure_through_pipe(test)
940 def getDescription(self, test):
945 :returns: test description
948 # TODO: if none print warning not raise exception
949 short_description = test.shortDescription()
950 if self.descriptions and short_description:
951 return short_description
955 def startTest(self, test):
962 self.printer.print_test_case_heading_if_first_time(test)
963 unittest.TestResult.startTest(self, test)
964 if self.verbosity > 0:
966 "Starting " + self.getDescription(test) + " ...")
967 self.stream.writeln(single_line_delim)
969 def stopTest(self, test):
976 unittest.TestResult.stopTest(self, test)
977 if self.verbosity > 0:
978 self.stream.writeln(single_line_delim)
979 self.stream.writeln("%-73s%s" % (self.getDescription(test),
981 self.stream.writeln(single_line_delim)
983 self.stream.writeln("%-73s%s" % (self.getDescription(test),
def printErrors(self):
    """
    Write an empty line to the stream, then print the recorded errors
    followed by the recorded failures
    """
    self.stream.writeln()
    # attributes are looked up one at a time, right before they are printed
    for flavour, attribute in (('ERROR', 'errors'), ('FAIL', 'failures')):
        self.printErrorList(flavour, getattr(self, attribute))
def printErrorList(self, flavour, errors):
    """
    Write every entry of an error list to the output stream, each one
    introduced by its error type and the description of its test case.

    :param flavour: error type
    :param errors: iterable of (test, error text) pairs
    """
    for test, err in errors:
        out = self.stream
        out.writeln(double_line_delim)
        out.writeln("%s: %s" % (flavour, self.getDescription(test)))
        out.writeln(single_line_delim)
        out.writeln("%s" % err)
1011 class Filter_by_test_option:
def __init__(self, filter_file_name, filter_class_name, filter_func_name):
    """Store the file, class and function name filters on the instance.

    :param filter_file_name: file name filter
    :param filter_class_name: class name filter
    :param filter_func_name: function name filter
    """
    self.filter_func_name = filter_func_name
    self.filter_class_name = filter_class_name
    self.filter_file_name = filter_file_name
1017 def __call__(self, file_name, class_name, func_name):
1018 if self.filter_file_name and file_name != self.filter_file_name:
1020 if self.filter_class_name and class_name != self.filter_class_name:
1022 if self.filter_func_name and func_name != self.filter_func_name:
1027 class VppTestRunner(unittest.TextTestRunner):
1029 A basic test runner implementation which prints results to standard error.
def resultclass(self):
    """Result class this runner uses - always VppTestResult"""
    result_type = VppTestResult
    return result_type
1036 def __init__(self, keep_alive_pipe=None, failed_pipe=None,
1037 stream=sys.stderr, descriptions=True,
1038 verbosity=1, failfast=False, buffer=False, resultclass=None):
1039 # ignore stream setting here, use hard-coded stdout to be in sync
1040 # with prints from VppTestCase methods ...
1041 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1042 verbosity, failfast, buffer,
1044 reporter = KeepAliveReporter()
1045 reporter.pipe = keep_alive_pipe
1046 # this is super-ugly, but very simple to implement and works as long
1047 # as we run only one test at the same time
1048 VppTestResult.test_framework_failed_pipe = failed_pipe
1050 test_option = "TEST"
1052 def parse_test_option(self):
1054 f = os.getenv(self.test_option)
1057 filter_file_name = None
1058 filter_class_name = None
1059 filter_func_name = None
1062 parts = f.split('.')
1064 raise Exception("Unrecognized %s option: %s" %
1065 (self.test_option, f))
1067 if parts[2] not in ('*', ''):
1068 filter_func_name = parts[2]
1069 if parts[1] not in ('*', ''):
1070 filter_class_name = parts[1]
1071 if parts[0] not in ('*', ''):
1072 if parts[0].startswith('test_'):
1073 filter_file_name = parts[0]
1075 filter_file_name = 'test_%s' % parts[0]
1077 if f.startswith('test_'):
1078 filter_file_name = f
1080 filter_file_name = 'test_%s' % f
1081 return filter_file_name, filter_class_name, filter_func_name
1084 def filter_tests(tests, filter_cb):
1085 result = unittest.suite.TestSuite()
1087 if isinstance(t, unittest.suite.TestSuite):
1088 # this is a bunch of tests, recursively filter...
1089 x = filter_tests(t, filter_cb)
1090 if x.countTestCases() > 0:
1092 elif isinstance(t, unittest.TestCase):
1093 # this is a single test
1094 parts = t.id().split('.')
1095 # t.id() for common cases like this:
1096 # test_classifier.TestClassifier.test_acl_ip
1097 # apply filtering only if it is so
1099 if not filter_cb(parts[0], parts[1], parts[2]):
1103 # unexpected object, don't touch it
def run(self, test):
    """
    Filter the given tests and run those which match.

    The filters come from parse_test_option(); the matching tests are
    selected with filter_tests() and handed over to the parent runner.

    :param test: test or test suite to filter and run
    :returns: whatever the parent runner returns for the filtered tests
    """
    faulthandler.enable()  # emit stack trace to stderr if killed by signal
    print("Running tests using custom test runner")  # debug message
    filter_file, filter_class, filter_func = self.parse_test_option()
    active = (filter_file, filter_class, filter_func)
    print("Active filters: file=%s, class=%s, function=%s" % active)
    selected = self.filter_tests(test, Filter_by_test_option(*active))
    print("%s out of %s tests match specified filters" % (
        selected.countTestCases(), test.countTestCases()))
    if not running_extended_tests():
        print("Not running extended tests (some tests will be skipped)")
    return super(VppTestRunner, self).run(selected)
1129 class Worker(Thread):
1130 def __init__(self, args, logger, env={}):
1131 self.logger = logger
1134 self.env = copy.deepcopy(env)
1135 super(Worker, self).__init__()
1138 executable = self.args[0]
1139 self.logger.debug("Running executable w/args `%s'" % self.args)
1140 env = os.environ.copy()
1141 env.update(self.env)
1142 env["CK_LOG_FILE_NAME"] = "-"
1143 self.process = subprocess.Popen(
1144 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1145 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1146 out, err = self.process.communicate()
1147 self.logger.debug("Finished running `%s'" % executable)
1148 self.logger.info("Return code is `%s'" % self.process.returncode)
1149 self.logger.info(single_line_delim)
1150 self.logger.info("Executable `%s' wrote to stdout:" % executable)
1151 self.logger.info(single_line_delim)
1152 self.logger.info(out)
1153 self.logger.info(single_line_delim)
1154 self.logger.info("Executable `%s' wrote to stderr:" % executable)
1155 self.logger.info(single_line_delim)
1156 self.logger.info(err)
1157 self.logger.info(single_line_delim)
1158 self.result = self.process.returncode