3 from __future__ import print_function
18 from collections import deque
19 from threading import Thread, Event
20 from inspect import getdoc, isclass
21 from traceback import format_exception
22 from logging import FileHandler, DEBUG, Formatter
24 from abc import ABC, abstractmethod
25 from struct import pack, unpack
28 from scapy.packet import Raw, Packet
29 from config import config, available_cpus, num_cpus, max_vpp_cpus
30 import hook as hookmodule
31 from vpp_pg_interface import VppPGInterface
32 from vpp_sub_interface import VppSubInterface
33 from vpp_lo_interface import VppLoInterface
34 from vpp_bvi_interface import VppBviInterface
35 from vpp_papi_provider import VppPapiProvider
36 from vpp_papi import VppEnum
38 from vpp_papi.vpp_stats import VPPStats
39 from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
49 from vpp_object import VppObjectRegistry
50 from util import ppp, is_core_present
51 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
52 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
53 from scapy.layers.inet6 import ICMPv6EchoReply
54 from vpp_running import use_running
# Module-wide logger, named after this module.
logger = logging.getLogger(__name__)

# Set up an empty logger for the testcase that can be overridden as necessary
null_logger = logging.getLogger("VppTestCase")
null_logger.addHandler(logging.NullHandler())

# NOTE(review): the body of this conditional (presumably an import of the
# debug framework helper) is not visible in this view -- confirm upstream.
if config.debug_framework:

# NOTE(review): the following lines look like fragments of the module
# docstring whose delimiters are not visible in this view.
"""
Test framework module.

The module provides a set of tools for constructing and running tests and
representing the results.
"""
class VppDiedError(Exception):
    """exception for reporting that the subprocess has died."""

    # NOTE(review): several interior lines of this class (the dict-comprehension
    # wrapper around the lines below, try:/pass and else: branches in __init__)
    # are not visible in this view; only the visible lines are kept.
        for k, v in signal.__dict__.items()
        if k.startswith("SIG") and not k.startswith("SIG_")

    def __init__(self, rv=None, testcase=None, method_name=None):
        # remember the context so the message can name the running test
        self.signal_name = None
        self.testcase = testcase
        self.method_name = method_name

        # a negative return code encodes the terminating signal number
            self.signal_name = VppDiedError.signals_by_value[-rv]
        except (KeyError, TypeError):

        if testcase is None and method_name is None:
            in_msg = " while running %s.%s" % (testcase, method_name)

            msg = "VPP subprocess died unexpectedly%s with return code: %d%s." % (
                " [%s]" % (self.signal_name if self.signal_name is not None else ""),
            msg = "VPP subprocess died unexpectedly%s." % in_msg

        super(VppDiedError, self).__init__(msg)
class _PacketInfo(object):
    """Private class to create packet info object.

    Help process information about the next packet.
    Set variables to default values.
    """

    # NOTE(review): the attribute default assignments that these comments
    # annotate (index/src/dst/ip/proto/data) are not visible in this view.
    #: Store the index of the packet.
    #: Store the index of the source packet generator interface of the packet.
    #: Store the index of the destination packet generator interface
    #: Store expected ip version
    #: Store expected upper protocol
    #: Store the copy of the former packet.
140 def __eq__(self, other):
141 index = self.index == other.index
142 src = self.src == other.src
143 dst = self.dst == other.dst
144 data = self.data == other.data
145 return index and src and dst and data
def pump_output(testclass):
    """pump output from vpp stdout/stderr to proper queues"""
    # NOTE(review): several lines (fragment initialization, the select() list
    # brackets, read-length checks and the limit computation) are not visible
    # in this view; only the visible lines are reproduced.
    if not hasattr(testclass, "vpp"):
    while not testclass.pump_thread_stop_flag.is_set():
        readable = select.select(
                testclass.vpp.stdout.fileno(),
                testclass.vpp.stderr.fileno(),
                testclass.pump_thread_wakeup_pipe[0],
        if testclass.vpp.stdout.fileno() in readable:
            read = os.read(testclass.vpp.stdout.fileno(), 102400)
                split = read.decode("ascii", errors="backslashreplace").splitlines(True)
                # prepend any partial line left over from the previous read
                if len(stdout_fragment) > 0:
                    split[0] = "%s%s" % (stdout_fragment, split[0])
                if len(split) > 0 and split[-1].endswith("\n"):
                    stdout_fragment = split[-1]
                testclass.vpp_stdout_deque.extend(split[:limit])
                if not config.cache_vpp_output:
                    for line in split[:limit]:
                        testclass.logger.info("VPP STDOUT: %s" % line.rstrip("\n"))
        if testclass.vpp.stderr.fileno() in readable:
            read = os.read(testclass.vpp.stderr.fileno(), 102400)
                split = read.decode("ascii", errors="backslashreplace").splitlines(True)
                if len(stderr_fragment) > 0:
                    split[0] = "%s%s" % (stderr_fragment, split[0])
                if len(split) > 0 and split[-1].endswith("\n"):
                    stderr_fragment = split[-1]
                testclass.vpp_stderr_deque.extend(split[:limit])
                if not config.cache_vpp_output:
                    for line in split[:limit]:
                        testclass.logger.error("VPP STDERR: %s" % line.rstrip("\n"))
        # ignoring the dummy pipe here intentionally - the
        # flag will take care of properly terminating the loop
199 def _is_platform_aarch64():
200 return platform.machine() == "aarch64"
# Evaluate the architecture check once at import time.
is_platform_aarch64 = _is_platform_aarch64()


def _is_distro_ubuntu2204():
    # NOTE(review): the substring check and the return statements of this
    # helper are not visible in this view.
    with open("/etc/os-release") as f:
        for line in f.readlines():


# Evaluate the distro check once at import time.
is_distro_ubuntu2204 = _is_distro_ubuntu2204()


def _is_distro_debian11():
    # Scans /etc/os-release for the Debian-11 codename; the return
    # statements are not visible in this view.
    with open("/etc/os-release") as f:
        for line in f.readlines():
            if "bullseye" in line:


# Evaluate the distro check once at import time.
is_distro_debian11 = _is_distro_debian11()
class KeepAliveReporter(object):
    """
    Singleton object which reports test start to parent process
    """

    # NOTE(review): __init__'s def line and the _shared_state class attribute
    # are not visible in this view; the line below suggests the Borg pattern
    # (shared state across instances) -- confirm against the full file.
        self.__dict__ = self._shared_state

    def pipe(self, pipe):
        # the keep-alive pipe may only be installed once per process
        if self._pipe is not None:
            raise Exception("Internal error - pipe should only be set once.")

    def send_keep_alive(self, test, desc=None):
        """
        Write current test tmpdir & desc to keep-alive pipe to signal liveness
        """
        if not hasattr(test, "vpp") or self.pipe is None:
            # if not running forked..
            desc = "%s (%s)" % (desc, unittest.util.strclass(test))
        self.pipe.send((desc, config.vpp, test.tempdir, test.vpp.pid))
class TestCaseTag(Enum):
    # NOTE(review): enum member assignments other than FIXME_VPP_WORKERS
    # (RUN_SOLO, FIXME_ASAN, FIXME_UBUNTU2204, FIXME_DEBIAN11) are not
    # visible in this view.
    # marks the suites that must run at the end
    # using only a single test runner
    # marks the suites broken on VPP multi-worker
    FIXME_VPP_WORKERS = 2
    # marks the suites broken when ASan is enabled
    # marks suites broken on Ubuntu-22.04
    # marks suites broken on Debian-11


def create_tag_decorator(e):
    # NOTE(review): the inner decorator function and its return are not
    # visible; the visible lines append tag *e* to cls.test_tags, falling
    # back on AttributeError when the class has no test_tags yet.
            cls.test_tags.append(e)
        except AttributeError:


# Pre-built class decorators, one per tag.
tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
tag_fixme_ubuntu2204 = create_tag_decorator(TestCaseTag.FIXME_UBUNTU2204)
tag_fixme_debian11 = create_tag_decorator(TestCaseTag.FIXME_DEBIAN11)
class CPUInterface(ABC):
    # presumably set when a test is skipped for lack of CPUs -- confirm
    skipped_due_to_cpu_lack = False

    # NOTE(review): decorators (@classmethod/@abstractmethod) and the method
    # bodies are not visible in this view.
    def get_cpus_required(cls):

    def assign_cpus(cls, cpus):
class VppTestCase(CPUInterface, unittest.TestCase):
    """This subclass is a base class for VPP test cases that are implemented as
    classes. It provides methods to create and run test case.
    """

    # NOTE(review): some class attributes between these lines are not visible
    # in this view.
    extra_vpp_statseg_config = ""
    extra_vpp_punt_config = []
    extra_vpp_plugin_config = []
    vapi_response_timeout = 5
    remove_configured_vpp_objects_on_tear_down = True

    def packet_infos(self):
        """List of packet infos"""
        return self._packet_infos

    def get_packet_count_for_if_idx(cls, dst_if_index):
        """Get the number of packet info for specified destination if index"""
        if dst_if_index in cls._packet_count_for_dst_if_idx:
            return cls._packet_count_for_dst_if_idx[dst_if_index]
        # NOTE(review): the fallback branch for unknown indexes is not
        # visible in this view.

    def has_tag(cls, tag):
        """if the test case has a given tag - return true"""
        # NOTE(review): the try: line and the AttributeError fallback return
        # are not visible in this view.
            return tag in cls.test_tags
        except AttributeError:
358 def is_tagged_run_solo(cls):
359 """if the test case class is timing-sensitive - return true"""
360 return cls.has_tag(TestCaseTag.RUN_SOLO)
    def skip_fixme_asan(cls):
        """if @tag_fixme_asan & ASan is enabled - mark for skip"""
        if cls.has_tag(TestCaseTag.FIXME_ASAN):
            vpp_extra_cmake_args = os.environ.get("VPP_EXTRA_CMAKE_ARGS", "")
            if "DVPP_ENABLE_SANITIZE_ADDR=ON" in vpp_extra_cmake_args:
                cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls)
        # NOTE(review): a trailing `return cls` appears to be elided here.

    def skip_fixme_ubuntu2204(cls):
        """if distro is ubuntu 22.04 and @tag_fixme_ubuntu2204 mark for skip"""
        if cls.has_tag(TestCaseTag.FIXME_UBUNTU2204):
            cls = unittest.skip("Skipping @tag_fixme_ubuntu2204 tests")(cls)

    def skip_fixme_debian11(cls):
        """if distro is Debian-11 and @tag_fixme_debian11 mark for skip"""
        if cls.has_tag(TestCaseTag.FIXME_DEBIAN11):
            cls = unittest.skip("Skipping @tag_fixme_debian11 tests")(cls)

        # NOTE(review): the enclosing `def instance(cls):` line for the two
        # lines below is not visible in this view.
        """Return the instance of this testcase"""
        return cls.test_instance
    def set_debug_flags(cls, d):
        """Initialize class debug flags from the DEBUG option string *d*."""
        cls.gdbserver_port = 7777
        cls.debug_core = False
        cls.debug_gdb = False
        cls.debug_gdbserver = False
        cls.debug_all = False
        cls.debug_attach = False
        # NOTE(review): the early return for empty *d*, the `dl = d.lower()`
        # assignment and several branch headers are not visible in this view.
            cls.debug_core = True
        elif dl == "gdb" or dl == "gdb-all":
        elif dl == "gdbserver" or dl == "gdbserver-all":
            cls.debug_gdbserver = True
            cls.debug_attach = True
            raise Exception("Unrecognized DEBUG option: '%s'" % d)
        if dl == "gdb-all" or dl == "gdbserver-all":

    def get_vpp_worker_count(cls):
        # Cache the worker count on first use; suites tagged FIXME_VPP_WORKERS
        # are forced to run without workers.
        if not hasattr(cls, "vpp_worker_count"):
            if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
                cls.vpp_worker_count = 0
            # NOTE(review): the else: header is not visible in this view.
                cls.vpp_worker_count = config.vpp_worker_count
        return cls.vpp_worker_count
421 def get_cpus_required(cls):
422 return 1 + cls.get_vpp_worker_count()
    def setUpConstants(cls):
        """Set-up the test case class based on environment variables"""
        # NOTE(review): many lines of this method (the debug_cli default, the
        # full vpp_cmdline list literal, plugin list entries) are not visible
        # in this view; only the visible lines are reproduced.
        cls.step = config.step
        cls.plugin_path = ":".join(config.vpp_plugin_dir)
        cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir)
        cls.extern_plugin_path = ":".join(config.extern_plugin_dir)
        if cls.step or cls.debug_gdb or cls.debug_gdbserver:
            debug_cli = "cli-listen localhost:5002"
        size = re.search(r"\d+[gG]", config.coredump_size)
            coredump_size = f"coredump-size {config.coredump_size}".lower()
            coredump_size = "coredump-size unlimited"
        default_variant = config.variant
        if default_variant is not None:
            default_variant = "default { variant %s 100 }" % default_variant
        api_fuzzing = config.api_fuzz
        if api_fuzzing is None:
            cls.get_api_segment_prefix(),
        if cls.extern_plugin_path not in (None, ""):
            cls.extra_vpp_plugin_config.append("add-path %s" % cls.extern_plugin_path)
        if cls.get_vpp_worker_count():
            cls.vpp_cmdline.extend(
                ["corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])]
        cls.vpp_cmdline.extend(
            cls.get_stats_sock_path(),
            cls.extra_vpp_statseg_config,
            cls.get_api_sock_path(),
            "lisp_unittest_plugin.so",
            "unittest_plugin.so",
            + cls.extra_vpp_plugin_config
        if cls.extra_vpp_punt_config is not None:
            cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
        if not cls.debug_attach:
            cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
            cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
    def wait_for_enter(cls):
        # Pause the test run so a developer can attach a debugger to VPP.
        # NOTE(review): elif/else branch headers and the print( openers of the
        # multi-line messages are not visible in this view.
        if cls.debug_gdbserver:
            print(double_line_delim)
            print("Spawned GDB server with PID: %d" % cls.vpp.pid)
            print(double_line_delim)
            print("Spawned VPP with PID: %d" % cls.vpp.pid)
            cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
        print(single_line_delim)
        print("You can debug VPP using:")
        if cls.debug_gdbserver:
                f"sudo gdb {config.vpp} "
                f"-ex 'target remote localhost:{cls.gdbserver_port}'"
                "Now is the time to attach gdb by running the above "
                "command, set up breakpoints etc., then resume VPP from "
                "within gdb by issuing the 'continue' command"
            # bump the port so a subsequent gdbserver can bind a fresh one
            cls.gdbserver_port += 1
            print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'")
                "Now is the time to attach gdb by running the above "
                "command and set up breakpoints etc., then resume VPP from"
                " within gdb by issuing the 'continue' command"
        print(single_line_delim)
        input("Press ENTER to continue running the testcase...")
        # NOTE(review): this span is the interior of the method that spawns
        # the VPP subprocess; its def line, the `if (` opener, try:, several
        # closing parentheses and logger-call openers are not visible here.
            is_distro_ubuntu2204 == True and cls.has_tag(TestCaseTag.FIXME_UBUNTU2204)
        ) or (is_distro_debian11 == True and cls.has_tag(TestCaseTag.FIXME_DEBIAN11)):
        cls.logger.debug(f"Assigned cpus: {cls.cpus}")
        cmdline = cls.vpp_cmdline
        if cls.debug_gdbserver:
            gdbserver = "/usr/bin/gdbserver"
            # refuse to continue with a missing or non-executable gdbserver
            if not os.path.isfile(gdbserver) or not os.access(gdbserver, os.X_OK):
                "gdbserver binary '%s' does not exist or is "
                "not executable" % gdbserver
            "localhost:{port}".format(port=cls.gdbserver_port),
            cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
            cls.vpp = subprocess.Popen(
                cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        except subprocess.CalledProcessError as e:
                "Subprocess returned with non-0 return code: (%s)", e.returncode
                "Subprocess returned with OS error: (%s) %s", e.errno, e.strerror
        except Exception as e:
            cls.logger.exception("Subprocess returned unexpected from %s:", cmdline)
    def wait_for_coredump(cls):
        # If a core file exists in the temp dir, poll its size until it stops
        # growing (or a ~60s deadline passes) before tearing vpp down.
        corefile = cls.tempdir + "/core"
        if os.path.isfile(corefile):
            cls.logger.error("Waiting for coredump to complete: %s", corefile)
            curr_size = os.path.getsize(corefile)
            deadline = time.time() + 60
            # NOTE(review): the sleep, the size-snapshot update and the break
            # inside this loop are not visible in this view.
            while time.time() < deadline:
                curr_size = os.path.getsize(corefile)
                if size == curr_size:
                    "Timed out waiting for coredump to complete: %s", corefile
            cls.logger.error("Coredump complete: %s, size %d", corefile, curr_size)
645 def get_stats_sock_path(cls):
646 return "%s/stats.sock" % cls.tempdir
649 def get_api_sock_path(cls):
650 return "%s/api.sock" % cls.tempdir
653 def get_api_segment_prefix(cls):
654 return os.path.basename(cls.tempdir) # Only used for VAPI
    def get_tempdir(cls):
        # NOTE(review): the debug_attach conditional, the else: header, the
        # directory creation and the return are not visible in this view.
            tmpdir = f"{config.tmp_dir}/unittest-attach-gdb"
            tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}"
        if config.wipe_tmp_dir:
            shutil.rmtree(tmpdir, ignore_errors=True)

    def create_file_handler(cls):
        # Creates cls.file_handler writing to the temp dir, or to
        # config.log_dir when one is configured.
        if config.log_dir is None:
            cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
        # NOTE(review): the return and the else: header are not visible here.
            logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}"
            if config.wipe_tmp_dir:
                shutil.rmtree(logdir, ignore_errors=True)
            cls.file_handler = FileHandler(f"{logdir}/log.txt")
        """
        Perform class setup before running the testcase
        Remove shared memory files, start vpp and connect the vpp-api
        """
        # NOTE(review): this span is the interior of setUpClass(); its def
        # line, decorator, try:/if headers and several closing parentheses
        # are not visible in this view.
        super(VppTestCase, cls).setUpClass()
        cls.logger = get_logger(cls.__name__)
        random.seed(config.rnd_seed)
        if hasattr(cls, "parallel_handler"):
            cls.logger.addHandler(cls.parallel_handler)
            cls.logger.propagate = False
        cls.set_debug_flags(config.debug)
        cls.tempdir = cls.get_tempdir()
        cls.create_file_handler()
        cls.file_handler.setFormatter(
            Formatter(fmt="%(asctime)s,%(msecs)03d %(message)s", datefmt="%H:%M:%S")
        cls.file_handler.setLevel(DEBUG)
        cls.logger.addHandler(cls.file_handler)
        cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
        os.chdir(cls.tempdir)
            "Temporary dir is %s, api socket is %s",
            cls.get_api_sock_path(),
        cls.logger.debug("Random seed is %s", config.rnd_seed)
        cls.reset_packet_infos()
        cls.registry = VppObjectRegistry()
        cls.vpp_startup_failed = False
        cls.reporter = KeepAliveReporter()
        # need to catch exceptions here because if we raise, then the cleanup
        # doesn't get called and we might end with a zombie vpp
        if not hasattr(cls, "vpp"):
        cls.reporter.send_keep_alive(cls, "setUpClass")
        VppTestResult.current_test_case_info = TestCaseInfo(
            cls.logger, cls.tempdir, cls.vpp.pid, config.vpp
        cls.vpp_stdout_deque = deque()
        cls.vpp_stderr_deque = deque()
        # Pump thread in a non-debug-attached & not running-vpp
        if not cls.debug_attach and not hasattr(cls, "running_vpp"):
            cls.pump_thread_stop_flag = Event()
            cls.pump_thread_wakeup_pipe = os.pipe()
            cls.pump_thread = Thread(target=pump_output, args=(cls,))
            cls.pump_thread.daemon = True
            cls.pump_thread.start()
        # interactive debug modes disable the vapi response timeout
        if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
            cls.vapi_response_timeout = 0
        cls.vapi = VppPapiProvider(cls.__name__, cls, cls.vapi_response_timeout)
            hook = hookmodule.StepHook(cls)
            hook = hookmodule.PollHook(cls)
        cls.vapi.register_hook(hook)
        cls.statistics = VPPStats(socketname=cls.get_stats_sock_path())
            cls.vpp_startup_failed = True
                "VPP died shortly after startup, check the"
                " output to standard error for possible cause"
        except (vpp_papi.VPPIOError, Exception) as e:
            cls.logger.debug("Exception connecting to vapi: %s" % e)
            cls.vapi.disconnect()
            if cls.debug_gdbserver:
                    "You're running VPP inside gdbserver but "
                    "VPP-API connection failed, did you forget "
                    "to 'continue' VPP from within gdb?",
            last_line = cls.vapi.cli("show thread").split("\n")[-2]
            cls.vpp_worker_count = int(last_line.split(" ")[0])
            print("Detected VPP with %s workers." % cls.vpp_worker_count)
        except vpp_papi.VPPRuntimeError as e:
            cls.logger.debug("%s" % e)
        except Exception as e:
            cls.logger.debug("Exception connecting to VPP: %s" % e)
    def _debug_quit(cls):
        # In interactive gdb modes, let the developer finish before the
        # subprocess is killed.
        if cls.debug_gdbserver or cls.debug_gdb:
            # NOTE(review): the try: header and the input( opener are not
            # visible in this view.
            if cls.vpp.returncode is None:
                print(double_line_delim)
                print("VPP or GDB server is still running")
                print(single_line_delim)
                    "When done debugging, press ENTER to kill the "
                    "process and finish running the testcase..."
            except AttributeError:
        """
        Disconnect vpp-api, kill vpp and cleanup shared memory files
        """
        # NOTE(review): this span is the interior of the class cleanup
        # method; its def line, decorator, several headers (try:, else:) and
        # statements are not visible in this view.
        if hasattr(cls, "running_vpp"):
        # first signal that we want to stop the pump thread, then wake it up
        if hasattr(cls, "pump_thread_stop_flag"):
            cls.pump_thread_stop_flag.set()
        if hasattr(cls, "pump_thread_wakeup_pipe"):
            os.write(cls.pump_thread_wakeup_pipe[1], b"ding dong wake up")
        if hasattr(cls, "pump_thread"):
            cls.logger.debug("Waiting for pump thread to stop")
            cls.pump_thread.join()
        if hasattr(cls, "vpp_stderr_reader_thread"):
            cls.logger.debug("Waiting for stderr pump to stop")
            cls.vpp_stderr_reader_thread.join()

        if hasattr(cls, "vpp"):
            if hasattr(cls, "vapi"):
                cls.logger.debug(cls.vapi.vpp.get_stats())
                cls.logger.debug("Disconnecting class vapi client on %s", cls.__name__)
                cls.vapi.disconnect()
                cls.logger.debug("Deleting class vapi attribute on %s", cls.__name__)
            if not cls.debug_attach and cls.vpp.returncode is None:
                cls.wait_for_coredump()
                cls.logger.debug("Sending TERM to vpp")
                cls.logger.debug("Waiting for vpp to die")
                    outs, errs = cls.vpp.communicate(timeout=5)
                except subprocess.TimeoutExpired:
                    outs, errs = cls.vpp.communicate()
            cls.logger.debug("Deleting class vpp attribute on %s", cls.__name__)
            if not cls.debug_attach and not hasattr(cls, "running_vpp"):
                cls.vpp.stdout.close()
                cls.vpp.stderr.close()
            # If vpp is a dynamic attribute set by the func use_running,
            # deletion will result in an AttributeError that we can
            except AttributeError:

        # startup failure escalates the stderr replay to critical
        if cls.vpp_startup_failed:
            stdout_log = cls.logger.info
            stderr_log = cls.logger.critical
            stdout_log = cls.logger.info
            stderr_log = cls.logger.info

        if hasattr(cls, "vpp_stdout_deque"):
            stdout_log(single_line_delim)
            stdout_log("VPP output to stdout while running %s:", cls.__name__)
            stdout_log(single_line_delim)
            vpp_output = "".join(cls.vpp_stdout_deque)
            with open(cls.tempdir + "/vpp_stdout.txt", "w") as f:
            stdout_log("\n%s", vpp_output)
            stdout_log(single_line_delim)

        if hasattr(cls, "vpp_stderr_deque"):
            stderr_log(single_line_delim)
            stderr_log("VPP output to stderr while running %s:", cls.__name__)
            stderr_log(single_line_delim)
            vpp_output = "".join(cls.vpp_stderr_deque)
            with open(cls.tempdir + "/vpp_stderr.txt", "w") as f:
            stderr_log("\n%s", vpp_output)
            stderr_log(single_line_delim)
    def tearDownClass(cls):
        """Perform final cleanup after running all tests in this test-case"""
        # NOTE(review): the early-return body for the no-vpp case and the
        # cleanup call between the lines below are not visible in this view.
        cls.logger.debug("--- tearDownClass() for %s called ---" % cls.__name__)
        if not hasattr(cls, "vpp"):
        cls.reporter.send_keep_alive(cls, "tearDownClass")
        cls.file_handler.close()
        cls.reset_packet_infos()
        if config.debug_framework:
            debug_internal.on_tear_down_class(cls)
894 def show_commands_at_teardown(self):
895 """Allow subclass specific teardown logging additions."""
896 self.logger.info("--- No test specific show commands provided. ---")
899 """Show various debug prints after each test"""
901 "--- tearDown() for %s.%s(%s) called ---"
902 % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
904 if not hasattr(self, "vpp"):
908 if not self.vpp_dead:
909 self.logger.debug(self.vapi.cli("show trace max 1000"))
910 self.logger.info(self.vapi.ppcli("show interface"))
911 self.logger.info(self.vapi.ppcli("show hardware"))
912 self.logger.info(self.statistics.set_errors_str())
913 self.logger.info(self.vapi.ppcli("show run"))
914 self.logger.info(self.vapi.ppcli("show log"))
915 self.logger.info(self.vapi.ppcli("show bihash"))
916 self.logger.info("Logging testcase specific show commands.")
917 self.show_commands_at_teardown()
918 if self.remove_configured_vpp_objects_on_tear_down:
919 self.registry.remove_vpp_config(self.logger)
920 # Save/Dump VPP api trace log
921 m = self._testMethodName
922 api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
923 tmp_api_trace = "/tmp/%s" % api_trace
924 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
925 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
926 self.logger.info("Moving %s to %s\n" % (tmp_api_trace, vpp_api_trace_log))
927 os.rename(tmp_api_trace, vpp_api_trace_log)
928 except VppTransportSocketIOError:
930 "VppTransportSocketIOError: Vpp dead. Cannot log show commands."
934 self.registry.unregister_all(self.logger)
937 """Clear trace before running each test"""
938 super(VppTestCase, self).setUp()
939 if not hasattr(self, "vpp"):
941 self.reporter.send_keep_alive(self)
945 testcase=self.__class__.__name__,
946 method_name=self._testMethodName,
948 self.sleep(0.1, "during setUp")
949 self.vpp_stdout_deque.append(
950 "--- test setUp() for %s.%s(%s) starts here ---\n"
951 % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
953 self.vpp_stderr_deque.append(
954 "--- test setUp() for %s.%s(%s) starts here ---\n"
955 % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
957 self.vapi.cli("clear trace")
958 # store the test instance inside the test class - so that objects
959 # holding the class can access instance methods (like assertEqual)
960 type(self).test_instance = self
    def pg_enable_capture(cls, interfaces=None):
        """
        Enable capture on packet-generator interfaces

        :param interfaces: iterable interface indexes (if None,
                           use self.pg_interfaces)
        """
        if interfaces is None:
            interfaces = cls.pg_interfaces
        # NOTE(review): the loop that actually enables capture on each
        # interface is not visible in this view.

    def register_pcap(cls, intf, worker):
        """Register a pcap in the testclass"""
        # add to the list of captures with current timestamp
        cls._pcaps.append((intf, worker))
    def get_vpp_time(cls):
        # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
        # returns float("2.190522")
        timestr = cls.vapi.cli("show clock")
        head, sep, tail = timestr.partition(",")
        head, sep, tail = head.partition("Time now")
        # NOTE(review): the final conversion of the parsed value and the
        # return statement are not visible in this view.

    def sleep_on_vpp_time(cls, sec):
        """Sleep according to time in VPP world"""
        # On a busy system with many processes
        # we might end up with VPP time being slower than real world
        # So take that into account when waiting for VPP to do something
        start_time = cls.get_vpp_time()
        while cls.get_vpp_time() - start_time < sec:
        # NOTE(review): the loop body (a short sleep) is not visible here.
    def pg_start(cls, trace=True):
        """Enable the PG, wait till it is done, then clean up"""
        # NOTE(review): a few lines (old-pcap list reset, the trace
        # conditional around "clear trace") are not visible in this view.
        for (intf, worker) in cls._old_pcaps:
            intf.handle_old_pcap_file(intf.get_in_path(worker), intf.in_history_counter)
        cls.vapi.cli("clear trace")
        cls.vapi.cli("trace add pg-input 1000")
        cls.vapi.cli("packet-generator enable")
        # PG, when starts, runs to completion -
        # so let's avoid a race condition,
        # and wait a little till it's done.
        # Then clean it up - and then be gone.
        deadline = time.time() + 300
        while cls.vapi.cli("show packet-generator").find("Yes") != -1:
            cls.sleep(0.01)  # yield
            if time.time() > deadline:
                cls.logger.error("Timeout waiting for pg to stop")
        for intf, worker in cls._pcaps:
            cls.vapi.cli("packet-generator delete %s" % intf.get_cap_name(worker))
        cls._old_pcaps = cls._pcaps
    def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0, mode=None):
        """
        Create packet-generator interfaces.

        :param interfaces: iterable indexes of the interfaces.
        :returns: List of created interfaces.
        """
        # NOTE(review): the result-list initialization, the append of each
        # interface and the return statement are not visible in this view.
        for i in interfaces:
            intf = VppPGInterface(cls, i, gso, gso_size, mode)
            setattr(cls, intf.name, intf)
        cls.pg_interfaces = result
1044 def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
1045 if not hasattr(cls, "vpp"):
1046 cls.pg_interfaces = []
1047 return cls.pg_interfaces
1048 pgmode = VppEnum.vl_api_pg_interface_mode_t
1049 return cls.create_pg_interfaces_internal(
1050 interfaces, gso, gso_size, pgmode.PG_API_MODE_IP4
1054 def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
1055 if not hasattr(cls, "vpp"):
1056 cls.pg_interfaces = []
1057 return cls.pg_interfaces
1058 pgmode = VppEnum.vl_api_pg_interface_mode_t
1059 return cls.create_pg_interfaces_internal(
1060 interfaces, gso, gso_size, pgmode.PG_API_MODE_IP6
1064 def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
1065 if not hasattr(cls, "vpp"):
1066 cls.pg_interfaces = []
1067 return cls.pg_interfaces
1068 pgmode = VppEnum.vl_api_pg_interface_mode_t
1069 return cls.create_pg_interfaces_internal(
1070 interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
1074 def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
1075 if not hasattr(cls, "vpp"):
1076 cls.pg_interfaces = []
1077 return cls.pg_interfaces
1078 pgmode = VppEnum.vl_api_pg_interface_mode_t
1079 return cls.create_pg_interfaces_internal(
1080 interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
    def create_loopback_interfaces(cls, count):
        """
        Create loopback interfaces.

        :param count: number of interfaces created.
        :returns: List of created interfaces.
        """
        if not hasattr(cls, "vpp"):
            cls.lo_interfaces = []
            return cls.lo_interfaces
        result = [VppLoInterface(cls) for i in range(count)]
        # NOTE(review): the for-loop header binding each interface and the
        # final return are not visible in this view.
            setattr(cls, intf.name, intf)
        cls.lo_interfaces = result

    def create_bvi_interfaces(cls, count):
        """
        Create BVI interfaces.

        :param count: number of interfaces created.
        :returns: List of created interfaces.
        """
        if not hasattr(cls, "vpp"):
            cls.bvi_interfaces = []
            return cls.bvi_interfaces
        result = [VppBviInterface(cls) for i in range(count)]
        # NOTE(review): the for-loop header and the final return are not
        # visible in this view.
            setattr(cls, intf.name, intf)
        cls.bvi_interfaces = result
    def extend_packet(packet, size, padding=" "):
        """
        Extend packet to given size by padding with spaces or custom padding
        NOTE: Currently works only when Raw layer is present.

        :param packet: packet
        :param size: target size
        :param padding: padding used to extend the payload
        """
        # the +4 accounts for the Ethernet FCS appended on the wire
        packet_len = len(packet) + 4
        extend = size - packet_len
        # NOTE(review): the guard checking that extend is positive is not
        # visible in this view.
            num = (extend // len(padding)) + 1
            packet[Raw].load += (padding * num)[:extend].encode("ascii")
1135 def reset_packet_infos(cls):
1136 """Reset the list of packet info objects and packet counts to zero"""
1137 cls._packet_infos = {}
1138 cls._packet_count_for_dst_if_idx = {}
    def create_packet_info(cls, src_if, dst_if):
        """
        Create packet info object containing the source and destination indexes
        and add it to the testcase's packet info list

        :param VppInterface src_if: source interface
        :param VppInterface dst_if: destination interface

        :returns: _PacketInfo object
        """
        info = _PacketInfo()
        info.index = len(cls._packet_infos)
        info.src = src_if.sw_if_index
        info.dst = dst_if.sw_if_index
        # sub-interfaces count against their parent's destination index
        if isinstance(dst_if, VppSubInterface):
            dst_idx = dst_if.parent.sw_if_index
        # NOTE(review): the else: header is not visible in this view.
            dst_idx = dst_if.sw_if_index
        if dst_idx in cls._packet_count_for_dst_if_idx:
            cls._packet_count_for_dst_if_idx[dst_idx] += 1
        # NOTE(review): the else: header is not visible in this view.
            cls._packet_count_for_dst_if_idx[dst_idx] = 1
        cls._packet_infos[info.index] = info
        # NOTE(review): the trailing `return info` is not visible here.
1168 def info_to_payload(info):
1170 Convert _PacketInfo object to packet payload
1172 :param info: _PacketInfo object
1174 :returns: string containing serialized data from packet info
1177 # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1178 return pack("iiiih", info.index, info.src, info.dst, info.ip, info.proto)
    def payload_to_info(payload, payload_field="load"):
        """
        Convert packet payload to _PacketInfo object

        :param payload: packet payload
        :type payload: <class 'scapy.packet.Raw'>
        :param payload_field: packet fieldname of payload "load" for
                              <class 'scapy.packet.Raw'>
        :type payload_field: str
        :returns: _PacketInfo object containing de-serialized data from payload
        """
        # retrieve payload, currently 18 bytes (4 x ints + 1 short)
        payload_b = getattr(payload, payload_field)[:18]
        info = _PacketInfo()
        info.index, info.src, info.dst, info.ip, info.proto = unpack("iiiih", payload_b)

        # some SRv6 TCs depend on get an exception if bad values are detected
        if info.index > 0x4000:
            raise ValueError("Index value is invalid")
        # NOTE(review): the trailing `return info` is not visible here.
    def get_next_packet_info(self, info):
        """
        Iterate over the packet info list stored in the testcase
        Start iteration with first element if info is None
        Continue based on index in info if info is specified

        :param info: info or None
        :returns: next info in list or None if no more infos
        """
        # NOTE(review): the info-is-None branch and the exhausted-list return
        # are not visible in this view.
            next_index = info.index + 1
        if next_index == len(self._packet_infos):
        return self._packet_infos[next_index]

    def get_next_packet_info_for_interface(self, src_index, info):
        """
        Search the packet info list for the next packet info with same source
        interface index

        :param src_index: source interface index to search for
        :param info: packet info - where to start the search
        :returns: packet info or None
        """
        # NOTE(review): the while-loop header, the None check and the final
        # return are not visible in this view.
            info = self.get_next_packet_info(info)
            if info.src == src_index:

    def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
        """
        Search the packet info list for the next packet info with same source
        and destination interface indexes

        :param src_index: source interface index to search for
        :param dst_index: destination interface index to search for
        :param info: packet info - where to start the search
        :returns: packet info or None
        """
        # NOTE(review): the while-loop header, the None check and the final
        # return are not visible in this view.
            info = self.get_next_packet_info_for_interface(src_index, info)
            if info.dst == dst_index:
    def assert_equal(self, real_value, expected_value, name_or_class=None):
        # Assert equality; when a name or enum-like class is supplied, build
        # a descriptive failure message.
        if name_or_class is None:
            self.assertEqual(real_value, expected_value)
        # NOTE(review): the return after the plain assert, the isclass()
        # branch header and the msg % argument wrapper lines are not visible
        # in this view.
            msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
                getdoc(name_or_class).strip(),
                str(name_or_class(real_value)),
                str(name_or_class(expected_value)),
            msg = "Invalid %s: %s does not match expected value %s" % (
        self.assertEqual(real_value, expected_value, msg)

    def assert_in_range(self, real_value, expected_min, expected_max, name=None):
        # Assert expected_min <= real_value <= expected_max with a
        # descriptive message.
        # NOTE(review): the msg construction for the unnamed case and the
        # message argument lines are not visible in this view.
            msg = "Invalid %s: %s out of range <%s,%s>" % (
        self.assertTrue(expected_min <= real_value <= expected_max, msg)
    def assert_packet_checksums_valid(self, packet, ignore_zero_udp_checksums=True):
        # Re-serialize the packet, delete each checksum field, let scapy
        # recompute them and compare against the received values.
        # NOTE(review): several lines of this method (counter/checksums
        # initialization, loop/if headers, early return) are not visible in
        # this view.
        received = packet.__class__(scapy.compat.raw(packet))
        udp_layers = ["UDP", "UDPerror"]
        checksum_fields = ["cksum", "chksum"]
        temp = received.__class__(scapy.compat.raw(received))
            layer = temp.getlayer(counter)
                layer = layer.copy()
                layer.remove_payload()
                for cf in checksum_fields:
                    if hasattr(layer, cf):
                            ignore_zero_udp_checksums
                            and 0 == getattr(layer, cf)
                            and layer.name in udp_layers
                        delattr(temp.getlayer(counter), cf)
                        checksums.append((counter, cf))
            counter = counter + 1
        if 0 == len(checksums):
        temp = temp.__class__(scapy.compat.raw(temp))
        for layer, cf in checksums:
            calc_sum = getattr(temp[layer], cf)
                getattr(received[layer], cf),
                "packet checksum on layer #%d: %s" % (layer, temp[layer].name),
                "Checksum field `%s` on `%s` layer has correct value `%s`"
                % (cf, temp[layer].name, calc_sum)
# Verify one checksum field on one layer: delete the field, let scapy
# recompute it on rebuild, compare with the received value.
# NOTE(review): some original lines are elided in this listing (numbering gaps).
1333 def assert_checksum_valid(
1334 self, received_packet, layer, field_name="chksum", ignore_zero_checksum=False
1336 """Check checksum of received packet on given layer"""
1337 received_packet_checksum = getattr(received_packet[layer], field_name)
1338 if ignore_zero_checksum and 0 == received_packet_checksum:
1340 recalculated = received_packet.__class__(scapy.compat.raw(received_packet))
1341 delattr(recalculated[layer], field_name)
1342 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1344 received_packet_checksum,
1345 getattr(recalculated[layer], field_name),
1346 "packet checksum on layer: %s" % layer,
# Thin per-protocol wrappers around assert_checksum_valid().
1349 def assert_ip_checksum_valid(self, received_packet, ignore_zero_checksum=False):
1350 self.assert_checksum_valid(
1351 received_packet, "IP", ignore_zero_checksum=ignore_zero_checksum
1354 def assert_tcp_checksum_valid(self, received_packet, ignore_zero_checksum=False):
1355 self.assert_checksum_valid(
1356 received_packet, "TCP", ignore_zero_checksum=ignore_zero_checksum
# UDP defaults to ignoring zero checksums ("no checksum" is legal in UDP/IPv4)
1359 def assert_udp_checksum_valid(self, received_packet, ignore_zero_checksum=True):
1360 self.assert_checksum_valid(
1361 received_packet, "UDP", ignore_zero_checksum=ignore_zero_checksum
# Validate checksums on the packet quoted inside an ICMP error message
# (IPerror/TCPerror/UDPerror/ICMPerror layers), where present.
1364 def assert_embedded_icmp_checksum_valid(self, received_packet):
1365 if received_packet.haslayer(IPerror):
1366 self.assert_checksum_valid(received_packet, "IPerror")
1367 if received_packet.haslayer(TCPerror):
1368 self.assert_checksum_valid(received_packet, "TCPerror")
1369 if received_packet.haslayer(UDPerror):
1370 self.assert_checksum_valid(
1371 received_packet, "UDPerror", ignore_zero_checksum=True
1373 if received_packet.haslayer(ICMPerror):
1374 self.assert_checksum_valid(received_packet, "ICMPerror")
def assert_icmp_checksum_valid(self, received_packet):
    """Verify the outer ICMP checksum, then any checksums carried by the
    packet quoted inside the ICMP error payload (the *error layers)."""
    # outer ICMP header first
    self.assert_checksum_valid(received_packet, "ICMP")
    # then whatever the error message embeds
    self.assert_embedded_icmp_checksum_valid(received_packet)
def assert_icmpv6_checksum_valid(self, pkt):
    """Validate the "cksum" field on whichever ICMPv6 layers are present.

    Destination-unreachable errors additionally get the checksums of the
    embedded (quoted) packet verified.
    """
    if pkt.haslayer(ICMPv6DestUnreach):
        self.assert_checksum_valid(pkt, "ICMPv6DestUnreach", "cksum")
        self.assert_embedded_icmp_checksum_valid(pkt)
    # echo request/reply share the same check, differing only in the layer
    for echo_cls, echo_name in (
        (ICMPv6EchoRequest, "ICMPv6EchoRequest"),
        (ICMPv6EchoReply, "ICMPv6EchoReply"),
    ):
        if pkt.haslayer(echo_cls):
            self.assert_checksum_valid(pkt, echo_name, "cksum")
# Fetch a counter value: stats-segment paths (leading "/") go through the
# stats API, anything else is scraped from the "sh errors" CLI output.
# NOTE(review): some original lines are elided in this listing (numbering gaps).
1389 def get_counter(self, counter):
1390 if counter.startswith("/"):
1391 counter_value = self.statistics.get_counter(counter)
1393 counters = self.vapi.cli("sh errors").split("\n")
# skip header (first line) and trailing line of the CLI table
1395 for i in range(1, len(counters) - 1):
1396 results = counters[i].split()
1397 if results[1] == counter:
1398 counter_value = int(results[0])
1400 return counter_value
# Assert a counter equals expected_value; thread=None sums across threads,
# otherwise the given thread/index entry is compared.
1402 def assert_counter_equal(self, counter, expected_value, thread=None, index=0):
1403 c = self.get_counter(counter)
1404 if thread is not None:
1405 c = c[thread][index]
1407 c = sum(x[index] for x in c)
1408 self.assert_equal(c, expected_value, "counter `%s'" % counter)
# Packet-counter flavored wrapper around get_counter()/assert_equal().
1410 def assert_packet_counter_equal(self, counter, expected_value):
1411 counter_value = self.get_counter(counter)
1413 counter_value, expected_value, "packet counter `%s'" % counter
def assert_error_counter_equal(self, counter, expected_value):
    """Assert that the error counter, summed over all workers/threads,
    equals expected_value."""
    observed = self.statistics[counter].sum()
    self.assert_equal(observed, expected_value, "error counter `%s'" % counter)
# Classmethod-style sleep helper: logs intent, sleeps, then warns if the
# actual elapsed time far exceeded the request (scheduler interference).
# NOTE(review): some original lines are elided in this listing (numbering gaps).
1421 def sleep(cls, timeout, remark=None):
1423 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1424 # * by Guido, only the main thread can be interrupted.
1426 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
# sleep(0) maps to a yield where the platform supports it
1429 if hasattr(os, "sched_yield"):
1435 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1436 before = time.time()
# warn when we overslept by more than 2x the requested time
1439 if after - before > 2 * timeout:
1441 "unexpected self.sleep() result - slept for %es instead of ~%es!",
1447 "Finished sleep (%s) - slept %es (wanted %es)",
def virtual_sleep(self, timeout, remark=None):
    """Advance VPP's clock by `timeout` seconds via the debug CLI
    ("set clock adjust") instead of really sleeping."""
    self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
    command = "set clock adjust %s" % timeout
    self.vapi.cli(command)
1457 def pg_send(self, intf, pkts, worker=None, trace=True):
1458 intf.add_stream(pkts, worker=worker)
1459 self.pg_enable_capture(self.pg_interfaces)
1460 self.pg_start(trace=trace)
# Capture current values of every counter mentioned in stats_diff, keyed
# by counter name; used later by compare_stats_with_snapshot().
# NOTE(review): some original lines are elided in this listing (numbering gaps).
1462 def snapshot_stats(self, stats_diff):
1463 """Return snapshot of interesting stats based on diff dictionary."""
1465 for sw_if_index in stats_diff:
1466 for counter in stats_diff[sw_if_index]:
1467 stats_snapshot[counter] = self.statistics[counter]
1468 self.logger.debug(f"Took statistics stats_snapshot: {stats_snapshot}")
1469 return stats_snapshot
# Assert each counter moved by exactly the expected diff since the
# snapshot.  "err" entries compare whole-counter sums; interface entries
# compare the per-sw_if_index column sum.
# NOTE(review): some original lines are elided in this listing (numbering gaps).
1471 def compare_stats_with_snapshot(self, stats_diff, stats_snapshot):
1472 """Assert appropriate difference between current stats and snapshot."""
1473 for sw_if_index in stats_diff:
1474 for cntr, diff in stats_diff[sw_if_index].items():
1475 if sw_if_index == "err":
1477 self.statistics[cntr].sum(),
1478 stats_snapshot[cntr].sum() + diff,
1479 f"'{cntr}' counter value (previous value: "
1480 f"{stats_snapshot[cntr].sum()}, "
1481 f"expected diff: {diff})",
1486 self.statistics[cntr][:, sw_if_index].sum(),
1487 stats_snapshot[cntr][:, sw_if_index].sum() + diff,
1488 f"'{cntr}' counter value (previous value: "
1489 f"{stats_snapshot[cntr][:, sw_if_index].sum()}, "
1490 f"expected diff: {diff})",
1493 # if diff is 0, then this most probably a case where
1494 # test declares multiple interfaces but traffic hasn't
1495 # passed through this one yet - which means the counter
1496 # value is 0 and can be ignored
# Traffic helpers: each sends pkts via pg_send() and asserts on captures;
# optional stats_diff triggers snapshot/compare of counters around the send.
# NOTE(review): some original lines are elided in this listing (numbering gaps).
# Send and verify that NO interface captured anything.
1500 def send_and_assert_no_replies(
1501 self, intf, pkts, remark="", timeout=None, stats_diff=None, trace=True, msg=None
1504 stats_snapshot = self.snapshot_stats(stats_diff)
1506 self.pg_send(intf, pkts)
1511 for i in self.pg_interfaces:
1512 i.assert_nothing_captured(timeout=timeout, remark=remark)
1517 self.logger.debug(f"send_and_assert_no_replies: {msg}")
1518 self.logger.debug(self.vapi.cli("show trace"))
1521 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
# Send and expect exactly one reply per sent packet on `output`.
1523 def send_and_expect(
1535 stats_snapshot = self.snapshot_stats(stats_diff)
# a single Packet counts as one expected rx, otherwise one per list element
1538 n_rx = 1 if isinstance(pkts, Packet) else len(pkts)
1539 self.pg_send(intf, pkts, worker=worker, trace=trace)
1540 rx = output.get_capture(n_rx)
1543 self.logger.debug(f"send_and_expect: {msg}")
1544 self.logger.debug(self.vapi.cli("show trace"))
1547 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
# Send and require that every output interface received at least one packet.
1551 def send_and_expect_load_balancing(
1552 self, input, pkts, outputs, worker=None, trace=True
1554 self.pg_send(input, pkts, worker=worker, trace=trace)
1557 rx = oo._get_capture(1)
1558 self.assertNotEqual(0, len(rx))
1561 self.logger.debug(self.vapi.cli("show trace"))
# Send and require some - but not all - packets to arrive on `output`.
1564 def send_and_expect_some(self, intf, pkts, output, worker=None, trace=True):
1565 self.pg_send(intf, pkts, worker=worker, trace=trace)
1566 rx = output._get_capture(1)
1568 self.logger.debug(self.vapi.cli("show trace"))
1569 self.assertTrue(len(rx) > 0)
1570 self.assertTrue(len(rx) < len(pkts))
# Send and require replies ONLY on `output`; all others must stay silent.
1573 def send_and_expect_only(self, intf, pkts, output, timeout=None, stats_diff=None):
1575 stats_snapshot = self.snapshot_stats(stats_diff)
1577 self.pg_send(intf, pkts)
1578 rx = output.get_capture(len(pkts))
1582 for i in self.pg_interfaces:
1583 if i not in outputs:
1584 i.assert_nothing_captured(timeout=timeout)
1588 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
def get_testcase_doc_name(test):
    """Return the title (first docstring line) of the test's class."""
    class_doc = getdoc(test.__class__)
    return class_doc.splitlines()[0]
# Prefer the unittest shortDescription() when descriptions are enabled;
# the fallback branch is elided in this listing (numbering gap).
1597 def get_test_description(descriptions, test):
1598 short_description = test.shortDescription()
1599 if descriptions and short_description:
1600 return short_description
class TestCaseInfo(object):
    """Per-test-case runtime context: the case's logger, its temporary
    directory, the PID of the VPP process under test and the path of the
    VPP binary that produced it."""

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        self.logger = logger
        self.tempdir = tempdir
        self.vpp_pid = vpp_pid
        self.vpp_bin_path = vpp_bin_path
        # name of the test during which a core file appeared, if any
        # (None until a crash is observed)
        self.core_crash_test = None
# Custom unittest.TestResult: colorized per-test reporting, failed-dir
# symlinking, core-file detection and result forwarding over a pipe.
# NOTE(review): some original lines are elided in this listing (numbering gaps);
# the fragments below are parts of the class docstring and class attributes.
1614 class VppTestResult(unittest.TestResult):
1616 @property result_string
1617 String variable to store the test case result string.
1619 List variable containing 2-tuples of TestCase instances and strings
1620 holding formatted tracebacks. Each tuple represents a test which
1621 raised an unexpected exception.
1623 List variable containing 2-tuples of TestCase instances and strings
1624 holding formatted tracebacks. Each tuple represents a test where
1625 a failure was explicitly signalled using the TestCase.assert*()
# class-level registries shared across instances
1629 failed_test_cases_info = set()
1630 core_crash_test_cases_info = set()
1631 current_test_case_info = None
1633 def __init__(self, stream=None, descriptions=None, verbosity=None, runner=None):
1635 :param stream File descriptor to store where to report test results.
1636 Set to the standard error stream by default.
1637 :param descriptions Boolean variable to store information if to use
1638 test case descriptions.
1639 :param verbosity Integer variable to store required verbosity level.
1641 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1642 self.stream = stream
1643 self.descriptions = descriptions
1644 self.verbosity = verbosity
1645 self.result_string = None
1646 self.runner = runner
# Record a passed test: log, delegate to the base class, colorize the
# result string and forward the outcome over the result pipe.
# NOTE(review): some original lines are elided in this listing (numbering gaps).
1649 def addSuccess(self, test):
1651 Record a test succeeded result
1656 if self.current_test_case_info:
1657 self.current_test_case_info.logger.debug(
1658 "--- addSuccess() %s.%s(%s) called"
1659 % (test.__class__.__name__, test._testMethodName, test._testMethodDoc)
1661 unittest.TestResult.addSuccess(self, test)
1662 self.result_string = colorize("OK", GREEN)
1664 self.send_result_through_pipe(test, PASS)
# Record a skipped test; cpu-shortage skips are reported distinctly.
1666 def addSkip(self, test, reason):
1668 Record a test skipped.
1674 if self.current_test_case_info:
1675 self.current_test_case_info.logger.debug(
1676 "--- addSkip() %s.%s(%s) called, reason is %s"
1678 test.__class__.__name__,
1679 test._testMethodName,
1680 test._testMethodDoc,
1684 unittest.TestResult.addSkip(self, test, reason)
1685 self.result_string = colorize("SKIP", YELLOW)
1687 if reason == "not enough cpus":
1688 self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE)
1690 self.send_result_through_pipe(test, SKIP)
# Create a "<tempdir>-FAILED" symlink in the configured failed_dir pointing
# at the failed test's temp directory (best-effort: errors are logged).
# NOTE(review): some original lines are elided in this listing (numbering gaps).
1692 def symlink_failed(self):
1693 if self.current_test_case_info:
1695 failed_dir = config.failed_dir
1696 link_path = os.path.join(
1698 "%s-FAILED" % os.path.basename(self.current_test_case_info.tempdir),
1701 self.current_test_case_info.logger.debug(
1702 "creating a link to the failed test"
1704 self.current_test_case_info.logger.debug(
1705 "os.symlink(%s, %s)"
1706 % (self.current_test_case_info.tempdir, link_path)
# don't fail if a previous run already created the link
1708 if os.path.exists(link_path):
1709 self.current_test_case_info.logger.debug("symlink already exists")
1711 os.symlink(self.current_test_case_info.tempdir, link_path)
1713 except Exception as e:
1714 self.current_test_case_info.logger.error(e)
# Forward (test id, result) to the parent process, if a pipe was attached.
1716 def send_result_through_pipe(self, test, result):
1717 if hasattr(self, "test_framework_result_pipe"):
1718 pipe = self.test_framework_result_pipe
1720 pipe.send((test.id(), result))
# Log an error/failure with a readable test name and the formatted traceback.
# NOTE(review): some original lines are elided in this listing (numbering gaps).
1722 def log_error(self, test, err, fn_name):
1723 if self.current_test_case_info:
# _ErrorHolder wraps setup/teardown-level errors and has no test method
1724 if isinstance(test, unittest.suite._ErrorHolder):
1725 test_name = test.description
1727 test_name = "%s.%s(%s)" % (
1728 test.__class__.__name__,
1729 test._testMethodName,
1730 test._testMethodDoc,
1732 self.current_test_case_info.logger.debug(
1733 "--- %s() %s called, err is %s" % (fn_name, test_name, err)
1735 self.current_test_case_info.logger.debug(
1736 "formatted exception is:\n%s" % "".join(format_exception(*err))
# Shared implementation behind addFailure()/addError(): logs, delegates to
# the given unittest_fn, records temp dir info, symlinks the failed dir and
# checks the temp dir for a core file left by a crashed VPP.
1739 def add_error(self, test, err, unittest_fn, error_type):
1740 if error_type == FAIL:
1741 self.log_error(test, err, "addFailure")
1742 error_type_str = colorize("FAIL", RED)
1743 elif error_type == ERROR:
1744 self.log_error(test, err, "addError")
1745 error_type_str = colorize("ERROR", RED)
1748 "Error type %s cannot be used to record an "
1749 "error or a failure" % error_type
1752 unittest_fn(self, test, err)
1753 if self.current_test_case_info:
1754 self.result_string = "%s [ temp dir used by test case: %s ]" % (
1756 self.current_test_case_info.tempdir,
1758 self.symlink_failed()
1759 self.failed_test_cases_info.add(self.current_test_case_info)
# only record the first core-producing test per test case info
1760 if is_core_present(self.current_test_case_info.tempdir):
1761 if not self.current_test_case_info.core_crash_test:
1762 if isinstance(test, unittest.suite._ErrorHolder):
1763 test_name = str(test)
1765 test_name = "'{!s}' ({!s})".format(
1766 get_testcase_doc_name(test), test.id()
1768 self.current_test_case_info.core_crash_test = test_name
1769 self.core_crash_test_cases_info.add(self.current_test_case_info)
1771 self.result_string = "%s [no temp dir]" % error_type_str
1773 self.send_result_through_pipe(test, error_type)
# Thin wrappers delegating to add_error() with the proper unittest hook.
# NOTE(review): some original lines are elided in this listing (numbering gaps).
1775 def addFailure(self, test, err):
1777 Record a test failed result
1780 :param err: error message
1783 self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1785 def addError(self, test, err):
1787 Record a test error result
1790 :param err: error message
1793 self.add_error(test, err, unittest.TestResult.addError, ERROR)
# Delegate description formatting to the module-level helper.
1795 def getDescription(self, test):
1797 Get test description
1800 :returns: test description
1803 return get_test_description(self.descriptions, test)
# Print a decorated per-class header (title, SOLO/FIXME markers, worker
# count, cpu-shortage note) once per test class, then start timing the test.
# NOTE(review): some original lines are elided in this listing (numbering gaps).
1805 def startTest(self, test):
1813 def print_header(test):
# header is printed only once per test class
1814 if test.__class__ in self.printed:
1817 test_doc = getdoc(test)
1819 raise Exception("No doc string for test '%s'" % test.id())
1821 test_title = test_doc.splitlines()[0].rstrip()
1822 test_title = colorize(test_title, GREEN)
1823 if test.is_tagged_run_solo():
1824 test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)
1826 # This block may overwrite the colorized title above,
1827 # but we want this to stand out and be fixed
1828 if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
1829 test_title = colorize(f"FIXME with VPP workers: {test_title}", RED)
1831 if test.has_tag(TestCaseTag.FIXME_ASAN):
1832 test_title = colorize(f"FIXME with ASAN: {test_title}", RED)
1833 test.skip_fixme_asan()
1835 if is_distro_ubuntu2204 == True and test.has_tag(
1836 TestCaseTag.FIXME_UBUNTU2204
1838 test_title = colorize(f"FIXME on Ubuntu-22.04: {test_title}", RED)
1839 test.skip_fixme_ubuntu2204()
1841 if is_distro_debian11 == True and test.has_tag(TestCaseTag.FIXME_DEBIAN11):
1842 test_title = colorize(f"FIXME on Debian-11: {test_title}", RED)
1843 test.skip_fixme_debian11()
# annotate the title with the configured VPP worker thread count
1845 if hasattr(test, "vpp_worker_count"):
1846 if test.vpp_worker_count == 0:
1847 test_title += " [main thread only]"
1848 elif test.vpp_worker_count == 1:
1849 test_title += " [1 worker thread]"
1851 test_title += f" [{test.vpp_worker_count} worker threads]"
1853 if test.__class__.skipped_due_to_cpu_lack:
1854 test_title = colorize(
1855 f"{test_title} [skipped - not enough cpus, "
1856 f"required={test.__class__.get_cpus_required()}, "
1857 f"available={max_vpp_cpus}]",
1861 print(double_line_delim)
1863 print(double_line_delim)
1864 self.printed.append(test.__class__)
# start the per-test wall clock used by stopTest()
1867 self.start_test = time.time()
1868 unittest.TestResult.startTest(self, test)
1869 if self.verbosity > 0:
1870 self.stream.writeln("Starting " + self.getDescription(test) + " ...")
1871 self.stream.writeln(single_line_delim)
# Finish a test: report the result string and elapsed time, then forward
# a TEST_RUN marker over the result pipe.
# NOTE(review): some original lines are elided in this listing (numbering gaps).
1873 def stopTest(self, test):
1875 Called when the given test has been run
1880 unittest.TestResult.stopTest(self, test)
1882 if self.verbosity > 0:
1883 self.stream.writeln(single_line_delim)
1884 self.stream.writeln(
1885 "%-73s%s" % (self.getDescription(test), self.result_string)
1887 self.stream.writeln(single_line_delim)
1889 self.stream.writeln(
1892 self.getDescription(test),
# elapsed wall-clock time measured from startTest()
1893 time.time() - self.start_test,
1898 self.send_result_through_pipe(test, TEST_RUN)
# Emit the accumulated error/failure lists; afterwards optionally mute all
# further output (summary suppressed) by redirecting the stream to devnull.
# NOTE(review): some original lines are elided in this listing (numbering gaps).
1900 def printErrors(self):
1902 Print errors from running the test case
1904 if len(self.errors) > 0 or len(self.failures) > 0:
1905 self.stream.writeln()
1906 self.printErrorList("ERROR", self.errors)
1907 self.printErrorList("FAIL", self.failures)
1909 # ^^ that is the last output from unittest before summary
1910 if not self.runner.print_summary:
1911 devnull = unittest.runner._WritelnDecorator(open(os.devnull, "w"))
1912 self.stream = devnull
1913 self.runner.stream = devnull
# Print one delimited section per recorded (test, traceback) pair.
1915 def printErrorList(self, flavour, errors):
1917 Print error list to the output stream together with error type
1918 and test case description.
1920 :param flavour: error type
1921 :param errors: iterable errors
1924 for test, err in errors:
1925 self.stream.writeln(double_line_delim)
1926 self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
1927 self.stream.writeln(single_line_delim)
1928 self.stream.writeln("%s" % err)
# TextTestRunner subclass wired to VppTestResult; forces stdout as the
# stream, attaches the keep-alive and result pipes and supports muting the
# final summary.
# NOTE(review): some original lines are elided in this listing (numbering gaps).
1931 class VppTestRunner(unittest.TextTestRunner):
1933 A basic test runner implementation which prints results to standard error.
1937 def resultclass(self):
1938 """Class maintaining the results of the tests"""
1939 return VppTestResult
1943 keep_alive_pipe=None,
1953 # ignore stream setting here, use hard-coded stdout to be in sync
1954 # with prints from VppTestCase methods ...
1955 super(VppTestRunner, self).__init__(
1956 sys.stdout, descriptions, verbosity, failfast, buffer, resultclass, **kwargs
1958 KeepAliveReporter.pipe = keep_alive_pipe
# keep the original stream so printErrors() can restore it after muting
1960 self.orig_stream = self.stream
1961 self.resultclass.test_framework_result_pipe = result_pipe
1963 self.print_summary = print_summary
1965 def _makeResult(self):
1966 return self.resultclass(self.stream, self.descriptions, self.verbosity, self)
1968 def run(self, test):
# emit a python stack trace to stderr if the process is killed by a signal
1975 faulthandler.enable() # emit stack trace to stderr if killed by signal
1977 result = super(VppTestRunner, self).run(test)
1978 if not self.print_summary:
1979 self.stream = self.orig_stream
1980 result.stream = self.orig_stream
# Thread that runs an external executable, optionally under gdb/gdbserver,
# capturing its stdout/stderr and return code.
# NOTE(review): some original lines are elided in this listing (numbering gaps).
1984 class Worker(Thread):
1985 def __init__(self, executable_args, logger, env=None, *args, **kwargs):
1986 super(Worker, self).__init__(*args, **kwargs)
1987 self.logger = logger
1988 self.args = executable_args
# when debugging, prefix the command with gdbserver / append the gdb wait flag
1989 if hasattr(self, "testcase") and self.testcase.debug_all:
1990 if self.testcase.debug_gdbserver:
1992 "/usr/bin/gdbserver",
1993 "localhost:{port}".format(port=self.testcase.gdbserver_port),
1995 elif self.testcase.debug_gdb and hasattr(self, "wait_for_gdb"):
1996 self.args.append(self.wait_for_gdb)
1997 self.app_bin = executable_args[0]
1998 self.app_name = os.path.basename(self.app_bin)
1999 if hasattr(self, "role"):
2000 self.app_name += " {role}".format(role=self.role)
# deep-copy so later env mutations don't leak into the caller's dict
2003 env = {} if env is None else env
2004 self.env = copy.deepcopy(env)
# Interactive debug pause: print gdb/gdbserver attach instructions for the
# spawned process and block until the user presses ENTER.
# NOTE(review): some original lines are elided in this listing (numbering gaps).
2006 def wait_for_enter(self):
2007 if not hasattr(self, "testcase"):
2009 if self.testcase.debug_all and self.testcase.debug_gdbserver:
2011 print(double_line_delim)
2013 "Spawned GDB Server for '{app}' with PID: {pid}".format(
2014 app=self.app_name, pid=self.process.pid
2017 elif self.testcase.debug_all and self.testcase.debug_gdb:
2019 print(double_line_delim)
2021 "Spawned '{app}' with PID: {pid}".format(
2022 app=self.app_name, pid=self.process.pid
2027 print(single_line_delim)
2028 print("You can debug '{app}' using:".format(app=self.app_name))
2029 if self.testcase.debug_gdbserver:
2033 + " -ex 'target remote localhost:{port}'".format(
2034 port=self.testcase.gdbserver_port
2038 "Now is the time to attach gdb by running the above "
2039 "command, set up breakpoints etc., then resume from "
2040 "within gdb by issuing the 'continue' command"
# each worker gets its own gdbserver port
2042 self.testcase.gdbserver_port += 1
2043 elif self.testcase.debug_gdb:
2047 + " -ex 'attach {pid}'".format(pid=self.process.pid)
2050 "Now is the time to attach gdb by running the above "
2051 "command and set up breakpoints etc., then resume from"
2052 " within gdb by issuing the 'continue' command"
2054 print(single_line_delim)
2055 input("Press ENTER to continue running the testcase...")
# Body of Worker.run() (the "def run(self):" line is elided in this
# listing): validate the executable, spawn it unbuffered via stdbuf with a
# merged environment, pause for debugger attach, then log output and store
# the return code in self.result.
# NOTE(review): some original lines are elided in this listing (numbering gaps).
2058 executable = self.args[0]
2059 if not os.path.exists(executable) or not os.access(
2060 executable, os.F_OK | os.X_OK
2062 # Exit code that means some system file did not exist,
2063 # could not be opened, or had some other kind of error.
2064 self.result = os.EX_OSFILE
2065 raise EnvironmentError(
2066 "executable '%s' is not found or executable." % executable
2069 "Running executable '{app}': '{cmd}'".format(
2070 app=self.app_name, cmd=" ".join(self.args)
# start from the parent environment and overlay the worker-specific vars
2073 env = os.environ.copy()
2074 env.update(self.env)
2075 env["CK_LOG_FILE_NAME"] = "-"
2076 self.process = subprocess.Popen(
# stdbuf -o0 -e0 disables stdio buffering so output is captured live
2077 ["stdbuf", "-o0", "-e0"] + self.args,
# own process group so signals to the test process don't hit the child
2080 preexec_fn=os.setpgrp,
2081 stdout=subprocess.PIPE,
2082 stderr=subprocess.PIPE,
2084 self.wait_for_enter()
2085 out, err = self.process.communicate()
2086 self.logger.debug("Finished running `{app}'".format(app=self.app_name))
2087 self.logger.info("Return code is `%s'" % self.process.returncode)
2088 self.logger.info(single_line_delim)
2090 "Executable `{app}' wrote to stdout:".format(app=self.app_name)
2092 self.logger.info(single_line_delim)
2093 self.logger.info(out.decode("utf-8"))
2094 self.logger.info(single_line_delim)
2096 "Executable `{app}' wrote to stderr:".format(app=self.app_name)
2098 self.logger.info(single_line_delim)
2099 self.logger.info(err.decode("utf-8"))
2100 self.logger.info(single_line_delim)
2101 self.result = self.process.returncode
2104 if __name__ == "__main__":