3 from __future__ import print_function
14 from collections import deque
15 from threading import Thread, Event
16 from inspect import getdoc, isclass
17 from traceback import format_exception
18 from logging import FileHandler, DEBUG, Formatter
19 from scapy.packet import Raw
20 from hook import StepHook, PollHook
21 from vpp_pg_interface import VppPGInterface
22 from vpp_sub_interface import VppSubInterface
23 from vpp_lo_interface import VppLoInterface
24 from vpp_papi_provider import VppPapiProvider
25 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
27 from vpp_object import VppObjectRegistry
28 if os.name == 'posix' and sys.version_info[0] < 3:
29 # using subprocess32 is recommended by python official documentation
30 # @ https://docs.python.org/2/library/subprocess.html
31 import subprocess32 as subprocess
35 debug_framework = False
36 if os.getenv('TEST_DEBUG', "0") == "1":
37 debug_framework = True
42 Test framework module.
44 The module provides a set of tools for constructing and running tests and
45 representing the results.
49 class _PacketInfo(object):
50 """Private class to create packet info object.
52 Help process information about the next packet.
53 Set variables to default values.
55 #: Store the index of the packet.
57 #: Store the index of the source packet generator interface of the packet.
59 #: Store the index of the destination packet generator interface
62 #: Store expected ip version
64 #: Store expected upper protocol
66 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare this packet info with another one attribute by attribute.

    The index, src, dst and data attributes are all compared up front (in
    that order) before the results are combined, so `other` has to expose
    every one of them.

    :param other: object to compare against
    :returns: the four per-attribute comparison results and-ed together
    """
    outcomes = [getattr(self, attr) == getattr(other, attr)
                for attr in ("index", "src", "dst", "data")]
    verdict = outcomes[0]
    for outcome in outcomes[1:]:
        verdict = verdict and outcome
    return verdict
