3 from __future__ import print_function
14 from collections import deque
15 from threading import Thread, Event
16 from inspect import getdoc, isclass
17 from traceback import format_exception
18 from logging import FileHandler, DEBUG, Formatter
19 from scapy.packet import Raw
20 from hook import StepHook, PollHook
21 from vpp_pg_interface import VppPGInterface
22 from vpp_sub_interface import VppSubInterface
23 from vpp_lo_interface import VppLoInterface
24 from vpp_papi_provider import VppPapiProvider
25 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
27 from vpp_object import VppObjectRegistry
28 if os.name == 'posix' and sys.version_info[0] < 3:
29 # using subprocess32 is recommended by python official documentation
30 # @ https://docs.python.org/2/library/subprocess.html
31 import subprocess32 as subprocess
35 debug_framework = False
36 if os.getenv('TEST_DEBUG', "0") == "1":
37 debug_framework = True
42 Test framework module.
44 The module provides a set of tools for constructing and running tests and
45 representing the results.
49 class _PacketInfo(object):
50 """Private class to create packet info object.
52 Help process information about the next packet.
53 Set variables to default values.
55 #: Store the index of the packet.
57 #: Store the index of the source packet generator interface of the packet.
59 #: Store the index of the destination packet generator interface
62 #: Store expected ip version
64 #: Store expected upper protocol
66 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare two packet info objects field by field.

    Two objects are considered equal when their ``index``, ``src``,
    ``dst`` and ``data`` attributes all compare equal.

    :param other: object to compare against
    :returns: result of the field comparison, or ``NotImplemented`` when
        *other* does not carry all of the compared attributes
    """
    compared = ("index", "src", "dst", "data")
    if not all(hasattr(other, name) for name in compared):
        # not a packet-info-like object - let Python fall back to its
        # default handling instead of failing with AttributeError
        return NotImplemented
    return (self.index == other.index and
            self.src == other.src and
            self.dst == other.dst and
            self.data == other.data)
