3 from __future__ import print_function
19 from collections import deque
20 from threading import Thread, Event
21 from inspect import getdoc, isclass
22 from traceback import format_exception
23 from logging import FileHandler, DEBUG, Formatter
25 from abc import ABC, abstractmethod
26 from struct import pack, unpack
29 from scapy.packet import Raw
30 import hook as hookmodule
31 from vpp_pg_interface import VppPGInterface
32 from vpp_sub_interface import VppSubInterface
33 from vpp_lo_interface import VppLoInterface
34 from vpp_bvi_interface import VppBviInterface
35 from vpp_papi_provider import VppPapiProvider
36 from vpp_papi import VppEnum
38 from vpp_papi.vpp_stats import VPPStats
39 from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
40 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
42 from vpp_object import VppObjectRegistry
43 from util import ppp, is_core_present
44 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
45 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
46 from scapy.layers.inet6 import ICMPv6EchoReply
48 from cpu_config import available_cpus, num_cpus, max_vpp_cpus
50 logger = logging.getLogger(__name__)
52 # Set up an empty logger for the testcase that can be overridden as necessary
53 null_logger = logging.getLogger('VppTestCase')
54 null_logger.addHandler(logging.NullHandler())
64 class BoolEnvironmentVariable(object):
66 def __init__(self, env_var_name, default='n', true_values=None):
67 self.name = env_var_name
68 self.default = default
69 self.true_values = true_values if true_values is not None else \
73 return os.getenv(self.name, self.default).lower() in self.true_values
75 if sys.version_info[0] == 2:
76 __nonzero__ = __bool__
79 return 'BoolEnvironmentVariable(%r, default=%r, true_values=%r)' % \
80 (self.name, self.default, self.true_values)
83 debug_framework = BoolEnvironmentVariable('TEST_DEBUG')
88 Test framework module.
90 The module provides a set of tools for constructing and running tests and
91 representing the results.
95 class VppDiedError(Exception):
96 """ exception for reporting that the subprocess has died."""
98 signals_by_value = {v: k for k, v in signal.__dict__.items() if
99 k.startswith('SIG') and not k.startswith('SIG_')}
101 def __init__(self, rv=None, testcase=None, method_name=None):
103 self.signal_name = None
104 self.testcase = testcase
105 self.method_name = method_name
108 self.signal_name = VppDiedError.signals_by_value[-rv]
109 except (KeyError, TypeError):
112 if testcase is None and method_name is None:
115 in_msg = ' while running %s.%s' % (testcase, method_name)
118 msg = "VPP subprocess died unexpectedly%s with return code: %d%s."\
119 % (in_msg, self.rv, ' [%s]' %
121 self.signal_name is not None else ''))
123 msg = "VPP subprocess died unexpectedly%s." % in_msg
125 super(VppDiedError, self).__init__(msg)
128 class _PacketInfo(object):
129 """Private class to create packet info object.
131 Help process information about the next packet.
132 Set variables to default values.
134 #: Store the index of the packet.
136 #: Store the index of the source packet generator interface of the packet.
138 #: Store the index of the destination packet generator interface
141 #: Store expected ip version
143 #: Store expected upper protocol
145 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare two packet info objects field by field.

    Two objects are considered equal when their ``index``, ``src``, ``dst``
    and ``data`` attributes all compare equal.

    :param other: object to compare with
    :returns: True or False for packet-info-like objects; NotImplemented
        when *other* lacks one of the compared attributes, so that Python
        falls back to its default comparison instead of raising
        AttributeError
    """
    try:
        other_index = other.index
        other_src = other.src
        other_dst = other.dst
        other_data = other.data
    except AttributeError:
        # not a packet-info-like object (e.g. None) - cannot be compared
        return NotImplemented
    return (self.index == other_index and
            self.src == other_src and
            self.dst == other_dst and
            self.data == other_data)