77 def pump_output(testclass):
78 """ pump output from vpp stdout/stderr to proper queues """
81 while not testclass.pump_thread_stop_flag.wait(0):
82 readable = select.select([testclass.vpp.stdout.fileno(),
83 testclass.vpp.stderr.fileno(),
84 testclass.pump_thread_wakeup_pipe[0]],
86 if testclass.vpp.stdout.fileno() in readable:
87 read = os.read(testclass.vpp.stdout.fileno(), 102400)
89 split = read.splitlines(True)
90 if len(stdout_fragment) > 0:
91 split[0] = "%s%s" % (stdout_fragment, split[0])
92 if len(split) > 0 and split[-1].endswith("\n"):
96 stdout_fragment = split[-1]
97 testclass.vpp_stdout_deque.extend(split[:limit])
98 if not testclass.cache_vpp_output:
99 for line in split[:limit]:
100 testclass.logger.debug(
101 "VPP STDOUT: %s" % line.rstrip("\n"))
102 if testclass.vpp.stderr.fileno() in readable:
103 read = os.read(testclass.vpp.stderr.fileno(), 102400)
105 split = read.splitlines(True)
106 if len(stderr_fragment) > 0:
107 split[0] = "%s%s" % (stderr_fragment, split[0])
108 if len(split) > 0 and split[-1].endswith("\n"):
112 stderr_fragment = split[-1]
113 testclass.vpp_stderr_deque.extend(split[:limit])
114 if not testclass.cache_vpp_output:
115 for line in split[:limit]:
116 testclass.logger.debug(
117 "VPP STDERR: %s" % line.rstrip("\n"))
118 # ignoring the dummy pipe here intentionally - the flag will take care
119 # of properly terminating the loop
122 def running_extended_tests():
124 s = os.getenv("EXTENDED_TESTS")
125 return True if s.lower() in ("y", "yes", "1") else False
131 def running_on_centos():
133 os_id = os.getenv("OS_ID")
134 return True if "centos" in os_id.lower() else False
140 class KeepAliveReporter(object):
142 Singleton object which reports test start to parent process
147 self.__dict__ = self._shared_state
154 def pipe(self, pipe):
155 if hasattr(self, '_pipe'):
156 raise Exception("Internal error - pipe should only be set once.")
159 def send_keep_alive(self, test):
161 Write current test tmpdir & desc to keep-alive pipe to signal liveness
163 if self.pipe is None:
164 # if not running forked..
170 desc = test.shortDescription()
174 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
177 class VppTestCase(unittest.TestCase):
178 """This subclass is a base class for VPP test cases that are implemented as
179 classes. It provides methods to create and run test case.
def packet_infos(self):
    """Packet infos recorded for this test case.

    :returns: the object currently stored in ``self._packet_infos``
    """
    infos = self._packet_infos
    return infos
188 def get_packet_count_for_if_idx(cls, dst_if_index):
189 """Get the number of packet info for specified destination if index"""
190 if dst_if_index in cls._packet_count_for_dst_if_idx:
191 return cls._packet_count_for_dst_if_idx[dst_if_index]
197 """Return the instance of this testcase"""
198 return cls.test_instance
201 def set_debug_flags(cls, d):
202 cls.debug_core = False
203 cls.debug_gdb = False
204 cls.debug_gdbserver = False
209 cls.debug_core = True
212 elif dl == "gdbserver":
213 cls.debug_gdbserver = True
215 raise Exception("Unrecognized DEBUG option: '%s'" % d)
218 def setUpConstants(cls):
219 """ Set-up the test case class based on environment variables """
221 s = os.getenv("STEP")
222 cls.step = True if s.lower() in ("y", "yes", "1") else False
226 d = os.getenv("DEBUG")
230 c = os.getenv("CACHE_OUTPUT", "1")
231 cls.cache_vpp_output = \
232 False if c.lower() in ("n", "no", "0") else True
234 cls.cache_vpp_output = True
235 cls.set_debug_flags(d)
236 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
237 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
238 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
240 if cls.plugin_path is not None:
241 if cls.extern_plugin_path is not None:
242 plugin_path = "%s:%s" % (
243 cls.plugin_path, cls.extern_plugin_path)
245 plugin_path = cls.plugin_path
246 elif cls.extern_plugin_path is not None:
247 plugin_path = cls.extern_plugin_path
249 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
250 debug_cli = "cli-listen localhost:5002"
253 size = os.getenv("COREDUMP_SIZE")
255 coredump_size = "coredump-size %s" % size
258 if coredump_size is None:
259 coredump_size = "coredump-size unlimited"
260 cls.vpp_cmdline = [cls.vpp_bin, "unix",
261 "{", "nodaemon", debug_cli, "full-coredump",
262 coredump_size, "}", "api-trace", "{", "on", "}",
263 "api-segment", "{", "prefix", cls.shm_prefix, "}",
264 "plugins", "{", "plugin", "dpdk_plugin.so", "{",
265 "disable", "}", "}", ]
266 if plugin_path is not None:
267 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
268 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
271 def wait_for_enter(cls):
272 if cls.debug_gdbserver:
273 print(double_line_delim)
274 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
276 print(double_line_delim)
277 print("Spawned VPP with PID: %d" % cls.vpp.pid)
279 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
281 print(single_line_delim)
282 print("You can debug the VPP using e.g.:")
283 if cls.debug_gdbserver:
284 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
285 print("Now is the time to attach a gdb by running the above "
286 "command, set up breakpoints etc. and then resume VPP from "
287 "within gdb by issuing the 'continue' command")
289 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
290 print("Now is the time to attach a gdb by running the above "
291 "command and set up breakpoints etc.")
292 print(single_line_delim)
293 raw_input("Press ENTER to continue running the testcase...")
297 cmdline = cls.vpp_cmdline
299 if cls.debug_gdbserver:
300 gdbserver = '/usr/bin/gdbserver'
301 if not os.path.isfile(gdbserver) or \
302 not os.access(gdbserver, os.X_OK):
303 raise Exception("gdbserver binary '%s' does not exist or is "
304 "not executable" % gdbserver)
306 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
307 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
310 cls.vpp = subprocess.Popen(cmdline,
311 stdout=subprocess.PIPE,
312 stderr=subprocess.PIPE,
314 except Exception as e:
315 cls.logger.critical("Couldn't start vpp: %s" % e)
323 Perform class setup before running the testcase
324 Remove shared memory files, start vpp and connect the vpp-api
326 gc.collect() # run garbage collection first
328 cls.logger = getLogger(cls.__name__)
329 cls.tempdir = tempfile.mkdtemp(
330 prefix='vpp-unittest-%s-' % cls.__name__)
331 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
332 cls.file_handler.setFormatter(
333 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
335 cls.file_handler.setLevel(DEBUG)
336 cls.logger.addHandler(cls.file_handler)
337 cls.shm_prefix = cls.tempdir.split("/")[-1]
338 os.chdir(cls.tempdir)
339 cls.logger.info("Temporary dir is %s, shm prefix is %s",
340 cls.tempdir, cls.shm_prefix)
342 cls.reset_packet_infos()
344 cls._zombie_captures = []
347 cls.registry = VppObjectRegistry()
348 cls.vpp_startup_failed = False
349 cls.reporter = KeepAliveReporter()
350 # need to catch exceptions here because if we raise, then the cleanup
351 # doesn't get called and we might end with a zombie vpp
354 cls.reporter.send_keep_alive(cls)
355 cls.vpp_stdout_deque = deque()
356 cls.vpp_stderr_deque = deque()
357 cls.pump_thread_stop_flag = Event()
358 cls.pump_thread_wakeup_pipe = os.pipe()
359 cls.pump_thread = Thread(target=pump_output, args=(cls,))
360 cls.pump_thread.daemon = True
361 cls.pump_thread.start()
362 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
367 cls.vapi.register_hook(hook)
368 cls.sleep(0.1, "after vpp startup, before initial poll")
372 cls.vpp_startup_failed = True
374 "VPP died shortly after startup, check the"
375 " output to standard error for possible cause")
381 cls.vapi.disconnect()
384 if cls.debug_gdbserver:
385 print(colorize("You're running VPP inside gdbserver but "
386 "VPP-API connection failed, did you forget "
387 "to 'continue' VPP from within gdb?", RED))
390 t, v, tb = sys.exc_info()
400 Disconnect vpp-api, kill vpp and cleanup shared memory files
402 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
404 if cls.vpp.returncode is None:
405 print(double_line_delim)
406 print("VPP or GDB server is still running")
407 print(single_line_delim)
408 raw_input("When done debugging, press ENTER to kill the "
409 "process and finish running the testcase...")
411 os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
412 cls.pump_thread_stop_flag.set()
413 if hasattr(cls, 'pump_thread'):
414 cls.logger.debug("Waiting for pump thread to stop")
415 cls.pump_thread.join()
416 if hasattr(cls, 'vpp_stderr_reader_thread'):
417 cls.logger.debug("Waiting for stdderr pump to stop")
418 cls.vpp_stderr_reader_thread.join()
420 if hasattr(cls, 'vpp'):
421 if hasattr(cls, 'vapi'):
422 cls.vapi.disconnect()
425 if cls.vpp.returncode is None:
426 cls.logger.debug("Sending TERM to vpp")
428 cls.logger.debug("Waiting for vpp to die")
429 cls.vpp.communicate()
432 if cls.vpp_startup_failed:
433 stdout_log = cls.logger.info
434 stderr_log = cls.logger.critical
436 stdout_log = cls.logger.info
437 stderr_log = cls.logger.info
439 if hasattr(cls, 'vpp_stdout_deque'):
440 stdout_log(single_line_delim)
441 stdout_log('VPP output to stdout while running %s:', cls.__name__)
442 stdout_log(single_line_delim)
443 vpp_output = "".join(cls.vpp_stdout_deque)
444 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
446 stdout_log('\n%s', vpp_output)
447 stdout_log(single_line_delim)
449 if hasattr(cls, 'vpp_stderr_deque'):
450 stderr_log(single_line_delim)
451 stderr_log('VPP output to stderr while running %s:', cls.__name__)
452 stderr_log(single_line_delim)
453 vpp_output = "".join(cls.vpp_stderr_deque)
454 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
456 stderr_log('\n%s', vpp_output)
457 stderr_log(single_line_delim)
460 def tearDownClass(cls):
461 """ Perform final cleanup after running all tests in this test-case """
463 cls.file_handler.close()
464 cls.reset_packet_infos()
466 debug_internal.on_tear_down_class(cls)
469 """ Show various debug prints after each test """
470 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
471 (self.__class__.__name__, self._testMethodName,
472 self._testMethodDoc))
473 if not self.vpp_dead:
474 self.logger.debug(self.vapi.cli("show trace"))
475 self.logger.info(self.vapi.ppcli("show interface"))
476 self.logger.info(self.vapi.ppcli("show hardware"))
477 self.logger.info(self.vapi.ppcli("show error"))
478 self.logger.info(self.vapi.ppcli("show run"))
479 self.registry.remove_vpp_config(self.logger)
480 # Save/Dump VPP api trace log
481 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
482 tmp_api_trace = "/tmp/%s" % api_trace
483 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
484 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
485 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
487 os.rename(tmp_api_trace, vpp_api_trace_log)
488 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
491 self.registry.unregister_all(self.logger)
494 """ Clear trace before running each test"""
495 self.reporter.send_keep_alive(self)
496 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
497 (self.__class__.__name__, self._testMethodName,
498 self._testMethodDoc))
500 raise Exception("VPP is dead when setting up the test")
501 self.sleep(.1, "during setUp")
502 self.vpp_stdout_deque.append(
503 "--- test setUp() for %s.%s(%s) starts here ---\n" %
504 (self.__class__.__name__, self._testMethodName,
505 self._testMethodDoc))
506 self.vpp_stderr_deque.append(
507 "--- test setUp() for %s.%s(%s) starts here ---\n" %
508 (self.__class__.__name__, self._testMethodName,
509 self._testMethodDoc))
510 self.vapi.cli("clear trace")
511 # store the test instance inside the test class - so that objects
512 # holding the class can access instance methods (like assertEqual)
513 type(self).test_instance = self
516 def pg_enable_capture(cls, interfaces=None):
518 Enable capture on packet-generator interfaces
520 :param interfaces: iterable interface indexes (if None,
521 use self.pg_interfaces)
524 if interfaces is None:
525 interfaces = cls.pg_interfaces
530 def register_capture(cls, cap_name):
531 """ Register a capture in the testclass """
532 # add to the list of captures with current timestamp
533 cls._captures.append((time.time(), cap_name))
534 # filter out from zombies
535 cls._zombie_captures = [(stamp, name)
536 for (stamp, name) in cls._zombie_captures
541 """ Remove any zombie captures and enable the packet generator """
542 # how long before capture is allowed to be deleted - otherwise vpp
543 # crashes - 100ms seems enough (this shouldn't be needed at all)
546 for stamp, cap_name in cls._zombie_captures:
547 wait = stamp + capture_ttl - now
549 cls.sleep(wait, "before deleting capture %s" % cap_name)
551 cls.logger.debug("Removing zombie capture %s" % cap_name)
552 cls.vapi.cli('packet-generator delete %s' % cap_name)
554 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
555 cls.vapi.cli('packet-generator enable')
556 cls._zombie_captures = cls._captures
560 def create_pg_interfaces(cls, interfaces):
562 Create packet-generator interfaces.
564 :param interfaces: iterable indexes of the interfaces.
565 :returns: List of created interfaces.
570 intf = VppPGInterface(cls, i)
571 setattr(cls, intf.name, intf)
573 cls.pg_interfaces = result
577 def create_loopback_interfaces(cls, interfaces):
579 Create loopback interfaces.
581 :param interfaces: iterable indexes of the interfaces.
582 :returns: List of created interfaces.
586 intf = VppLoInterface(cls, i)
587 setattr(cls, intf.name, intf)
589 cls.lo_interfaces = result
593 def extend_packet(packet, size, padding=' '):
595 Extend packet to given size by padding with spaces or custom padding
596 NOTE: Currently works only when Raw layer is present.
598 :param packet: packet
599 :param size: target size
600 :param padding: padding used to extend the payload
603 packet_len = len(packet) + 4
604 extend = size - packet_len
606 num = (extend / len(padding)) + 1
607 packet[Raw].load += (padding * num)[:extend]
def reset_packet_infos(cls):
    """Start over with empty packet info bookkeeping.

    Rebinds ``cls._packet_infos`` and ``cls._packet_count_for_dst_if_idx``
    to fresh, separate, empty dictionaries.
    """
    for attribute in ("_packet_infos", "_packet_count_for_dst_if_idx"):
        setattr(cls, attribute, {})
616 def create_packet_info(cls, src_if, dst_if):
618 Create packet info object containing the source and destination indexes
619 and add it to the testcase's packet info list
621 :param VppInterface src_if: source interface
622 :param VppInterface dst_if: destination interface
624 :returns: _PacketInfo object
628 info.index = len(cls._packet_infos)
629 info.src = src_if.sw_if_index
630 info.dst = dst_if.sw_if_index
631 if isinstance(dst_if, VppSubInterface):
632 dst_idx = dst_if.parent.sw_if_index
634 dst_idx = dst_if.sw_if_index
635 if dst_idx in cls._packet_count_for_dst_if_idx:
636 cls._packet_count_for_dst_if_idx[dst_idx] += 1
638 cls._packet_count_for_dst_if_idx[dst_idx] = 1
639 cls._packet_infos[info.index] = info
643 def info_to_payload(info):
645 Convert _PacketInfo object to packet payload
647 :param info: _PacketInfo object
649 :returns: string containing serialized data from packet info
651 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
655 def payload_to_info(payload):
657 Convert packet payload to _PacketInfo object
659 :param payload: packet payload
661 :returns: _PacketInfo object containing de-serialized data from payload
664 numbers = payload.split()
666 info.index = int(numbers[0])
667 info.src = int(numbers[1])
668 info.dst = int(numbers[2])
669 info.ip = int(numbers[3])
670 info.proto = int(numbers[4])
673 def get_next_packet_info(self, info):
675 Iterate over the packet info list stored in the testcase
676 Start iteration with first element if info is None
677 Continue based on index in info if info is specified
679 :param info: info or None
680 :returns: next info in list or None if no more infos
685 next_index = info.index + 1
686 if next_index == len(self._packet_infos):
689 return self._packet_infos[next_index]
691 def get_next_packet_info_for_interface(self, src_index, info):
693 Search the packet info list for the next packet info with same source
696 :param src_index: source interface index to search for
697 :param info: packet info - where to start the search
698 :returns: packet info or None
702 info = self.get_next_packet_info(info)
705 if info.src == src_index:
708 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
710 Search the packet info list for the next packet info with same source
711 and destination interface indexes
713 :param src_index: source interface index to search for
714 :param dst_index: destination interface index to search for
715 :param info: packet info - where to start the search
716 :returns: packet info or None
720 info = self.get_next_packet_info_for_interface(src_index, info)
723 if info.dst == dst_index:
726 def assert_equal(self, real_value, expected_value, name_or_class=None):
727 if name_or_class is None:
728 self.assertEqual(real_value, expected_value)
731 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
732 msg = msg % (getdoc(name_or_class).strip(),
733 real_value, str(name_or_class(real_value)),
734 expected_value, str(name_or_class(expected_value)))
736 msg = "Invalid %s: %s does not match expected value %s" % (
737 name_or_class, real_value, expected_value)
739 self.assertEqual(real_value, expected_value, msg)
741 def assert_in_range(self,
749 msg = "Invalid %s: %s out of range <%s,%s>" % (
750 name, real_value, expected_min, expected_max)
751 self.assertTrue(expected_min <= real_value <= expected_max, msg)
754 def sleep(cls, timeout, remark=None):
755 if hasattr(cls, 'logger'):
756 cls.logger.debug("Starting sleep for %ss (%s)" % (timeout, remark))
760 if after - before > 2 * timeout:
761 cls.logger.error("unexpected time.sleep() result - "
762 "slept for %ss instead of ~%ss!" % (
763 after - before, timeout))
764 if hasattr(cls, 'logger'):
766 "Finished sleep (%s) - slept %ss (wanted %ss)" % (
767 remark, after - before, timeout))
769 def send_and_assert_no_replies(self, intf, pkts, remark=""):
770 self.vapi.cli("clear trace")
771 intf.add_stream(pkts)
772 self.pg_enable_capture(self.pg_interfaces)
775 for i in self.pg_interfaces:
776 i.get_capture(0, timeout=timeout)
777 i.assert_nothing_captured(remark=remark)
780 def send_and_expect(self, input, pkts, output):
781 self.vapi.cli("clear trace")
782 input.add_stream(pkts)
783 self.pg_enable_capture(self.pg_interfaces)
785 rx = output.get_capture(len(pkts))
789 class TestCasePrinter(object):
793 self.__dict__ = self._shared_state
794 if not hasattr(self, "_test_case_set"):
795 self._test_case_set = set()
def print_test_case_heading_if_first_time(self, case):
    """Print a heading for the class of `case` the first time it is seen.

    The heading is the first line of the class docstring, colorized in
    yellow and framed by double-line delimiters. When the class has no
    docstring (getdoc() returns None) or an empty one, the class name is
    printed instead, so an undocumented test class cannot break reporting.

    :param case: test case instance whose class heading should be printed
    """
    test_class = case.__class__
    if test_class in self._test_case_set:
        # heading already printed for this class
        return
    doc_lines = (getdoc(test_class) or "").splitlines()
    heading = doc_lines[0] if doc_lines else test_class.__name__
    print(double_line_delim)
    print(colorize(heading, YELLOW))
    print(double_line_delim)
    self._test_case_set.add(test_class)
805 class VppTestResult(unittest.TestResult):
807 @property result_string
808 String variable to store the test case result string.
810 List variable containing 2-tuples of TestCase instances and strings
811 holding formatted tracebacks. Each tuple represents a test which
812 raised an unexpected exception.
814 List variable containing 2-tuples of TestCase instances and strings
815 holding formatted tracebacks. Each tuple represents a test where
816 a failure was explicitly signalled using the TestCase.assert*()
820 def __init__(self, stream, descriptions, verbosity):
822 :param stream File descriptor to store where to report test results.
823 Set to the standard error stream by default.
824 :param descriptions Boolean variable to store information if to use
825 test case descriptions.
826 :param verbosity Integer variable to store required verbosity level.
828 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
830 self.descriptions = descriptions
831 self.verbosity = verbosity
832 self.result_string = None
833 self.printer = TestCasePrinter()
835 def addSuccess(self, test):
837 Record a test succeeded result
842 if hasattr(test, 'logger'):
843 test.logger.debug("--- addSuccess() %s.%s(%s) called"
844 % (test.__class__.__name__,
845 test._testMethodName,
846 test._testMethodDoc))
847 unittest.TestResult.addSuccess(self, test)
848 self.result_string = colorize("OK", GREEN)
850 def addSkip(self, test, reason):
852 Record a test skipped.
858 if hasattr(test, 'logger'):
859 test.logger.debug("--- addSkip() %s.%s(%s) called, reason is %s"
860 % (test.__class__.__name__,
861 test._testMethodName,
864 unittest.TestResult.addSkip(self, test, reason)
865 self.result_string = colorize("SKIP", YELLOW)
867 def symlink_failed(self, test):
869 if hasattr(test, 'logger'):
871 if hasattr(test, 'tempdir'):
873 failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
874 link_path = '%s/%s-FAILED' % (failed_dir,
875 test.tempdir.split("/")[-1])
877 logger.debug("creating a link to the failed test")
878 logger.debug("os.symlink(%s, %s)" %
879 (test.tempdir, link_path))
880 os.symlink(test.tempdir, link_path)
881 except Exception as e:
885 def send_failure_through_pipe(self, test):
886 if hasattr(self, 'test_framework_failed_pipe'):
887 pipe = self.test_framework_failed_pipe
889 pipe.send(test.__class__)
891 def addFailure(self, test, err):
893 Record a test failed result
896 :param err: error message
899 if hasattr(test, 'logger'):
900 test.logger.debug("--- addFailure() %s.%s(%s) called, err is %s"
901 % (test.__class__.__name__,
902 test._testMethodName,
903 test._testMethodDoc, err))
904 test.logger.debug("formatted exception is:\n%s" %
905 "".join(format_exception(*err)))
906 unittest.TestResult.addFailure(self, test, err)
907 if hasattr(test, 'tempdir'):
908 self.result_string = colorize("FAIL", RED) + \
909 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
910 self.symlink_failed(test)
912 self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
914 self.send_failure_through_pipe(test)
916 def addError(self, test, err):
918 Record a test error result
921 :param err: error message
924 if hasattr(test, 'logger'):
925 test.logger.debug("--- addError() %s.%s(%s) called, err is %s"
926 % (test.__class__.__name__,
927 test._testMethodName,
928 test._testMethodDoc, err))
929 test.logger.debug("formatted exception is:\n%s" %
930 "".join(format_exception(*err)))
931 unittest.TestResult.addError(self, test, err)
932 if hasattr(test, 'tempdir'):
933 self.result_string = colorize("ERROR", RED) + \
934 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
935 self.symlink_failed(test)
937 self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
939 self.send_failure_through_pipe(test)
941 def getDescription(self, test):
946 :returns: test description
949 # TODO: if none print warning not raise exception
950 short_description = test.shortDescription()
951 if self.descriptions and short_description:
952 return short_description
956 def startTest(self, test):
963 self.printer.print_test_case_heading_if_first_time(test)
964 unittest.TestResult.startTest(self, test)
965 if self.verbosity > 0:
967 "Starting " + self.getDescription(test) + " ...")
968 self.stream.writeln(single_line_delim)
970 def stopTest(self, test):
977 unittest.TestResult.stopTest(self, test)
978 if self.verbosity > 0:
979 self.stream.writeln(single_line_delim)
980 self.stream.writeln("%-73s%s" % (self.getDescription(test),
982 self.stream.writeln(single_line_delim)
984 self.stream.writeln("%-73s%s" % (self.getDescription(test),
987 def printErrors(self):
989 Print errors from running the test case
991 self.stream.writeln()
992 self.printErrorList('ERROR', self.errors)
993 self.printErrorList('FAIL', self.failures)
995 def printErrorList(self, flavour, errors):
997 Print error list to the output stream together with error type
998 and test case description.
1000 :param flavour: error type
1001 :param errors: iterable errors
1004 for test, err in errors:
1005 self.stream.writeln(double_line_delim)
1006 self.stream.writeln("%s: %s" %
1007 (flavour, self.getDescription(test)))
1008 self.stream.writeln(single_line_delim)
1009 self.stream.writeln("%s" % err)
1012 class Filter_by_test_option:
def __init__(self, filter_file_name, filter_class_name, filter_func_name):
    """Remember the three filter values on the instance.

    :param filter_file_name: stored as ``self.filter_file_name``
    :param filter_class_name: stored as ``self.filter_class_name``
    :param filter_func_name: stored as ``self.filter_func_name``
    """
    (self.filter_file_name,
     self.filter_class_name,
     self.filter_func_name) = (filter_file_name,
                               filter_class_name,
                               filter_func_name)
1018 def __call__(self, file_name, class_name, func_name):
1019 if self.filter_file_name and file_name != self.filter_file_name:
1021 if self.filter_class_name and class_name != self.filter_class_name:
1023 if self.filter_func_name and func_name != self.filter_func_name:
1028 class VppTestRunner(unittest.TextTestRunner):
1030 A basic test runner implementation which prints results to standard error.
def resultclass(self):
    """Return the class that maintains the test results (VppTestResult)."""
    result_type = VppTestResult
    return result_type
1037 def __init__(self, keep_alive_pipe=None, failed_pipe=None,
1038 stream=sys.stderr, descriptions=True,
1039 verbosity=1, failfast=False, buffer=False, resultclass=None):
1040 # ignore stream setting here, use hard-coded stdout to be in sync
1041 # with prints from VppTestCase methods ...
1042 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1043 verbosity, failfast, buffer,
1045 reporter = KeepAliveReporter()
1046 reporter.pipe = keep_alive_pipe
1047 # this is super-ugly, but very simple to implement and works as long
1048 # as we run only one test at the same time
1049 VppTestResult.test_framework_failed_pipe = failed_pipe
1051 test_option = "TEST"
1053 def parse_test_option(self):
1055 f = os.getenv(self.test_option)
1058 filter_file_name = None
1059 filter_class_name = None
1060 filter_func_name = None
1063 parts = f.split('.')
1065 raise Exception("Unrecognized %s option: %s" %
1066 (self.test_option, f))
1068 if parts[2] not in ('*', ''):
1069 filter_func_name = parts[2]
1070 if parts[1] not in ('*', ''):
1071 filter_class_name = parts[1]
1072 if parts[0] not in ('*', ''):
1073 if parts[0].startswith('test_'):
1074 filter_file_name = parts[0]
1076 filter_file_name = 'test_%s' % parts[0]
1078 if f.startswith('test_'):
1079 filter_file_name = f
1081 filter_file_name = 'test_%s' % f
1082 return filter_file_name, filter_class_name, filter_func_name
1085 def filter_tests(tests, filter_cb):
1086 result = unittest.suite.TestSuite()
1088 if isinstance(t, unittest.suite.TestSuite):
1089 # this is a bunch of tests, recursively filter...
1090 x = VppTestRunner.filter_tests(t, filter_cb)
1091 if x.countTestCases() > 0:
1093 elif isinstance(t, unittest.TestCase):
1094 # this is a single test
1095 parts = t.id().split('.')
1096 # t.id() for common cases like this:
1097 # test_classifier.TestClassifier.test_acl_ip
1098 # apply filtering only if it is so
1100 if not filter_cb(parts[0], parts[1], parts[2]):
1104 # unexpected object, don't touch it
1108 def run(self, test):
1115 faulthandler.enable() # emit stack trace to stderr if killed by signal
1116 print("Running tests using custom test runner") # debug message
1117 filter_file, filter_class, filter_func = self.parse_test_option()
1118 print("Active filters: file=%s, class=%s, function=%s" % (
1119 filter_file, filter_class, filter_func))
1120 filter_cb = Filter_by_test_option(
1121 filter_file, filter_class, filter_func)
1122 filtered = self.filter_tests(test, filter_cb)
1123 print("%s out of %s tests match specified filters" % (
1124 filtered.countTestCases(), test.countTestCases()))
1125 if not running_extended_tests():
1126 print("Not running extended tests (some tests will be skipped)")
1127 return super(VppTestRunner, self).run(filtered)
1130 class Worker(Thread):
1131 def __init__(self, args, logger, env={}):
1132 self.logger = logger
1135 self.env = copy.deepcopy(env)
1136 super(Worker, self).__init__()
1139 executable = self.args[0]
1140 self.logger.debug("Running executable w/args `%s'" % self.args)
1141 env = os.environ.copy()
1142 env.update(self.env)
1143 env["CK_LOG_FILE_NAME"] = "-"
1144 self.process = subprocess.Popen(
1145 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1146 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1147 out, err = self.process.communicate()
1148 self.logger.debug("Finished running `%s'" % executable)
1149 self.logger.info("Return code is `%s'" % self.process.returncode)
1150 self.logger.info(single_line_delim)
1151 self.logger.info("Executable `%s' wrote to stdout:" % executable)
1152 self.logger.info(single_line_delim)
1153 self.logger.info(out)
1154 self.logger.info(single_line_delim)
1155 self.logger.info("Executable `%s' wrote to stderr:" % executable)
1156 self.logger.info(single_line_delim)
1157 self.logger.info(err)
1158 self.logger.info(single_line_delim)
1159 self.result = self.process.returncode