77 def pump_output(testclass):
78 """ pump output from vpp stdout/stderr to proper queues """
81 while not testclass.pump_thread_stop_flag.wait(0):
82 readable = select.select([testclass.vpp.stdout.fileno(),
83 testclass.vpp.stderr.fileno(),
84 testclass.pump_thread_wakeup_pipe[0]],
86 if testclass.vpp.stdout.fileno() in readable:
87 read = os.read(testclass.vpp.stdout.fileno(), 102400)
89 split = read.splitlines(True)
90 if len(stdout_fragment) > 0:
91 split[0] = "%s%s" % (stdout_fragment, split[0])
92 if len(split) > 0 and split[-1].endswith("\n"):
96 stdout_fragment = split[-1]
97 testclass.vpp_stdout_deque.extend(split[:limit])
98 if not testclass.cache_vpp_output:
99 for line in split[:limit]:
100 testclass.logger.debug(
101 "VPP STDOUT: %s" % line.rstrip("\n"))
102 if testclass.vpp.stderr.fileno() in readable:
103 read = os.read(testclass.vpp.stderr.fileno(), 102400)
105 split = read.splitlines(True)
106 if len(stderr_fragment) > 0:
107 split[0] = "%s%s" % (stderr_fragment, split[0])
108 if len(split) > 0 and split[-1].endswith("\n"):
112 stderr_fragment = split[-1]
113 testclass.vpp_stderr_deque.extend(split[:limit])
114 if not testclass.cache_vpp_output:
115 for line in split[:limit]:
116 testclass.logger.debug(
117 "VPP STDERR: %s" % line.rstrip("\n"))
118 # ignoring the dummy pipe here intentionally - the flag will take care
119 # of properly terminating the loop
def running_extended_tests():
    """Tell whether extended tests were requested via the environment.

    :returns: True if the ``EXTENDED_TESTS`` environment variable is set to
        "y", "yes" or "1" (case-insensitive); False otherwise, including
        when the variable is not set at all
    """
    # default to "" so that an unset variable cannot fail on .lower()
    setting = os.getenv("EXTENDED_TESTS", "")
    return setting.lower() in ("y", "yes", "1")
def running_on_centos():
    """Tell whether the environment identifies the host OS as CentOS.

    :returns: True if the ``OS_ID`` environment variable contains "centos"
        (case-insensitive); False otherwise, including when the variable
        is not set at all
    """
    # default to "" so that an unset variable cannot fail on .lower()
    os_id = os.getenv("OS_ID", "")
    return "centos" in os_id.lower()
140 class KeepAliveReporter(object):
142 Singleton object which reports test start to parent process
147 self.__dict__ = self._shared_state
154 def pipe(self, pipe):
155 if hasattr(self, '_pipe'):
156 raise Exception("Internal error - pipe should only be set once.")
159 def send_keep_alive(self, test):
161 Write current test tmpdir & desc to keep-alive pipe to signal liveness
163 if self.pipe is None:
164 # if not running forked..
170 desc = test.shortDescription()
174 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
177 class VppTestCase(unittest.TestCase):
178 """This subclass is a base class for VPP test cases that are implemented as
179 classes. It provides methods to create and run test case.
def packet_infos(self):
    """Packet infos created so far.

    :returns: the ``_packet_infos`` mapping, which is keyed by packet
        info index
    """
    return self._packet_infos
def get_packet_count_for_if_idx(cls, dst_if_index):
    """Get the number of packet info for specified destination if index

    :param dst_if_index: destination interface index to look up
    :returns: count stored in ``_packet_count_for_dst_if_idx`` for the
        index, or 0 when nothing has been recorded for it
    """
    return cls._packet_count_for_dst_if_idx.get(dst_if_index, 0)
197 """Return the instance of this testcase"""
198 return cls.test_instance
201 def set_debug_flags(cls, d):
202 cls.debug_core = False
203 cls.debug_gdb = False
204 cls.debug_gdbserver = False
209 cls.debug_core = True
212 elif dl == "gdbserver":
213 cls.debug_gdbserver = True
215 raise Exception("Unrecognized DEBUG option: '%s'" % d)
218 def setUpConstants(cls):
219 """ Set-up the test case class based on environment variables """
221 s = os.getenv("STEP")
222 cls.step = True if s.lower() in ("y", "yes", "1") else False
226 d = os.getenv("DEBUG")
230 c = os.getenv("CACHE_OUTPUT", "1")
231 cls.cache_vpp_output = \
232 False if c.lower() in ("n", "no", "0") else True
234 cls.cache_vpp_output = True
235 cls.set_debug_flags(d)
236 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
237 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
238 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
240 if cls.plugin_path is not None:
241 if cls.extern_plugin_path is not None:
242 plugin_path = "%s:%s" % (
243 cls.plugin_path, cls.extern_plugin_path)
245 plugin_path = cls.plugin_path
246 elif cls.extern_plugin_path is not None:
247 plugin_path = cls.extern_plugin_path
249 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
250 debug_cli = "cli-listen localhost:5002"
253 size = os.getenv("COREDUMP_SIZE")
255 coredump_size = "coredump-size %s" % size
258 if coredump_size is None:
259 coredump_size = "coredump-size unlimited"
260 cls.vpp_cmdline = [cls.vpp_bin, "unix",
261 "{", "nodaemon", debug_cli, "full-coredump",
262 coredump_size, "}", "api-trace", "{", "on", "}",
263 "api-segment", "{", "prefix", cls.shm_prefix, "}",
264 "plugins", "{", "plugin", "dpdk_plugin.so", "{",
265 "disable", "}", "}", ]
266 if plugin_path is not None:
267 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
268 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
271 def wait_for_enter(cls):
272 if cls.debug_gdbserver:
273 print(double_line_delim)
274 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
276 print(double_line_delim)
277 print("Spawned VPP with PID: %d" % cls.vpp.pid)
279 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
281 print(single_line_delim)
282 print("You can debug the VPP using e.g.:")
283 if cls.debug_gdbserver:
284 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
285 print("Now is the time to attach a gdb by running the above "
286 "command, set up breakpoints etc. and then resume VPP from "
287 "within gdb by issuing the 'continue' command")
289 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
290 print("Now is the time to attach a gdb by running the above "
291 "command and set up breakpoints etc.")
292 print(single_line_delim)
293 raw_input("Press ENTER to continue running the testcase...")
297 cmdline = cls.vpp_cmdline
299 if cls.debug_gdbserver:
300 gdbserver = '/usr/bin/gdbserver'
301 if not os.path.isfile(gdbserver) or \
302 not os.access(gdbserver, os.X_OK):
303 raise Exception("gdbserver binary '%s' does not exist or is "
304 "not executable" % gdbserver)
306 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
307 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
310 cls.vpp = subprocess.Popen(cmdline,
311 stdout=subprocess.PIPE,
312 stderr=subprocess.PIPE,
314 except Exception as e:
315 cls.logger.critical("Couldn't start vpp: %s" % e)
323 Perform class setup before running the testcase
324 Remove shared memory files, start vpp and connect the vpp-api
326 gc.collect() # run garbage collection first
328 cls.logger = getLogger(cls.__name__)
329 cls.tempdir = tempfile.mkdtemp(
330 prefix='vpp-unittest-%s-' % cls.__name__)
331 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
332 cls.file_handler.setFormatter(
333 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
335 cls.file_handler.setLevel(DEBUG)
336 cls.logger.addHandler(cls.file_handler)
337 cls.shm_prefix = cls.tempdir.split("/")[-1]
338 os.chdir(cls.tempdir)
339 cls.logger.info("Temporary dir is %s, shm prefix is %s",
340 cls.tempdir, cls.shm_prefix)
342 cls.reset_packet_infos()
344 cls._zombie_captures = []
347 cls.registry = VppObjectRegistry()
348 cls.vpp_startup_failed = False
349 cls.reporter = KeepAliveReporter()
350 # need to catch exceptions here because if we raise, then the cleanup
351 # doesn't get called and we might end with a zombie vpp
354 cls.reporter.send_keep_alive(cls)
355 cls.vpp_stdout_deque = deque()
356 cls.vpp_stderr_deque = deque()
357 cls.pump_thread_stop_flag = Event()
358 cls.pump_thread_wakeup_pipe = os.pipe()
359 cls.pump_thread = Thread(target=pump_output, args=(cls,))
360 cls.pump_thread.daemon = True
361 cls.pump_thread.start()
362 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
367 cls.vapi.register_hook(hook)
368 cls.sleep(0.1, "after vpp startup, before initial poll")
372 cls.vpp_startup_failed = True
374 "VPP died shortly after startup, check the"
375 " output to standard error for possible cause")
380 if cls.debug_gdbserver:
381 print(colorize("You're running VPP inside gdbserver but "
382 "VPP-API connection failed, did you forget "
383 "to 'continue' VPP from within gdb?", RED))
386 t, v, tb = sys.exc_info()
396 Disconnect vpp-api, kill vpp and cleanup shared memory files
398 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
400 if cls.vpp.returncode is None:
401 print(double_line_delim)
402 print("VPP or GDB server is still running")
403 print(single_line_delim)
404 raw_input("When done debugging, press ENTER to kill the "
405 "process and finish running the testcase...")
407 os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
408 cls.pump_thread_stop_flag.set()
409 if hasattr(cls, 'pump_thread'):
410 cls.logger.debug("Waiting for pump thread to stop")
411 cls.pump_thread.join()
412 if hasattr(cls, 'vpp_stderr_reader_thread'):
413 cls.logger.debug("Waiting for stdderr pump to stop")
414 cls.vpp_stderr_reader_thread.join()
416 if hasattr(cls, 'vpp'):
417 if hasattr(cls, 'vapi'):
418 cls.vapi.disconnect()
421 if cls.vpp.returncode is None:
422 cls.logger.debug("Sending TERM to vpp")
424 cls.logger.debug("Waiting for vpp to die")
425 cls.vpp.communicate()
428 if cls.vpp_startup_failed:
429 stdout_log = cls.logger.info
430 stderr_log = cls.logger.critical
432 stdout_log = cls.logger.info
433 stderr_log = cls.logger.info
435 if hasattr(cls, 'vpp_stdout_deque'):
436 stdout_log(single_line_delim)
437 stdout_log('VPP output to stdout while running %s:', cls.__name__)
438 stdout_log(single_line_delim)
439 vpp_output = "".join(cls.vpp_stdout_deque)
440 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
442 stdout_log('\n%s', vpp_output)
443 stdout_log(single_line_delim)
445 if hasattr(cls, 'vpp_stderr_deque'):
446 stderr_log(single_line_delim)
447 stderr_log('VPP output to stderr while running %s:', cls.__name__)
448 stderr_log(single_line_delim)
449 vpp_output = "".join(cls.vpp_stderr_deque)
450 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
452 stderr_log('\n%s', vpp_output)
453 stderr_log(single_line_delim)
456 def tearDownClass(cls):
457 """ Perform final cleanup after running all tests in this test-case """
459 cls.file_handler.close()
460 cls.reset_packet_infos()
462 debug_internal.on_tear_down_class(cls)
465 """ Show various debug prints after each test """
466 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
467 (self.__class__.__name__, self._testMethodName,
468 self._testMethodDoc))
469 if not self.vpp_dead:
470 self.logger.debug(self.vapi.cli("show trace"))
471 self.logger.info(self.vapi.ppcli("show interface"))
472 self.logger.info(self.vapi.ppcli("show hardware"))
473 self.logger.info(self.vapi.ppcli("show error"))
474 self.logger.info(self.vapi.ppcli("show run"))
475 self.registry.remove_vpp_config(self.logger)
476 # Save/Dump VPP api trace log
477 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
478 tmp_api_trace = "/tmp/%s" % api_trace
479 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
480 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
481 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
483 os.rename(tmp_api_trace, vpp_api_trace_log)
484 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
487 self.registry.unregister_all(self.logger)
490 """ Clear trace before running each test"""
491 self.reporter.send_keep_alive(self)
492 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
493 (self.__class__.__name__, self._testMethodName,
494 self._testMethodDoc))
496 raise Exception("VPP is dead when setting up the test")
497 self.sleep(.1, "during setUp")
498 self.vpp_stdout_deque.append(
499 "--- test setUp() for %s.%s(%s) starts here ---\n" %
500 (self.__class__.__name__, self._testMethodName,
501 self._testMethodDoc))
502 self.vpp_stderr_deque.append(
503 "--- test setUp() for %s.%s(%s) starts here ---\n" %
504 (self.__class__.__name__, self._testMethodName,
505 self._testMethodDoc))
506 self.vapi.cli("clear trace")
507 # store the test instance inside the test class - so that objects
508 # holding the class can access instance methods (like assertEqual)
509 type(self).test_instance = self
512 def pg_enable_capture(cls, interfaces=None):
514 Enable capture on packet-generator interfaces
516 :param interfaces: iterable interface indexes (if None,
517 use self.pg_interfaces)
520 if interfaces is None:
521 interfaces = cls.pg_interfaces
526 def register_capture(cls, cap_name):
527 """ Register a capture in the testclass """
528 # add to the list of captures with current timestamp
529 cls._captures.append((time.time(), cap_name))
530 # filter out from zombies
531 cls._zombie_captures = [(stamp, name)
532 for (stamp, name) in cls._zombie_captures
537 """ Remove any zombie captures and enable the packet generator """
538 # how long before capture is allowed to be deleted - otherwise vpp
539 # crashes - 100ms seems enough (this shouldn't be needed at all)
542 for stamp, cap_name in cls._zombie_captures:
543 wait = stamp + capture_ttl - now
545 cls.sleep(wait, "before deleting capture %s" % cap_name)
547 cls.logger.debug("Removing zombie capture %s" % cap_name)
548 cls.vapi.cli('packet-generator delete %s' % cap_name)
550 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
551 cls.vapi.cli('packet-generator enable')
552 cls._zombie_captures = cls._captures
556 def create_pg_interfaces(cls, interfaces):
558 Create packet-generator interfaces.
560 :param interfaces: iterable indexes of the interfaces.
561 :returns: List of created interfaces.
566 intf = VppPGInterface(cls, i)
567 setattr(cls, intf.name, intf)
569 cls.pg_interfaces = result
573 def create_loopback_interfaces(cls, interfaces):
575 Create loopback interfaces.
577 :param interfaces: iterable indexes of the interfaces.
578 :returns: List of created interfaces.
582 intf = VppLoInterface(cls, i)
583 setattr(cls, intf.name, intf)
585 cls.lo_interfaces = result
589 def extend_packet(packet, size, padding=' '):
591 Extend packet to given size by padding with spaces or custom padding
592 NOTE: Currently works only when Raw layer is present.
594 :param packet: packet
595 :param size: target size
596 :param padding: padding used to extend the payload
599 packet_len = len(packet) + 4
600 extend = size - packet_len
602 num = (extend / len(padding)) + 1
603 packet[Raw].load += (padding * num)[:extend]
def reset_packet_infos(cls):
    """ Reset the list of packet info objects and packet counts to zero

    Both ``_packet_infos`` and ``_packet_count_for_dst_if_idx`` are
    replaced with fresh empty dictionaries.
    """
    cls._packet_infos = {}
    cls._packet_count_for_dst_if_idx = {}
612 def create_packet_info(cls, src_if, dst_if):
614 Create packet info object containing the source and destination indexes
615 and add it to the testcase's packet info list
617 :param VppInterface src_if: source interface
618 :param VppInterface dst_if: destination interface
620 :returns: _PacketInfo object
624 info.index = len(cls._packet_infos)
625 info.src = src_if.sw_if_index
626 info.dst = dst_if.sw_if_index
627 if isinstance(dst_if, VppSubInterface):
628 dst_idx = dst_if.parent.sw_if_index
630 dst_idx = dst_if.sw_if_index
631 if dst_idx in cls._packet_count_for_dst_if_idx:
632 cls._packet_count_for_dst_if_idx[dst_idx] += 1
634 cls._packet_count_for_dst_if_idx[dst_idx] = 1
635 cls._packet_infos[info.index] = info
def info_to_payload(info):
    """
    Convert _PacketInfo object to packet payload

    :param info: _PacketInfo object

    :returns: string containing serialized data from packet info
    """
    # five space-separated integers, in the same order in which
    # payload_to_info() reads them back: index, src, dst, ip, proto
    return "%d %d %d %d %d" % (info.index, info.src, info.dst,
                               info.ip, info.proto)
651 def payload_to_info(payload):
653 Convert packet payload to _PacketInfo object
655 :param payload: packet payload
657 :returns: _PacketInfo object containing de-serialized data from payload
660 numbers = payload.split()
662 info.index = int(numbers[0])
663 info.src = int(numbers[1])
664 info.dst = int(numbers[2])
665 info.ip = int(numbers[3])
666 info.proto = int(numbers[4])
669 def get_next_packet_info(self, info):
671 Iterate over the packet info list stored in the testcase
672 Start iteration with first element if info is None
673 Continue based on index in info if info is specified
675 :param info: info or None
676 :returns: next info in list or None if no more infos
681 next_index = info.index + 1
682 if next_index == len(self._packet_infos):
685 return self._packet_infos[next_index]
687 def get_next_packet_info_for_interface(self, src_index, info):
689 Search the packet info list for the next packet info with same source
692 :param src_index: source interface index to search for
693 :param info: packet info - where to start the search
694 :returns: packet info or None
698 info = self.get_next_packet_info(info)
701 if info.src == src_index:
704 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
706 Search the packet info list for the next packet info with same source
707 and destination interface indexes
709 :param src_index: source interface index to search for
710 :param dst_index: destination interface index to search for
711 :param info: packet info - where to start the search
712 :returns: packet info or None
716 info = self.get_next_packet_info_for_interface(src_index, info)
719 if info.dst == dst_index:
722 def assert_equal(self, real_value, expected_value, name_or_class=None):
723 if name_or_class is None:
724 self.assertEqual(real_value, expected_value)
727 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
728 msg = msg % (getdoc(name_or_class).strip(),
729 real_value, str(name_or_class(real_value)),
730 expected_value, str(name_or_class(expected_value)))
732 msg = "Invalid %s: %s does not match expected value %s" % (
733 name_or_class, real_value, expected_value)
735 self.assertEqual(real_value, expected_value, msg)
737 def assert_in_range(self,
745 msg = "Invalid %s: %s out of range <%s,%s>" % (
746 name, real_value, expected_min, expected_max)
747 self.assertTrue(expected_min <= real_value <= expected_max, msg)
750 def sleep(cls, timeout, remark=None):
751 if hasattr(cls, 'logger'):
752 cls.logger.debug("Starting sleep for %ss (%s)" % (timeout, remark))
756 if after - before > 2 * timeout:
757 cls.logger.error("unexpected time.sleep() result - "
758 "slept for %ss instead of ~%ss!" % (
759 after - before, timeout))
760 if hasattr(cls, 'logger'):
762 "Finished sleep (%s) - slept %ss (wanted %ss)" % (
763 remark, after - before, timeout))
765 def send_and_assert_no_replies(self, intf, pkts, remark=""):
766 self.vapi.cli("clear trace")
767 intf.add_stream(pkts)
768 self.pg_enable_capture(self.pg_interfaces)
771 for i in self.pg_interfaces:
772 i.get_capture(0, timeout=timeout)
773 i.assert_nothing_captured(remark=remark)
776 def send_and_expect(self, input, pkts, output):
777 self.vapi.cli("clear trace")
778 input.add_stream(pkts)
779 self.pg_enable_capture(self.pg_interfaces)
781 rx = output.get_capture(len(pkts))
785 class TestCasePrinter(object):
789 self.__dict__ = self._shared_state
790 if not hasattr(self, "_test_case_set"):
791 self._test_case_set = set()
def print_test_case_heading_if_first_time(self, case):
    """Print a heading for the test case class the first time it is seen.

    The heading is the first line of the class docstring; when the class
    has no (or an empty) docstring the class name is used instead.

    :param case: test case instance about to be run
    """
    case_class = case.__class__
    if case_class in self._test_case_set:
        return
    doc = getdoc(case_class)
    # getdoc() yields None or "" for undocumented classes - indexing the
    # first line of that would raise, so fall back to the class name
    heading = doc.splitlines()[0] if doc else case_class.__name__
    print(double_line_delim)
    print(colorize(heading, YELLOW))
    print(double_line_delim)
    self._test_case_set.add(case_class)
801 class VppTestResult(unittest.TestResult):
803 @property result_string
804 String variable to store the test case result string.
806 List variable containing 2-tuples of TestCase instances and strings
807 holding formatted tracebacks. Each tuple represents a test which
808 raised an unexpected exception.
810 List variable containing 2-tuples of TestCase instances and strings
811 holding formatted tracebacks. Each tuple represents a test where
812 a failure was explicitly signalled using the TestCase.assert*()
816 def __init__(self, stream, descriptions, verbosity):
818 :param stream File descriptor to store where to report test results.
819 Set to the standard error stream by default.
820 :param descriptions Boolean variable to store information if to use
821 test case descriptions.
822 :param verbosity Integer variable to store required verbosity level.
824 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
826 self.descriptions = descriptions
827 self.verbosity = verbosity
828 self.result_string = None
829 self.printer = TestCasePrinter()
def addSuccess(self, test):
    """
    Record a test succeeded result

    :param test: test case which succeeded
    """
    # not every test object carries a logger, so only log when it does
    if hasattr(test, 'logger'):
        test.logger.debug("--- addSuccess() %s.%s(%s) called"
                          % (test.__class__.__name__,
                             test._testMethodName,
                             test._testMethodDoc))
    unittest.TestResult.addSuccess(self, test)
    self.result_string = colorize("OK", GREEN)
846 def addSkip(self, test, reason):
848 Record a test skipped.
854 if hasattr(test, 'logger'):
855 test.logger.debug("--- addSkip() %s.%s(%s) called, reason is %s"
856 % (test.__class__.__name__,
857 test._testMethodName,
860 unittest.TestResult.addSkip(self, test, reason)
861 self.result_string = colorize("SKIP", YELLOW)
863 def symlink_failed(self, test):
865 if hasattr(test, 'logger'):
867 if hasattr(test, 'tempdir'):
869 failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
870 link_path = '%s/%s-FAILED' % (failed_dir,
871 test.tempdir.split("/")[-1])
873 logger.debug("creating a link to the failed test")
874 logger.debug("os.symlink(%s, %s)" %
875 (test.tempdir, link_path))
876 os.symlink(test.tempdir, link_path)
877 except Exception as e:
def send_failure_through_pipe(self, test):
    """Report the class of a failed test through the failure pipe.

    Nothing is sent when ``test_framework_failed_pipe`` is missing or
    is None.

    :param test: test case which failed
    """
    pipe = getattr(self, 'test_framework_failed_pipe', None)
    if pipe is not None:
        pipe.send(test.__class__)
887 def addFailure(self, test, err):
889 Record a test failed result
892 :param err: error message
895 if hasattr(test, 'logger'):
896 test.logger.debug("--- addFailure() %s.%s(%s) called, err is %s"
897 % (test.__class__.__name__,
898 test._testMethodName,
899 test._testMethodDoc, err))
900 test.logger.debug("formatted exception is:\n%s" %
901 "".join(format_exception(*err)))
902 unittest.TestResult.addFailure(self, test, err)
903 if hasattr(test, 'tempdir'):
904 self.result_string = colorize("FAIL", RED) + \
905 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
906 self.symlink_failed(test)
908 self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
910 self.send_failure_through_pipe(test)
912 def addError(self, test, err):
914 Record a test error result
917 :param err: error message
920 if hasattr(test, 'logger'):
921 test.logger.debug("--- addError() %s.%s(%s) called, err is %s"
922 % (test.__class__.__name__,
923 test._testMethodName,
924 test._testMethodDoc, err))
925 test.logger.debug("formatted exception is:\n%s" %
926 "".join(format_exception(*err)))
927 unittest.TestResult.addError(self, test, err)
928 if hasattr(test, 'tempdir'):
929 self.result_string = colorize("ERROR", RED) + \
930 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
931 self.symlink_failed(test)
933 self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
935 self.send_failure_through_pipe(test)
937 def getDescription(self, test):
942 :returns: test description
945 # TODO: if none print warning not raise exception
946 short_description = test.shortDescription()
947 if self.descriptions and short_description:
948 return short_description
952 def startTest(self, test):
959 self.printer.print_test_case_heading_if_first_time(test)
960 unittest.TestResult.startTest(self, test)
961 if self.verbosity > 0:
963 "Starting " + self.getDescription(test) + " ...")
964 self.stream.writeln(single_line_delim)
966 def stopTest(self, test):
973 unittest.TestResult.stopTest(self, test)
974 if self.verbosity > 0:
975 self.stream.writeln(single_line_delim)
976 self.stream.writeln("%-73s%s" % (self.getDescription(test),
978 self.stream.writeln(single_line_delim)
980 self.stream.writeln("%-73s%s" % (self.getDescription(test),
def printErrors(self):
    """
    Print errors from running the test case

    Writes an empty line to the stream, then the recorded errors
    followed by the recorded failures.
    """
    self.stream.writeln()
    self.printErrorList('ERROR', self.errors)
    self.printErrorList('FAIL', self.failures)
def printErrorList(self, flavour, errors):
    """
    Print error list to the output stream together with error type
    and test case description.

    :param flavour: error type
    :param errors: iterable errors
    """
    for test, err in errors:
        self.stream.writeln(double_line_delim)
        self.stream.writeln("%s: %s" %
                            (flavour, self.getDescription(test)))
        self.stream.writeln(single_line_delim)
        self.stream.writeln("%s" % err)
class Filter_by_test_option:
    """Callable predicate matching tests by file, class and function name.

    A filter component which is falsy (None or empty) matches anything.
    """

    def __init__(self, filter_file_name, filter_class_name, filter_func_name):
        """
        :param filter_file_name: required file name or None for any
        :param filter_class_name: required class name or None for any
        :param filter_func_name: required function name or None for any
        """
        self.filter_file_name = filter_file_name
        self.filter_class_name = filter_class_name
        self.filter_func_name = filter_func_name

    def __call__(self, file_name, class_name, func_name):
        """Check the given test coordinates against the configured filters.

        :param file_name: file name part of the test id
        :param class_name: class name part of the test id
        :param func_name: function name part of the test id
        :returns: False if any configured filter differs from the
            corresponding argument, True otherwise
        """
        if self.filter_file_name and file_name != self.filter_file_name:
            return False
        if self.filter_class_name and class_name != self.filter_class_name:
            return False
        if self.filter_func_name and func_name != self.filter_func_name:
            return False
        return True
1024 class VppTestRunner(unittest.TextTestRunner):
1026 A basic test runner implementation which prints results to standard error.
def resultclass(self):
    """Class maintaining the results of the tests

    :returns: the VppTestResult class itself (not an instance)
    """
    return VppTestResult
1033 def __init__(self, keep_alive_pipe=None, failed_pipe=None,
1034 stream=sys.stderr, descriptions=True,
1035 verbosity=1, failfast=False, buffer=False, resultclass=None):
1036 # ignore stream setting here, use hard-coded stdout to be in sync
1037 # with prints from VppTestCase methods ...
1038 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1039 verbosity, failfast, buffer,
1041 reporter = KeepAliveReporter()
1042 reporter.pipe = keep_alive_pipe
1043 # this is super-ugly, but very simple to implement and works as long
1044 # as we run only one test at the same time
1045 VppTestResult.test_framework_failed_pipe = failed_pipe
1047 test_option = "TEST"
1049 def parse_test_option(self):
1051 f = os.getenv(self.test_option)
1054 filter_file_name = None
1055 filter_class_name = None
1056 filter_func_name = None
1059 parts = f.split('.')
1061 raise Exception("Unrecognized %s option: %s" %
1062 (self.test_option, f))
1064 if parts[2] not in ('*', ''):
1065 filter_func_name = parts[2]
1066 if parts[1] not in ('*', ''):
1067 filter_class_name = parts[1]
1068 if parts[0] not in ('*', ''):
1069 if parts[0].startswith('test_'):
1070 filter_file_name = parts[0]
1072 filter_file_name = 'test_%s' % parts[0]
1074 if f.startswith('test_'):
1075 filter_file_name = f
1077 filter_file_name = 'test_%s' % f
1078 return filter_file_name, filter_class_name, filter_func_name
1081 def filter_tests(tests, filter_cb):
1082 result = unittest.suite.TestSuite()
1084 if isinstance(t, unittest.suite.TestSuite):
1085 # this is a bunch of tests, recursively filter...
1086 x = VppTestRunner.filter_tests(t, filter_cb)
1087 if x.countTestCases() > 0:
1089 elif isinstance(t, unittest.TestCase):
1090 # this is a single test
1091 parts = t.id().split('.')
1092 # t.id() for common cases like this:
1093 # test_classifier.TestClassifier.test_acl_ip
1094 # apply filtering only if it is so
1096 if not filter_cb(parts[0], parts[1], parts[2]):
1100 # unexpected object, don't touch it
def run(self, test):
    """
    Filter the given tests according to the test option and run them

    :param test: test or test suite to filter and run
    :returns: whatever the parent class run() returns for the filtered
        suite
    """
    faulthandler.enable()  # emit stack trace to stderr if killed by signal
    print("Running tests using custom test runner")  # debug message
    filter_file, filter_class, filter_func = self.parse_test_option()
    print("Active filters: file=%s, class=%s, function=%s" % (
        filter_file, filter_class, filter_func))
    filter_cb = Filter_by_test_option(
        filter_file, filter_class, filter_func)
    filtered = self.filter_tests(test, filter_cb)
    print("%s out of %s tests match specified filters" % (
        filtered.countTestCases(), test.countTestCases()))
    if not running_extended_tests():
        print("Not running extended tests (some tests will be skipped)")
    return super(VppTestRunner, self).run(filtered)
1126 class Worker(Thread):
1127 def __init__(self, args, logger, env={}):
1128 self.logger = logger
1131 self.env = copy.deepcopy(env)
1132 super(Worker, self).__init__()
1135 executable = self.args[0]
1136 self.logger.debug("Running executable w/args `%s'" % self.args)
1137 env = os.environ.copy()
1138 env.update(self.env)
1139 env["CK_LOG_FILE_NAME"] = "-"
1140 self.process = subprocess.Popen(
1141 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1142 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1143 out, err = self.process.communicate()
1144 self.logger.debug("Finished running `%s'" % executable)
1145 self.logger.info("Return code is `%s'" % self.process.returncode)
1146 self.logger.info(single_line_delim)
1147 self.logger.info("Executable `%s' wrote to stdout:" % executable)
1148 self.logger.info(single_line_delim)
1149 self.logger.info(out)
1150 self.logger.info(single_line_delim)
1151 self.logger.info("Executable `%s' wrote to stderr:" % executable)
1152 self.logger.info(single_line_delim)
1153 self.logger.info(err)
1154 self.logger.info(single_line_delim)
1155 self.result = self.process.returncode