156 def pump_output(testclass):
157 """ pump output from vpp stdout/stderr to proper queues """
160 while not testclass.pump_thread_stop_flag.is_set():
161 readable = select.select([testclass.vpp.stdout.fileno(),
162 testclass.vpp.stderr.fileno(),
163 testclass.pump_thread_wakeup_pipe[0]],
165 if testclass.vpp.stdout.fileno() in readable:
166 read = os.read(testclass.vpp.stdout.fileno(), 102400)
168 split = read.decode('ascii',
169 errors='backslashreplace').splitlines(True)
170 if len(stdout_fragment) > 0:
171 split[0] = "%s%s" % (stdout_fragment, split[0])
172 if len(split) > 0 and split[-1].endswith("\n"):
176 stdout_fragment = split[-1]
177 testclass.vpp_stdout_deque.extend(split[:limit])
178 if not testclass.cache_vpp_output:
179 for line in split[:limit]:
180 testclass.logger.info(
181 "VPP STDOUT: %s" % line.rstrip("\n"))
182 if testclass.vpp.stderr.fileno() in readable:
183 read = os.read(testclass.vpp.stderr.fileno(), 102400)
185 split = read.decode('ascii',
186 errors='backslashreplace').splitlines(True)
187 if len(stderr_fragment) > 0:
188 split[0] = "%s%s" % (stderr_fragment, split[0])
189 if len(split) > 0 and split[-1].endswith("\n"):
193 stderr_fragment = split[-1]
195 testclass.vpp_stderr_deque.extend(split[:limit])
196 if not testclass.cache_vpp_output:
197 for line in split[:limit]:
198 testclass.logger.error(
199 "VPP STDERR: %s" % line.rstrip("\n"))
200 # ignoring the dummy pipe here intentionally - the
201 # flag will take care of properly terminating the loop
def _is_skip_aarch64_set():
    """Wrap the ``SKIP_AARCH64`` environment variable.

    :returns: BoolEnvironmentVariable object bound to ``SKIP_AARCH64``
        (a wrapper object, not a plain bool)
    """
    skip_flag = BoolEnvironmentVariable('SKIP_AARCH64')
    return skip_flag
208 is_skip_aarch64_set = _is_skip_aarch64_set()
def _is_platform_aarch64():
    """Tell whether the machine type reported by ``platform`` is aarch64.

    :returns: True when ``platform.machine()`` is exactly ``'aarch64'``
    """
    machine_type = platform.machine()
    return machine_type == 'aarch64'
215 is_platform_aarch64 = _is_platform_aarch64()
def _running_extended_tests():
    """Wrap the ``EXTENDED_TESTS`` environment variable.

    :returns: BoolEnvironmentVariable object bound to ``EXTENDED_TESTS``
        (a wrapper object, not a plain bool)
    """
    extended_flag = BoolEnvironmentVariable("EXTENDED_TESTS")
    return extended_flag
222 running_extended_tests = _running_extended_tests()
def _running_gcov_tests():
    """Wrap the ``GCOV_TESTS`` environment variable.

    :returns: BoolEnvironmentVariable object bound to ``GCOV_TESTS``
        (a wrapper object, not a plain bool)
    """
    gcov_flag = BoolEnvironmentVariable("GCOV_TESTS")
    return gcov_flag
229 running_gcov_tests = _running_gcov_tests()
232 def get_environ_vpp_worker_count():
233 worker_config = os.getenv("VPP_WORKER_CONFIG", None)
235 elems = worker_config.split(" ")
236 if elems[0] != "workers" or len(elems) != 2:
237 raise ValueError("Wrong VPP_WORKER_CONFIG == '%s' value." %
244 environ_vpp_worker_count = get_environ_vpp_worker_count()
247 class KeepAliveReporter(object):
249 Singleton object which reports test start to parent process
254 self.__dict__ = self._shared_state
262 def pipe(self, pipe):
263 if self._pipe is not None:
264 raise Exception("Internal error - pipe should only be set once.")
267 def send_keep_alive(self, test, desc=None):
269 Write current test tmpdir & desc to keep-alive pipe to signal liveness
271 if self.pipe is None:
272 # if not running forked..
276 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
280 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
283 class TestCaseTag(Enum):
284 # marks the suites that must run at the end
285 # using only a single test runner
287 # marks the suites broken on VPP multi-worker
288 FIXME_VPP_WORKERS = 2
291 def create_tag_decorator(e):
294 cls.test_tags.append(e)
295 except AttributeError:
301 tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
302 tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
316 class CPUInterface(ABC):
318 skipped_due_to_cpu_lack = False
322 def get_cpus_required(cls):
326 def assign_cpus(cls, cpus):
330 class VppTestCase(CPUInterface, unittest.TestCase):
331 """This subclass is a base class for VPP test cases that are implemented as
332 classes. It provides methods to create and run test cases.
335 extra_vpp_statseg_config = ""
336 extra_vpp_punt_config = []
337 extra_vpp_plugin_config = []
339 vapi_response_timeout = 5
def packet_infos(self):
    """Packet infos stored for the testcase, keyed by packet index"""
    stored_infos = self._packet_infos
    return stored_infos
347 def get_packet_count_for_if_idx(cls, dst_if_index):
348 """Get the number of packet info for specified destination if index"""
349 if dst_if_index in cls._packet_count_for_dst_if_idx:
350 return cls._packet_count_for_dst_if_idx[dst_if_index]
355 def has_tag(cls, tag):
356 """ if the test case has a given tag - return true """
358 return tag in cls.test_tags
359 except AttributeError:
def is_tagged_run_solo(cls):
    """ Report whether the test case class carries the RUN_SOLO tag """
    solo_tag = TestCaseTag.RUN_SOLO
    return cls.has_tag(solo_tag)
370 """Return the instance of this testcase"""
371 return cls.test_instance
374 def set_debug_flags(cls, d):
375 cls.gdbserver_port = 7777
376 cls.debug_core = False
377 cls.debug_gdb = False
378 cls.debug_gdbserver = False
379 cls.debug_all = False
380 cls.debug_attach = False
385 cls.debug_core = True
386 elif dl == "gdb" or dl == "gdb-all":
388 elif dl == "gdbserver" or dl == "gdbserver-all":
389 cls.debug_gdbserver = True
391 cls.debug_attach = True
393 raise Exception("Unrecognized DEBUG option: '%s'" % d)
394 if dl == "gdb-all" or dl == "gdbserver-all":
398 def get_vpp_worker_count(cls):
399 if not hasattr(cls, "vpp_worker_count"):
400 if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
401 cls.vpp_worker_count = 0
403 cls.vpp_worker_count = environ_vpp_worker_count
404 return cls.vpp_worker_count
def get_cpus_required(cls):
    """Number of CPUs the test class needs.

    :returns: the VPP worker count of the class plus one
    """
    worker_count = cls.get_vpp_worker_count()
    return 1 + worker_count
411 def setUpConstants(cls):
412 """ Set-up the test case class based on environment variables """
413 cls.step = BoolEnvironmentVariable('STEP')
414 # inverted case to handle '' == True
415 c = os.getenv("CACHE_OUTPUT", "1")
416 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
417 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
418 extern_plugin_path = os.getenv('EXTERN_PLUGINS')
420 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
421 debug_cli = "cli-listen localhost:5002"
423 size = os.getenv("COREDUMP_SIZE")
425 coredump_size = "coredump-size %s" % size
426 if coredump_size is None:
427 coredump_size = "coredump-size unlimited"
429 default_variant = os.getenv("VARIANT")
430 if default_variant is not None:
431 default_variant = "defaults { %s 100 }" % default_variant
435 api_fuzzing = os.getenv("API_FUZZ")
436 if api_fuzzing is None:
441 "unix", "{", "nodaemon", debug_cli, "full-coredump",
442 coredump_size, "runtime-dir", cls.tempdir, "}",
443 "api-trace", "{", "on", "}",
444 "api-segment", "{", "prefix", cls.get_api_segment_prefix(), "}",
445 "cpu", "{", "main-core", str(cls.cpus[0]), ]
446 if extern_plugin_path is not None:
447 cls.extra_vpp_plugin_config.append(
448 "add-path %s" % extern_plugin_path)
449 if cls.get_vpp_worker_count():
450 cls.vpp_cmdline.extend([
451 "corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])])
452 cls.vpp_cmdline.extend([
454 "physmem", "{", "max-size", "32m", "}",
455 "statseg", "{", "socket-name", cls.get_stats_sock_path(),
456 cls.extra_vpp_statseg_config, "}",
457 "socksvr", "{", "socket-name", cls.get_api_sock_path(), "}",
458 "node { ", default_variant, "}",
459 "api-fuzz {", api_fuzzing, "}",
460 "plugins", "{", "plugin", "dpdk_plugin.so", "{", "disable", "}",
461 "plugin", "rdma_plugin.so", "{", "disable", "}",
462 "plugin", "lisp_unittest_plugin.so", "{", "enable", "}",
463 "plugin", "unittest_plugin.so", "{", "enable", "}"
464 ] + cls.extra_vpp_plugin_config + ["}", ])
466 if cls.extra_vpp_punt_config is not None:
467 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
469 if not cls.debug_attach:
470 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
471 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
474 def wait_for_enter(cls):
475 if cls.debug_gdbserver:
476 print(double_line_delim)
477 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
479 print(double_line_delim)
480 print("Spawned VPP with PID: %d" % cls.vpp.pid)
482 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
484 print(single_line_delim)
485 print("You can debug VPP using:")
486 if cls.debug_gdbserver:
487 print("sudo gdb " + cls.vpp_bin +
488 " -ex 'target remote localhost:{port}'"
489 .format(port=cls.gdbserver_port))
490 print("Now is the time to attach gdb by running the above "
491 "command, set up breakpoints etc., then resume VPP from "
492 "within gdb by issuing the 'continue' command")
493 cls.gdbserver_port += 1
495 print("sudo gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
496 print("Now is the time to attach gdb by running the above "
497 "command and set up breakpoints etc., then resume VPP from"
498 " within gdb by issuing the 'continue' command")
499 print(single_line_delim)
500 input("Press ENTER to continue running the testcase...")
508 cls.logger.debug(f"Assigned cpus: {cls.cpus}")
509 cmdline = cls.vpp_cmdline
511 if cls.debug_gdbserver:
512 gdbserver = '/usr/bin/gdbserver'
513 if not os.path.isfile(gdbserver) or\
514 not os.access(gdbserver, os.X_OK):
515 raise Exception("gdbserver binary '%s' does not exist or is "
516 "not executable" % gdbserver)
518 cmdline = [gdbserver, 'localhost:{port}'
519 .format(port=cls.gdbserver_port)] + cls.vpp_cmdline
520 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
523 cls.vpp = subprocess.Popen(cmdline,
524 stdout=subprocess.PIPE,
525 stderr=subprocess.PIPE)
526 except subprocess.CalledProcessError as e:
527 cls.logger.critical("Subprocess returned with non-0 return code: ("
531 cls.logger.critical("Subprocess returned with OS error: "
532 "(%s) %s", e.errno, e.strerror)
534 except Exception as e:
535 cls.logger.exception("Subprocess returned unexpected from "
542 def wait_for_coredump(cls):
543 corefile = cls.tempdir + "/core"
544 if os.path.isfile(corefile):
545 cls.logger.error("Waiting for coredump to complete: %s", corefile)
546 curr_size = os.path.getsize(corefile)
547 deadline = time.time() + 60
549 while time.time() < deadline:
552 curr_size = os.path.getsize(corefile)
553 if size == curr_size:
557 cls.logger.error("Timed out waiting for coredump to complete:"
560 cls.logger.error("Coredump complete: %s, size %d",
def get_stats_sock_path(cls):
    """Path of the stats socket file inside the testcase temp dir"""
    workdir = cls.tempdir
    return "%s/stats.sock" % workdir
def get_api_sock_path(cls):
    """Path of the API socket file inside the testcase temp dir"""
    workdir = cls.tempdir
    return "%s/api.sock" % workdir
def get_api_segment_prefix(cls):
    """API segment prefix: the last path component of the temp dir"""
    # Only used for VAPI
    _, last_component = os.path.split(cls.tempdir)
    return last_component
576 def get_tempdir(cls):
578 return os.getenv("VPP_IN_GDB_TMP_DIR",
579 "/tmp/unittest-attach-gdb")
581 return tempfile.mkdtemp(prefix='vpp-unittest-%s-' % cls.__name__)
586 Perform class setup before running the testcase
587 Remove shared memory files, start vpp and connect the vpp-api
589 super(VppTestCase, cls).setUpClass()
590 cls.logger = get_logger(cls.__name__)
591 seed = os.environ["RND_SEED"]
593 if hasattr(cls, 'parallel_handler'):
594 cls.logger.addHandler(cls.parallel_handler)
595 cls.logger.propagate = False
596 d = os.getenv("DEBUG", None)
597 cls.set_debug_flags(d)
598 cls.tempdir = cls.get_tempdir()
599 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
600 cls.file_handler.setFormatter(
601 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
603 cls.file_handler.setLevel(DEBUG)
604 cls.logger.addHandler(cls.file_handler)
605 cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
606 os.chdir(cls.tempdir)
607 cls.logger.info("Temporary dir is %s, api socket is %s",
608 cls.tempdir, cls.get_api_sock_path())
609 cls.logger.debug("Random seed is %s", seed)
611 cls.reset_packet_infos()
616 cls.registry = VppObjectRegistry()
617 cls.vpp_startup_failed = False
618 cls.reporter = KeepAliveReporter()
619 # need to catch exceptions here because if we raise, then the cleanup
620 # doesn't get called and we might end with a zombie vpp
626 cls.reporter.send_keep_alive(cls, 'setUpClass')
627 VppTestResult.current_test_case_info = TestCaseInfo(
628 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
629 cls.vpp_stdout_deque = deque()
630 cls.vpp_stderr_deque = deque()
631 if not cls.debug_attach:
632 cls.pump_thread_stop_flag = Event()
633 cls.pump_thread_wakeup_pipe = os.pipe()
634 cls.pump_thread = Thread(target=pump_output, args=(cls,))
635 cls.pump_thread.daemon = True
636 cls.pump_thread.start()
637 if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
638 cls.vapi_response_timeout = 0
639 cls.vapi = VppPapiProvider(cls.__name__, cls,
640 cls.vapi_response_timeout)
642 hook = hookmodule.StepHook(cls)
644 hook = hookmodule.PollHook(cls)
645 cls.vapi.register_hook(hook)
646 cls.statistics = VPPStats(socketname=cls.get_stats_sock_path())
650 cls.vpp_startup_failed = True
652 "VPP died shortly after startup, check the"
653 " output to standard error for possible cause")
657 except (vpp_papi.VPPIOError, Exception) as e:
658 cls.logger.debug("Exception connecting to vapi: %s" % e)
659 cls.vapi.disconnect()
661 if cls.debug_gdbserver:
662 print(colorize("You're running VPP inside gdbserver but "
663 "VPP-API connection failed, did you forget "
664 "to 'continue' VPP from within gdb?", RED))
667 last_line = cls.vapi.cli("show thread").split("\n")[-2]
668 cls.vpp_worker_count = int(last_line.split(" ")[0])
669 print("Detected VPP with %s workers." % cls.vpp_worker_count)
670 except vpp_papi.VPPRuntimeError as e:
671 cls.logger.debug("%s" % e)
674 except Exception as e:
675 cls.logger.debug("Exception connecting to VPP: %s" % e)
680 def _debug_quit(cls):
681 if (cls.debug_gdbserver or cls.debug_gdb):
685 if cls.vpp.returncode is None:
687 print(double_line_delim)
688 print("VPP or GDB server is still running")
689 print(single_line_delim)
690 input("When done debugging, press ENTER to kill the "
691 "process and finish running the testcase...")
692 except AttributeError:
698 Disconnect vpp-api, kill vpp and cleanup shared memory files
702 # first signal that we want to stop the pump thread, then wake it up
703 if hasattr(cls, 'pump_thread_stop_flag'):
704 cls.pump_thread_stop_flag.set()
705 if hasattr(cls, 'pump_thread_wakeup_pipe'):
706 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
707 if hasattr(cls, 'pump_thread'):
708 cls.logger.debug("Waiting for pump thread to stop")
709 cls.pump_thread.join()
710 if hasattr(cls, 'vpp_stderr_reader_thread'):
711 cls.logger.debug("Waiting for stderr pump to stop")
712 cls.vpp_stderr_reader_thread.join()
714 if hasattr(cls, 'vpp'):
715 if hasattr(cls, 'vapi'):
716 cls.logger.debug(cls.vapi.vpp.get_stats())
717 cls.logger.debug("Disconnecting class vapi client on %s",
719 cls.vapi.disconnect()
720 cls.logger.debug("Deleting class vapi attribute on %s",
724 if not cls.debug_attach and cls.vpp.returncode is None:
725 cls.wait_for_coredump()
726 cls.logger.debug("Sending TERM to vpp")
728 cls.logger.debug("Waiting for vpp to die")
730 outs, errs = cls.vpp.communicate(timeout=5)
731 except subprocess.TimeoutExpired:
733 outs, errs = cls.vpp.communicate()
734 cls.logger.debug("Deleting class vpp attribute on %s",
736 if not cls.debug_attach:
737 cls.vpp.stdout.close()
738 cls.vpp.stderr.close()
741 if cls.vpp_startup_failed:
742 stdout_log = cls.logger.info
743 stderr_log = cls.logger.critical
745 stdout_log = cls.logger.info
746 stderr_log = cls.logger.info
748 if hasattr(cls, 'vpp_stdout_deque'):
749 stdout_log(single_line_delim)
750 stdout_log('VPP output to stdout while running %s:', cls.__name__)
751 stdout_log(single_line_delim)
752 vpp_output = "".join(cls.vpp_stdout_deque)
753 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
755 stdout_log('\n%s', vpp_output)
756 stdout_log(single_line_delim)
758 if hasattr(cls, 'vpp_stderr_deque'):
759 stderr_log(single_line_delim)
760 stderr_log('VPP output to stderr while running %s:', cls.__name__)
761 stderr_log(single_line_delim)
762 vpp_output = "".join(cls.vpp_stderr_deque)
763 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
765 stderr_log('\n%s', vpp_output)
766 stderr_log(single_line_delim)
769 def tearDownClass(cls):
770 """ Perform final cleanup after running all tests in this test-case """
771 cls.logger.debug("--- tearDownClass() for %s called ---" %
773 cls.reporter.send_keep_alive(cls, 'tearDownClass')
775 cls.file_handler.close()
776 cls.reset_packet_infos()
778 debug_internal.on_tear_down_class(cls)
def show_commands_at_teardown(self):
    """ Hook for subclass specific logging during teardown.

    This implementation only logs that no test specific show commands
    were provided.
    """
    placeholder = "--- No test specific show commands provided. ---"
    self.logger.info(placeholder)
785 """ Show various debug prints after each test """
786 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
787 (self.__class__.__name__, self._testMethodName,
788 self._testMethodDoc))
791 if not self.vpp_dead:
792 self.logger.debug(self.vapi.cli("show trace max 1000"))
793 self.logger.info(self.vapi.ppcli("show interface"))
794 self.logger.info(self.vapi.ppcli("show hardware"))
795 self.logger.info(self.statistics.set_errors_str())
796 self.logger.info(self.vapi.ppcli("show run"))
797 self.logger.info(self.vapi.ppcli("show log"))
798 self.logger.info(self.vapi.ppcli("show bihash"))
799 self.logger.info("Logging testcase specific show commands.")
800 self.show_commands_at_teardown()
801 self.registry.remove_vpp_config(self.logger)
802 # Save/Dump VPP api trace log
803 m = self._testMethodName
804 api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
805 tmp_api_trace = "/tmp/%s" % api_trace
806 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
807 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
808 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
810 os.rename(tmp_api_trace, vpp_api_trace_log)
811 except VppTransportSocketIOError:
812 self.logger.debug("VppTransportSocketIOError: Vpp dead. "
813 "Cannot log show commands.")
816 self.registry.unregister_all(self.logger)
819 """ Clear trace before running each test"""
820 super(VppTestCase, self).setUp()
821 self.reporter.send_keep_alive(self)
823 raise VppDiedError(rv=None, testcase=self.__class__.__name__,
824 method_name=self._testMethodName)
825 self.sleep(.1, "during setUp")
826 self.vpp_stdout_deque.append(
827 "--- test setUp() for %s.%s(%s) starts here ---\n" %
828 (self.__class__.__name__, self._testMethodName,
829 self._testMethodDoc))
830 self.vpp_stderr_deque.append(
831 "--- test setUp() for %s.%s(%s) starts here ---\n" %
832 (self.__class__.__name__, self._testMethodName,
833 self._testMethodDoc))
834 self.vapi.cli("clear trace")
835 # store the test instance inside the test class - so that objects
836 # holding the class can access instance methods (like assertEqual)
837 type(self).test_instance = self
840 def pg_enable_capture(cls, interfaces=None):
842 Enable capture on packet-generator interfaces
844 :param interfaces: iterable interface indexes (if None,
845 use self.pg_interfaces)
848 if interfaces is None:
849 interfaces = cls.pg_interfaces
def register_pcap(cls, intf, worker):
    """ Remember an (interface, worker) capture in the testclass """
    # the pair is appended to the class-wide list of captures
    capture = (intf, worker)
    cls._pcaps.append(capture)
860 def get_vpp_time(cls):
861 # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
862 # returns float("2.190522")
863 timestr = cls.vapi.cli('show clock')
864 head, sep, tail = timestr.partition(',')
865 head, sep, tail = head.partition('Time now')
869 def sleep_on_vpp_time(cls, sec):
870 """ Sleep according to time in VPP world """
871 # On a busy system with many processes
872 # we might end up with VPP time being slower than real world
873 # So take that into account when waiting for VPP to do something
874 start_time = cls.get_vpp_time()
875 while cls.get_vpp_time() - start_time < sec:
879 def pg_start(cls, trace=True):
880 """ Enable the PG, wait till it is done, then clean up """
881 for (intf, worker) in cls._old_pcaps:
882 intf.rename_old_pcap_file(intf.get_in_path(worker),
883 intf.in_history_counter)
886 cls.vapi.cli("clear trace")
887 cls.vapi.cli("trace add pg-input 1000")
888 cls.vapi.cli('packet-generator enable')
889 # PG, when starts, runs to completion -
890 # so let's avoid a race condition,
891 # and wait a little till it's done.
892 # Then clean it up - and then be gone.
893 deadline = time.time() + 300
894 while cls.vapi.cli('show packet-generator').find("Yes") != -1:
895 cls.sleep(0.01) # yield
896 if time.time() > deadline:
897 cls.logger.error("Timeout waiting for pg to stop")
899 for intf, worker in cls._pcaps:
900 cls.vapi.cli('packet-generator delete %s' %
901 intf.get_cap_name(worker))
902 cls._old_pcaps = cls._pcaps
906 def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0,
909 Create packet-generator interfaces.
911 :param interfaces: iterable indexes of the interfaces.
912 :returns: List of created interfaces.
917 intf = VppPGInterface(cls, i, gso, gso_size, mode)
918 setattr(cls, intf.name, intf)
920 cls.pg_interfaces = result
def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
    """Create packet-generator interfaces in IP4 mode.

    :param interfaces: iterable indexes of the interfaces.
    :param gso: forwarded to create_pg_interfaces_internal
    :param gso_size: forwarded to create_pg_interfaces_internal
    :returns: result of create_pg_interfaces_internal
    """
    ip4_mode = VppEnum.vl_api_pg_interface_mode_t.PG_API_MODE_IP4
    return cls.create_pg_interfaces_internal(
        interfaces, gso, gso_size, ip4_mode)
def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
    """Create packet-generator interfaces in IP6 mode.

    :param interfaces: iterable indexes of the interfaces.
    :param gso: forwarded to create_pg_interfaces_internal
    :param gso_size: forwarded to create_pg_interfaces_internal
    :returns: result of create_pg_interfaces_internal
    """
    ip6_mode = VppEnum.vl_api_pg_interface_mode_t.PG_API_MODE_IP6
    return cls.create_pg_interfaces_internal(
        interfaces, gso, gso_size, ip6_mode)
def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
    """Create packet-generator interfaces in Ethernet mode.

    :param interfaces: iterable indexes of the interfaces.
    :param gso: forwarded to create_pg_interfaces_internal
    :param gso_size: forwarded to create_pg_interfaces_internal
    :returns: result of create_pg_interfaces_internal
    """
    ethernet_mode = VppEnum.vl_api_pg_interface_mode_t.PG_API_MODE_ETHERNET
    return cls.create_pg_interfaces_internal(
        interfaces, gso, gso_size, ethernet_mode)
def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
    """Create packet-generator interfaces in Ethernet mode.

    :param interfaces: iterable indexes of the interfaces.
    :param gso: forwarded to create_pg_interfaces_internal
    :param gso_size: forwarded to create_pg_interfaces_internal
    :returns: result of create_pg_interfaces_internal
    """
    ethernet_mode = VppEnum.vl_api_pg_interface_mode_t.PG_API_MODE_ETHERNET
    return cls.create_pg_interfaces_internal(
        interfaces, gso, gso_size, ethernet_mode)
948 def create_loopback_interfaces(cls, count):
950 Create loopback interfaces.
952 :param count: number of interfaces created.
953 :returns: List of created interfaces.
955 result = [VppLoInterface(cls) for i in range(count)]
957 setattr(cls, intf.name, intf)
958 cls.lo_interfaces = result
962 def create_bvi_interfaces(cls, count):
964 Create BVI interfaces.
966 :param count: number of interfaces created.
967 :returns: List of created interfaces.
969 result = [VppBviInterface(cls) for i in range(count)]
971 setattr(cls, intf.name, intf)
972 cls.bvi_interfaces = result
976 def extend_packet(packet, size, padding=' '):
978 Extend packet to given size by padding with spaces or custom padding
979 NOTE: Currently works only when Raw layer is present.
981 :param packet: packet
982 :param size: target size
983 :param padding: padding used to extend the payload
986 packet_len = len(packet) + 4
987 extend = size - packet_len
989 num = (extend // len(padding)) + 1
990 packet[Raw].load += (padding * num)[:extend].encode("ascii")
def reset_packet_infos(cls):
    """ Drop all stored packet infos and per-destination packet counts """
    cls._packet_infos, cls._packet_count_for_dst_if_idx = {}, {}
999 def create_packet_info(cls, src_if, dst_if):
1001 Create packet info object containing the source and destination indexes
1002 and add it to the testcase's packet info list
1004 :param VppInterface src_if: source interface
1005 :param VppInterface dst_if: destination interface
1007 :returns: _PacketInfo object
1010 info = _PacketInfo()
1011 info.index = len(cls._packet_infos)
1012 info.src = src_if.sw_if_index
1013 info.dst = dst_if.sw_if_index
1014 if isinstance(dst_if, VppSubInterface):
1015 dst_idx = dst_if.parent.sw_if_index
1017 dst_idx = dst_if.sw_if_index
1018 if dst_idx in cls._packet_count_for_dst_if_idx:
1019 cls._packet_count_for_dst_if_idx[dst_idx] += 1
1021 cls._packet_count_for_dst_if_idx[dst_idx] = 1
1022 cls._packet_infos[info.index] = info
1026 def info_to_payload(info):
1028 Convert _PacketInfo object to packet payload
1030 :param info: _PacketInfo object
1032 :returns: string containing serialized data from packet info
1035 # build payload, currently 18 bytes (4 x ints + 1 short)
1036 return pack('iiiih', info.index, info.src,
1037 info.dst, info.ip, info.proto)
1040 def payload_to_info(payload, payload_field='load'):
1042 Convert packet payload to _PacketInfo object
1044 :param payload: packet payload
1045 :type payload: <class 'scapy.packet.Raw'>
1046 :param payload_field: packet fieldname of payload "load" for
1047 <class 'scapy.packet.Raw'>
1048 :type payload_field: str
1049 :returns: _PacketInfo object containing de-serialized data from payload
1053 # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1054 payload_b = getattr(payload, payload_field)[:18]
1056 info = _PacketInfo()
1057 info.index, info.src, info.dst, info.ip, info.proto \
1058 = unpack('iiiih', payload_b)
1060 # some SRv6 TCs depend on getting an exception if bad values are detected
1061 if info.index > 0x4000:
1062 raise ValueError('Index value is invalid')
1066 def get_next_packet_info(self, info):
1068 Iterate over the packet info list stored in the testcase
1069 Start iteration with first element if info is None
1070 Continue based on index in info if info is specified
1072 :param info: info or None
1073 :returns: next info in list or None if no more infos
1078 next_index = info.index + 1
1079 if next_index == len(self._packet_infos):
1082 return self._packet_infos[next_index]
1084 def get_next_packet_info_for_interface(self, src_index, info):
1086 Search the packet info list for the next packet info with same source
1089 :param src_index: source interface index to search for
1090 :param info: packet info - where to start the search
1091 :returns: packet info or None
1095 info = self.get_next_packet_info(info)
1098 if info.src == src_index:
1101 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
1103 Search the packet info list for the next packet info with same source
1104 and destination interface indexes
1106 :param src_index: source interface index to search for
1107 :param dst_index: destination interface index to search for
1108 :param info: packet info - where to start the search
1109 :returns: packet info or None
1113 info = self.get_next_packet_info_for_interface(src_index, info)
1116 if info.dst == dst_index:
1119 def assert_equal(self, real_value, expected_value, name_or_class=None):
1120 if name_or_class is None:
1121 self.assertEqual(real_value, expected_value)
1124 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
1125 msg = msg % (getdoc(name_or_class).strip(),
1126 real_value, str(name_or_class(real_value)),
1127 expected_value, str(name_or_class(expected_value)))
1129 msg = "Invalid %s: %s does not match expected value %s" % (
1130 name_or_class, real_value, expected_value)
1132 self.assertEqual(real_value, expected_value, msg)
1134 def assert_in_range(self,
1142 msg = "Invalid %s: %s out of range <%s,%s>" % (
1143 name, real_value, expected_min, expected_max)
1144 self.assertTrue(expected_min <= real_value <= expected_max, msg)
1146 def assert_packet_checksums_valid(self, packet,
1147 ignore_zero_udp_checksums=True):
1148 received = packet.__class__(scapy.compat.raw(packet))
1149 udp_layers = ['UDP', 'UDPerror']
1150 checksum_fields = ['cksum', 'chksum']
1153 temp = received.__class__(scapy.compat.raw(received))
1155 layer = temp.getlayer(counter)
1157 layer = layer.copy()
1158 layer.remove_payload()
1159 for cf in checksum_fields:
1160 if hasattr(layer, cf):
1161 if ignore_zero_udp_checksums and \
1162 0 == getattr(layer, cf) and \
1163 layer.name in udp_layers:
1165 delattr(temp.getlayer(counter), cf)
1166 checksums.append((counter, cf))
1169 counter = counter + 1
1170 if 0 == len(checksums):
1172 temp = temp.__class__(scapy.compat.raw(temp))
1173 for layer, cf in checksums:
1174 calc_sum = getattr(temp[layer], cf)
1176 getattr(received[layer], cf), calc_sum,
1177 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
1179 "Checksum field `%s` on `%s` layer has correct value `%s`" %
1180 (cf, temp[layer].name, calc_sum))
1182 def assert_checksum_valid(self, received_packet, layer,
1183 field_name='chksum',
1184 ignore_zero_checksum=False):
1185 """ Check checksum of received packet on given layer """
1186 received_packet_checksum = getattr(received_packet[layer], field_name)
1187 if ignore_zero_checksum and 0 == received_packet_checksum:
1189 recalculated = received_packet.__class__(
1190 scapy.compat.raw(received_packet))
1191 delattr(recalculated[layer], field_name)
1192 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1193 self.assert_equal(received_packet_checksum,
1194 getattr(recalculated[layer], field_name),
1195 "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """ Check the checksum of the 'IP' layer of the received packet

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded to assert_checksum_valid
    """
    layer_name = 'IP'
    self.assert_checksum_valid(
        received_packet, layer_name,
        ignore_zero_checksum=ignore_zero_checksum)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """
    Check the checksum of the 'TCP' layer of a received packet.

    :param received_packet: packet to check
    :param ignore_zero_checksum: forwarded to assert_checksum_valid
    """
    layer_name = 'TCP'
    self.assert_checksum_valid(
        received_packet, layer_name,
        ignore_zero_checksum=ignore_zero_checksum)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """
    Check the checksum of the 'UDP' layer of a received packet.

    Unlike the IP and TCP variants, a zero checksum is ignored by default.

    :param received_packet: packet to check
    :param ignore_zero_checksum: forwarded to assert_checksum_valid
    """
    layer_name = 'UDP'
    self.assert_checksum_valid(
        received_packet, layer_name,
        ignore_zero_checksum=ignore_zero_checksum)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """
    Check the checksums of the *error layers carried in a packet.

    Each of IPerror, TCPerror, UDPerror and ICMPerror is verified only if
    the packet has that layer; for UDPerror a zero checksum is ignored.

    :param received_packet: packet to check
    """
    # (layer class, layer name, extra keyword arguments), checked in order
    embedded_layers = (
        (IPerror, 'IPerror', {}),
        (TCPerror, 'TCPerror', {}),
        (UDPerror, 'UDPerror', {'ignore_zero_checksum': True}),
        (ICMPerror, 'ICMPerror', {}),
    )
    for layer_cls, layer_name, extra in embedded_layers:
        if received_packet.haslayer(layer_cls):
            self.assert_checksum_valid(received_packet, layer_name, **extra)
def assert_icmp_checksum_valid(self, received_packet):
    """
    Check the 'ICMP' layer checksum, then any embedded error layers.

    :param received_packet: packet to check
    """
    icmp_layer = 'ICMP'
    self.assert_checksum_valid(received_packet, icmp_layer)
    self.assert_embedded_icmp_checksum_valid(received_packet)
def assert_icmpv6_checksum_valid(self, pkt):
    """
    Check the 'cksum' field of the ICMPv6 layers present in a packet.

    A destination-unreachable layer additionally triggers the check of the
    embedded error layers; echo request and echo reply are only checked
    themselves.

    :param pkt: packet to check
    """
    if pkt.haslayer(ICMPv6DestUnreach):
        self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
        self.assert_embedded_icmp_checksum_valid(pkt)

    echo_layers = (
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest'),
        (ICMPv6EchoReply, 'ICMPv6EchoReply'),
    )
    for echo_cls, echo_name in echo_layers:
        if pkt.haslayer(echo_cls):
            self.assert_checksum_valid(pkt, echo_name, 'cksum')
def get_packet_counter(self, counter):
    """
    Return the current value of a packet counter.

    A name starting with "/" is looked up through ``self.statistics``.
    Any other name is searched for in the output of the "sh errors" CLI
    command: the second whitespace-separated token of a line is compared
    with the name and the first token is returned as an integer.

    Lines with fewer than two tokens (e.g. blank lines) are skipped rather
    than raising IndexError.

    NOTE(review): the else branch, the default of 0 and the stop at the
    first match were not visible in the reviewed excerpt and are
    reconstructed from context.

    :param counter: counter name
    :returns: counter value; 0 when the name is not found in the CLI output
    """
    if counter.startswith("/"):
        return self.statistics.get_counter(counter)

    cli_lines = self.vapi.cli("sh errors").split('\n')
    # the first and the last entry are not examined, as before
    # (presumably the header line and the remainder after the final newline)
    for cli_line in cli_lines[1:-1]:
        tokens = cli_line.split()
        if len(tokens) > 1 and tokens[1] == counter:
            return int(tokens[0])
    return 0
def assert_packet_counter_equal(self, counter, expected_value):
    """
    Assert that a packet counter holds the expected value.

    :param counter: counter name, resolved via get_packet_counter
    :param expected_value: value the counter must equal
    """
    observed = self.get_packet_counter(counter)
    label = "packet counter `%s'" % counter
    self.assert_equal(observed, expected_value, label)
def assert_error_counter_equal(self, counter, expected_value):
    """
    Assert that an error counter, summed, equals the expected value.

    The counter is read by indexing ``self.statistics`` with its name and
    calling ``sum()`` on the result.

    :param counter: counter name
    :param expected_value: value the sum must equal
    """
    total = self.statistics[counter].sum()
    label = "error counter `%s'" % counter
    self.assert_equal(total, expected_value, label)
def sleep(cls, timeout, remark=None):
    """
    Sleep for ``timeout`` seconds and log how long the sleep really took.

    A timeout of 0 only yields the CPU and logs nothing. For any other
    value an error is logged when the sleep took more than twice the
    requested time.

    NOTE(review): the zero-timeout branch and the time.sleep() call were
    not visible in the reviewed excerpt and are reconstructed from context.
    The ``cls`` parameter suggests a @classmethod decorator, but none is
    visible in the excerpt - confirm.

    :param timeout: number of seconds to sleep
    :param remark: free text included in the log messages
    """
    # /* Allow sleep(0) to maintain win32 semantics, and as decreed
    #  * by Guido, only the main thread can be interrupted.
    # */
    # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892  # noqa
    if timeout == 0:
        # yield quantum
        yield_cpu = getattr(os, 'sched_yield', None)
        if yield_cpu is None:
            time.sleep(0)
        else:
            yield_cpu()
        return

    cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
    before = time.time()
    time.sleep(timeout)
    elapsed = time.time() - before
    if elapsed > 2 * timeout:
        cls.logger.error("unexpected self.sleep() result - "
                         "slept for %es instead of ~%es!",
                         elapsed, timeout)

    cls.logger.debug(
        "Finished sleep (%s) - slept %es (wanted %es)",
        remark, elapsed, timeout)
def virtual_sleep(self, timeout, remark=None):
    """
    Move VPP's clock instead of really sleeping.

    Issues the "set clock adjust" CLI command with the given value.

    :param timeout: amount passed to "set clock adjust"
    :param remark: free text included in the log message
    """
    self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
    adjust_cmd = "set clock adjust %s" % timeout
    self.vapi.cli(adjust_cmd)
def pg_send(self, intf, pkts, worker=None, trace=True):
    """
    Queue packets on an interface, enable capture and start the run.

    :param intf: interface the stream is added to
    :param pkts: packets to send
    :param worker: forwarded to the interface's add_stream
    :param trace: forwarded to pg_start
    """
    intf.add_stream(pkts, worker=worker)
    capture_on = self.pg_interfaces
    self.pg_enable_capture(capture_on)
    self.pg_start(trace=trace)
def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None,
                               trace=True):
    """
    Send packets and assert that no interface captures anything.

    NOTE(review): the second line of the signature, the timeout handling
    and the condition guarding the trace dump were not visible in the
    reviewed excerpt; they are reconstructed from the sibling send_and_*
    helpers - confirm against the full file.

    :param intf: interface to send on
    :param pkts: packets to send
    :param remark: forwarded to assert_nothing_captured
    :param timeout: capture timeout for the first interface checked
        (1 when not given); later interfaces use 0.1
    :param trace: when true, log the output of "show trace"
    """
    self.pg_send(intf, pkts)
    wait = timeout if timeout else 1
    for pg_if in self.pg_interfaces:
        pg_if.get_capture(0, timeout=wait)
        pg_if.assert_nothing_captured(remark=remark)
        # only the first interface gets the full timeout
        wait = 0.1
    if trace:
        self.logger.debug(self.vapi.cli("show trace"))
def send_and_expect(self, intf, pkts, output, n_rx=None, worker=None,
                    trace=True):
    """
    Send packets and return what was captured on ``output``.

    NOTE(review): the second line of the signature, the n_rx default
    handling and the return statement were not visible in the reviewed
    excerpt and are reconstructed from context - confirm.

    :param intf: interface to send on
    :param pkts: packets to send
    :param output: interface the replies are captured on
    :param n_rx: number of packets expected; len(pkts) when not given
    :param worker: forwarded to pg_send
    :param trace: forwarded to pg_send; when true the output of
        "show trace" is logged as well
    :returns: result of output.get_capture
    """
    expected_count = n_rx if n_rx else len(pkts)
    self.pg_send(intf, pkts, worker=worker, trace=trace)
    rx = output.get_capture(expected_count)
    if trace:
        self.logger.debug(self.vapi.cli("show trace"))
    return rx
def send_and_expect_only(self, intf, pkts, output, timeout=None):
    """
    Send packets, expect them on ``output`` and nothing anywhere else.

    NOTE(review): the definition of the ``outputs`` list, the timeout
    handling and the return statement were not visible in the reviewed
    excerpt and are reconstructed from context - confirm.

    :param intf: interface to send on
    :param pkts: packets to send
    :param output: the only interface expected to capture packets
    :param timeout: capture timeout for the first silent interface checked
        (1 when not given); later interfaces use 0.1
    :returns: result of output.get_capture(len(pkts))
    """
    self.pg_send(intf, pkts)
    rx = output.get_capture(len(pkts))
    outputs = [output]
    wait = timeout if timeout else 1
    for pg_if in self.pg_interfaces:
        if pg_if in outputs:
            continue
        pg_if.get_capture(0, timeout=wait)
        pg_if.assert_nothing_captured()
        # only the first silent interface gets the full timeout
        wait = 0.1

    return rx
def get_testcase_doc_name(test):
    """
    Return the first line of the docstring of a test's class.

    :param test: test case instance
    :returns: first docstring line, as cleaned up by inspect.getdoc
    """
    class_doc = getdoc(test.__class__)
    doc_lines = class_doc.splitlines()
    return doc_lines[0]
def get_test_description(descriptions, test):
    """
    Return the text used to describe a test in the output.

    NOTE(review): the fallback branch was not visible in the reviewed
    excerpt; ``str(test)`` is assumed - confirm.

    :param descriptions: whether short descriptions should be used
    :param test: test case instance
    :returns: the test's short description when descriptions are enabled
        and one exists, otherwise str(test)
    """
    short_description = test.shortDescription()
    if not (descriptions and short_description):
        return str(test)
    return short_description
class TestCaseInfo(object):
    """Plain record of per-test-case data (logger, temp dir, VPP process)."""

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        """
        :param logger: logger of the test case
        :param tempdir: temporary directory used by the test case
        :param vpp_pid: PID of the VPP process
        :param vpp_bin_path: path to the VPP binary
        """
        # starts out unset; holds the name of a test once one is recorded
        self.core_crash_test = None
        self.vpp_bin_path = vpp_bin_path
        self.vpp_pid = vpp_pid
        self.tempdir = tempdir
        self.logger = logger
class VppTestResult(unittest.TestResult):
    """
    Test result collector which prints per-test outcomes and forwards them
    through a result pipe.

    NOTE(review): several short lines of this class (else/return/try
    keywords, a few argument lines) were not visible in the reviewed
    excerpt and are reconstructed from context - confirm against the full
    file.

    @property result_string
     String variable to store the test case result string.
    @property errors
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test which
     raised an unexpected exception.
    @property failures
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test where
     a failure was explicitly signalled using the TestCase.assert*()
     methods.
    """

    # class-level state: defined on the class, so shared by all instances
    failed_test_cases_info = set()
    core_crash_test_cases_info = set()
    # read by the methods below; assigned from outside this class
    current_test_case_info = None

    def __init__(self, stream=None, descriptions=None, verbosity=None,
                 runner=None):
        """
        :param stream: stream the test results are written to
        :param descriptions: whether to use test case descriptions
        :param verbosity: required verbosity level
        :param runner: the runner which created this result object
        """
        super(VppTestResult, self).__init__(stream, descriptions, verbosity)
        self.runner = runner
        self.result_string = None
        self.verbosity = verbosity
        self.descriptions = descriptions
        self.stream = stream
        # test classes whose header was already printed by startTest()
        self.printed = []

    def addSuccess(self, test):
        """
        Record a test succeeded result

        :param test: the test which passed
        """
        info = self.current_test_case_info
        if info:
            info.logger.debug(
                "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
                                                       test._testMethodName,
                                                       test._testMethodDoc))
        unittest.TestResult.addSuccess(self, test)
        self.result_string = colorize("OK", GREEN)

        self.send_result_through_pipe(test, PASS)

    def addSkip(self, test, reason):
        """
        Record a test skipped.

        :param test: the test which was skipped
        :param reason: why it was skipped; "not enough cpus" is reported
            through the pipe as a CPU-shortage skip
        """
        info = self.current_test_case_info
        if info:
            info.logger.debug(
                "--- addSkip() %s.%s(%s) called, reason is %s" %
                (test.__class__.__name__, test._testMethodName,
                 test._testMethodDoc, reason))
        unittest.TestResult.addSkip(self, test, reason)
        self.result_string = colorize("SKIP", YELLOW)

        if reason == "not enough cpus":
            outcome = SKIP_CPU_SHORTAGE
        else:
            outcome = SKIP
        self.send_result_through_pipe(test, outcome)

    def symlink_failed(self):
        """
        Link the failed test's temp dir into the FAILED_DIR directory.

        Does nothing without current test case info. Any exception is
        logged as an error and not propagated.
        """
        info = self.current_test_case_info
        if not info:
            return
        try:
            failed_dir = os.getenv('FAILED_DIR')
            # NOTE(review): the link-name format was not visible in the
            # reviewed excerpt; '%s-FAILED' is assumed - confirm
            link_name = '%s-FAILED' % os.path.basename(info.tempdir)
            link_path = os.path.join(failed_dir, link_name)

            info.logger.debug("creating a link to the failed test")
            info.logger.debug(
                "os.symlink(%s, %s)" % (info.tempdir, link_path))
            if os.path.exists(link_path):
                info.logger.debug('symlink already exists')
            else:
                os.symlink(info.tempdir, link_path)

        except Exception as e:
            self.current_test_case_info.logger.error(e)

    def send_result_through_pipe(self, test, result):
        """
        Send ``(test.id(), result)`` through the result pipe, if any.

        :param test: the test the result belongs to
        :param result: result code to send
        """
        pipe = getattr(self, 'test_framework_result_pipe', None)
        if pipe:
            pipe.send((test.id(), result))

    def log_error(self, test, err, fn_name):
        """
        Log an error or failure to the current test case's logger.

        :param test: test (or unittest _ErrorHolder) the error belongs to
        :param err: exception info tuple, expanded into format_exception
        :param fn_name: name of the reporting method, used in the message
        """
        info = self.current_test_case_info
        if not info:
            return
        if isinstance(test, unittest.suite._ErrorHolder):
            test_name = test.description
        else:
            test_name = '%s.%s(%s)' % (test.__class__.__name__,
                                       test._testMethodName,
                                       test._testMethodDoc)
        info.logger.debug(
            "--- %s() %s called, err is %s" %
            (fn_name, test_name, err))
        info.logger.debug(
            "formatted exception is:\n%s" %
            "".join(format_exception(*err)))

    def add_error(self, test, err, unittest_fn, error_type):
        """
        Common implementation of addFailure() and addError().

        :param test: the test which failed
        :param err: exception info tuple
        :param unittest_fn: unbound unittest.TestResult method to delegate to
        :param error_type: FAIL or ERROR
        :raises Exception: for any other error_type
        """
        if error_type == FAIL:
            fn_name, label = 'addFailure', "FAIL"
        elif error_type == ERROR:
            fn_name, label = 'addError', "ERROR"
        else:
            raise Exception('Error type %s cannot be used to record an '
                            'error or a failure' % error_type)
        self.log_error(test, err, fn_name)
        error_type_str = colorize(label, RED)

        unittest_fn(self, test, err)
        info = self.current_test_case_info
        if not info:
            self.result_string = '%s [no temp dir]' % error_type_str
        else:
            self.result_string = "%s [ temp dir used by test case: %s ]" % \
                (error_type_str, info.tempdir)
            self.symlink_failed()
            self.failed_test_cases_info.add(info)
            if is_core_present(info.tempdir):
                # only the first test recorded with a core present is named
                if not info.core_crash_test:
                    if isinstance(test, unittest.suite._ErrorHolder):
                        test_name = str(test)
                    else:
                        test_name = "'{!s}' ({!s})".format(
                            get_testcase_doc_name(test), test.id())
                    info.core_crash_test = test_name
                self.core_crash_test_cases_info.add(info)

        self.send_result_through_pipe(test, error_type)

    def addFailure(self, test, err):
        """
        Record a test failed result

        :param test: the test which failed
        :param err: error message
        """
        self.add_error(test, err, unittest.TestResult.addFailure, FAIL)

    def addError(self, test, err):
        """
        Record a test error result

        :param test: the test which raised
        :param err: error message
        """
        self.add_error(test, err, unittest.TestResult.addError, ERROR)

    def getDescription(self, test):
        """
        Get test description

        :param test: the test to describe
        :returns: test description
        """
        return get_test_description(self.descriptions, test)

    def startTest(self, test):
        """
        Start a test

        Prints a header the first time a test of a given class is started.

        :param test: the test about to run
        """

        def print_header(test):
            test_class = test.__class__
            if test_class in self.printed:
                return

            test_doc = getdoc(test)
            if not test_doc:
                raise Exception("No doc string for test '%s'" % test.id())

            test_title = test_doc.splitlines()[0].rstrip()
            test_title = colorize(test_title, GREEN)
            if test.is_tagged_run_solo():
                test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)

            # This block may overwrite the colorized title above,
            # but we want this to stand out and be fixed
            if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
                test_title = colorize(
                    f"FIXME with VPP workers: {test_title}", RED)

            if hasattr(test, 'vpp_worker_count'):
                if test.vpp_worker_count == 0:
                    test_title += " [main thread only]"
                elif test.vpp_worker_count == 1:
                    test_title += " [1 worker thread]"
                else:
                    test_title += f" [{test.vpp_worker_count} worker threads]"

            if test_class.skipped_due_to_cpu_lack:
                test_title = colorize(
                    f"{test_title} [skipped - not enough cpus, "
                    f"required={test.__class__.get_cpus_required()}, "
                    f"available={max_vpp_cpus}]", YELLOW)

            print(double_line_delim)
            print(test_title)
            print(double_line_delim)
            self.printed.append(test_class)

        print_header(test)
        self.start_test = time.time()
        unittest.TestResult.startTest(self, test)
        if self.verbosity > 0:
            self.stream.writeln(
                "Starting " + self.getDescription(test) + " ...")
            self.stream.writeln(single_line_delim)

    def stopTest(self, test):
        """
        Called when the given test has been run

        :param test: the test which finished
        """
        unittest.TestResult.stopTest(self, test)

        description = self.getDescription(test)
        if self.verbosity > 0:
            self.stream.writeln(single_line_delim)
            self.stream.writeln("%-73s%s" % (description,
                                             self.result_string))
            self.stream.writeln(single_line_delim)
        else:
            self.stream.writeln("%-68s %4.2f %s" %
                                (description,
                                 time.time() - self.start_test,
                                 self.result_string))

        self.send_result_through_pipe(test, TEST_RUN)

    def printErrors(self):
        """
        Print errors from running the test case
        """
        if self.errors or self.failures:
            self.stream.writeln()
            self.printErrorList('ERROR', self.errors)
            self.printErrorList('FAIL', self.failures)

        # ^^ that is the last output from unittest before summary
        if not self.runner.print_summary:
            # redirect own and the runner's stream to the null device
            devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
            self.stream = devnull
            self.runner.stream = devnull

    def printErrorList(self, flavour, errors):
        """
        Print error list to the output stream together with error type
        and test case description.

        :param flavour: error type
        :param errors: iterable errors
        """
        write_line = self.stream.writeln
        for test, err in errors:
            write_line(double_line_delim)
            write_line("%s: %s" %
                       (flavour, self.getDescription(test)))
            write_line(single_line_delim)
            write_line("%s" % err)
class VppTestRunner(unittest.TextTestRunner):
    """
    A basic test runner implementation which prints results to standard
    output and collects them in VppTestResult objects.

    NOTE(review): the decorator of ``resultclass``, the trailing arguments
    of ``_makeResult`` and the return statement of ``run`` were not visible
    in the reviewed excerpt and are reconstructed from the way they are
    used - confirm against the full file.
    """

    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
                 result_pipe=None, failfast=False, buffer=False,
                 resultclass=None, print_summary=True, **kwargs):
        """
        :param keep_alive_pipe: pipe stored on KeepAliveReporter
        :param descriptions: forwarded to unittest.TextTestRunner
        :param verbosity: forwarded to unittest.TextTestRunner
        :param result_pipe: pipe stored on the result class, through which
            results are sent
        :param failfast: forwarded to unittest.TextTestRunner
        :param buffer: forwarded to unittest.TextTestRunner
        :param resultclass: forwarded to unittest.TextTestRunner
        :param print_summary: when false, the result object silences the
            stream after printing errors and run() restores it afterwards
        """
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        super(VppTestRunner, self).__init__(sys.stdout, descriptions,
                                            verbosity, failfast, buffer,
                                            resultclass, **kwargs)
        KeepAliveReporter.pipe = keep_alive_pipe

        self.print_summary = print_summary
        # kept so run() can restore it
        self.orig_stream = self.stream
        # set on the result class itself, not on an instance
        self.resultclass.test_framework_result_pipe = result_pipe

    def _makeResult(self):
        """Create the result object, handing it a reference to this runner."""
        result_cls = self.resultclass
        return result_cls(self.stream,
                          self.descriptions,
                          self.verbosity,
                          self)

    def run(self, test):
        """
        Run the tests

        :param test: test or suite to run
        :returns: the result object produced by the base class
        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        result = super(VppTestRunner, self).run(test)
        if not self.print_summary:
            original = self.orig_stream
            self.stream = original
            result.stream = original
        return result
class Worker(Thread):
    """
    Thread which runs an external executable and collects its output.

    Subclasses may set ``testcase``, ``role`` and ``wait_for_gdb`` on the
    instance before calling this class's ``__init__``; they are looked up
    with hasattr() and used for debugging support and naming.

    NOTE(review): a few short lines of this class - including the ``def``
    line of ``run`` - were not visible in the reviewed excerpt and are
    reconstructed from context - confirm against the full file.
    """

    def __init__(self, executable_args, logger, env=None, *args, **kwargs):
        """
        :param executable_args: argv list; element 0 is the executable path
        :param logger: logger used to report progress and output
        :param env: extra environment variables for the child process
            (deep-copied)
        :param args: positional arguments forwarded to Thread
        :param kwargs: keyword arguments forwarded to Thread
        """
        super(Worker, self).__init__(*args, **kwargs)
        self.logger = logger
        self.args = executable_args
        if hasattr(self, 'testcase') and self.testcase.debug_all:
            if self.testcase.debug_gdbserver:
                # Wrap the command line in gdbserver. This used to append
                # `args` (the Thread *args tuple), which raised TypeError
                # (list + tuple) and was not the command line anyway.
                self.args = ['/usr/bin/gdbserver', 'localhost:{port}'
                             .format(port=self.testcase.gdbserver_port)] + \
                    list(executable_args)
            elif self.testcase.debug_gdb and hasattr(self, 'wait_for_gdb'):
                self.args.append(self.wait_for_gdb)
        self.app_bin = executable_args[0]
        self.app_name = os.path.basename(self.app_bin)
        if hasattr(self, 'role'):
            self.app_name += ' {role}'.format(role=self.role)
        # defined up front so they can be read before run() sets them
        self.process = None
        self.result = None
        env = {} if env is None else env
        self.env = copy.deepcopy(env)

    def wait_for_enter(self):
        """
        When debugging is enabled, print attach instructions and block
        until ENTER is pressed; otherwise return immediately.
        """
        if not hasattr(self, 'testcase'):
            return
        if self.testcase.debug_all and self.testcase.debug_gdbserver:
            print()
            print(double_line_delim)
            print("Spawned GDB Server for '{app}' with PID: {pid}"
                  .format(app=self.app_name, pid=self.process.pid))
        elif self.testcase.debug_all and self.testcase.debug_gdb:
            print()
            print(double_line_delim)
            print("Spawned '{app}' with PID: {pid}"
                  .format(app=self.app_name, pid=self.process.pid))
        else:
            return
        print(single_line_delim)
        print("You can debug '{app}' using:".format(app=self.app_name))
        if self.testcase.debug_gdbserver:
            print("sudo gdb " + self.app_bin +
                  " -ex 'target remote localhost:{port}'"
                  .format(port=self.testcase.gdbserver_port))
            print("Now is the time to attach gdb by running the above "
                  "command, set up breakpoints etc., then resume from "
                  "within gdb by issuing the 'continue' command")
            # the port just printed is taken; advance for the next worker
            self.testcase.gdbserver_port += 1
        elif self.testcase.debug_gdb:
            print("sudo gdb " + self.app_bin +
                  " -ex 'attach {pid}'".format(pid=self.process.pid))
            print("Now is the time to attach gdb by running the above "
                  "command and set up breakpoints etc., then resume from"
                  " within gdb by issuing the 'continue' command")
        print(single_line_delim)
        input("Press ENTER to continue running the testcase...")

    def run(self):
        """
        Spawn the executable, wait for it to finish and log its output.

        The return code is stored in ``self.result``.

        :raises EnvironmentError: when the executable does not exist or is
            not executable; ``self.result`` is set to os.EX_OSFILE first
        """
        executable = self.args[0]
        if not os.path.exists(executable) or not os.access(
                executable, os.F_OK | os.X_OK):
            # Exit code that means some system file did not exist,
            # could not be opened, or had some other kind of error.
            self.result = os.EX_OSFILE
            raise EnvironmentError(
                "executable '%s' is not found or not executable." %
                executable)
        self.logger.debug("Running executable '{app}': '{cmd}'"
                          .format(app=self.app_name,
                                  cmd=' '.join(self.args)))
        env = os.environ.copy()
        env.update(self.env)
        env["CK_LOG_FILE_NAME"] = "-"
        # stdbuf -o0 -e0 turns off stdout/stderr buffering in the child
        self.process = subprocess.Popen(
            ['stdbuf', '-o0', '-e0'] + self.args, shell=False, env=env,
            preexec_fn=os.setpgrp, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        self.wait_for_enter()
        out, err = self.process.communicate()
        self.logger.debug("Finished running `{app}'".format(app=self.app_name))
        self.logger.info("Return code is `%s'" % self.process.returncode)
        self.logger.info(single_line_delim)
        self.logger.info("Executable `{app}' wrote to stdout:"
                         .format(app=self.app_name))
        self.logger.info(single_line_delim)
        self.logger.info(out.decode('utf-8'))
        self.logger.info(single_line_delim)
        self.logger.info("Executable `{app}' wrote to stderr:"
                         .format(app=self.app_name))
        self.logger.info(single_line_delim)
        self.logger.info(err.decode('utf-8'))
        self.logger.info(single_line_delim)
        self.result = self.process.returncode
1781 if __name__ == '__main__':