3 from __future__ import print_function
18 from collections import deque
19 from threading import Thread, Event
20 from inspect import getdoc, isclass
21 from traceback import format_exception
22 from logging import FileHandler, DEBUG, Formatter
24 from abc import ABC, abstractmethod
25 from struct import pack, unpack
28 from scapy.packet import Raw, Packet
29 from config import config, available_cpus, num_cpus, max_vpp_cpus
30 import hook as hookmodule
31 from vpp_pg_interface import VppPGInterface
32 from vpp_sub_interface import VppSubInterface
33 from vpp_lo_interface import VppLoInterface
34 from vpp_bvi_interface import VppBviInterface
35 from vpp_papi_provider import VppPapiProvider
36 from vpp_papi import VppEnum
38 from vpp_papi.vpp_stats import VPPStats
39 from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
49 from vpp_object import VppObjectRegistry
50 from util import ppp, is_core_present
51 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
52 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
53 from scapy.layers.inet6 import ICMPv6EchoReply
54 from vpp_running import use_running
57 logger = logging.getLogger(__name__)
59 # Set up an empty logger for the testcase that can be overridden as necessary
60 null_logger = logging.getLogger("VppTestCase")
61 null_logger.addHandler(logging.NullHandler())
71 if config.debug_framework:
75 Test framework module.
77 The module provides a set of tools for constructing and running tests and
78 representing the results.
82 class VppDiedError(Exception):
83 """exception for reporting that the subprocess has died."""
87 for k, v in signal.__dict__.items()
88 if k.startswith("SIG") and not k.startswith("SIG_")
91 def __init__(self, rv=None, testcase=None, method_name=None):
93 self.signal_name = None
94 self.testcase = testcase
95 self.method_name = method_name
98 self.signal_name = VppDiedError.signals_by_value[-rv]
99 except (KeyError, TypeError):
102 if testcase is None and method_name is None:
105 in_msg = " while running %s.%s" % (testcase, method_name)
108 msg = "VPP subprocess died unexpectedly%s with return code: %d%s." % (
111 " [%s]" % (self.signal_name if self.signal_name is not None else ""),
114 msg = "VPP subprocess died unexpectedly%s." % in_msg
116 super(VppDiedError, self).__init__(msg)
119 class _PacketInfo(object):
120 """Private class to create packet info object.
122 Help process information about the next packet.
123 Set variables to default values.
126 #: Store the index of the packet.
128 #: Store the index of the source packet generator interface of the packet.
130 #: Store the index of the destination packet generator interface
133 #: Store expected ip version
135 #: Store expected upper protocol
137 #: Store the copy of the former packet.
140 def __eq__(self, other):
141 index = self.index == other.index
142 src = self.src == other.src
143 dst = self.dst == other.dst
144 data = self.data == other.data
145 return index and src and dst and data
148 def pump_output(testclass):
149 """pump output from vpp stdout/stderr to proper queues"""
150 if not hasattr(testclass, "vpp"):
154 while not testclass.pump_thread_stop_flag.is_set():
155 readable = select.select(
157 testclass.vpp.stdout.fileno(),
158 testclass.vpp.stderr.fileno(),
159 testclass.pump_thread_wakeup_pipe[0],
164 if testclass.vpp.stdout.fileno() in readable:
165 read = os.read(testclass.vpp.stdout.fileno(), 102400)
167 split = read.decode("ascii", errors="backslashreplace").splitlines(True)
168 if len(stdout_fragment) > 0:
169 split[0] = "%s%s" % (stdout_fragment, split[0])
170 if len(split) > 0 and split[-1].endswith("\n"):
174 stdout_fragment = split[-1]
175 testclass.vpp_stdout_deque.extend(split[:limit])
176 if not config.cache_vpp_output:
177 for line in split[:limit]:
178 testclass.logger.info("VPP STDOUT: %s" % line.rstrip("\n"))
179 if testclass.vpp.stderr.fileno() in readable:
180 read = os.read(testclass.vpp.stderr.fileno(), 102400)
182 split = read.decode("ascii", errors="backslashreplace").splitlines(True)
183 if len(stderr_fragment) > 0:
184 split[0] = "%s%s" % (stderr_fragment, split[0])
185 if len(split) > 0 and split[-1].endswith("\n"):
189 stderr_fragment = split[-1]
191 testclass.vpp_stderr_deque.extend(split[:limit])
192 if not config.cache_vpp_output:
193 for line in split[:limit]:
194 testclass.logger.error("VPP STDERR: %s" % line.rstrip("\n"))
195 # ignoring the dummy pipe here intentionally - the
196 # flag will take care of properly terminating the loop
199 def _is_platform_aarch64():
200 return platform.machine() == "aarch64"
203 is_platform_aarch64 = _is_platform_aarch64()
206 def _is_distro_ubuntu2204():
207 with open("/etc/os-release") as f:
208 for line in f.readlines():
214 is_distro_ubuntu2204 = _is_distro_ubuntu2204()
217 def _is_distro_debian11():
218 with open("/etc/os-release") as f:
219 for line in f.readlines():
220 if "bullseye" in line:
225 is_distro_debian11 = _is_distro_debian11()
228 class KeepAliveReporter(object):
230 Singleton object which reports test start to parent process
236 self.__dict__ = self._shared_state
244 def pipe(self, pipe):
245 if self._pipe is not None:
246 raise Exception("Internal error - pipe should only be set once.")
249 def send_keep_alive(self, test, desc=None):
251 Write current test tmpdir & desc to keep-alive pipe to signal liveness
253 if not hasattr(test, "vpp") or self.pipe is None:
254 # if not running forked..
258 desc = "%s (%s)" % (desc, unittest.util.strclass(test))
262 self.pipe.send((desc, config.vpp, test.tempdir, test.vpp.pid))
265 class TestCaseTag(Enum):
266 # marks the suites that must run at the end
267 # using only a single test runner
269 # marks the suites broken on VPP multi-worker
270 FIXME_VPP_WORKERS = 2
271 # marks the suites broken when ASan is enabled
273 # marks suites broken on Ubuntu-22.04
275 # marks suites broken on Debian-11
277 # marks suites broken on debug vpp image
281 def create_tag_decorator(e):
284 cls.test_tags.append(e)
285 except AttributeError:
292 tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
293 tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
294 tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
295 tag_fixme_ubuntu2204 = create_tag_decorator(TestCaseTag.FIXME_UBUNTU2204)
296 tag_fixme_debian11 = create_tag_decorator(TestCaseTag.FIXME_DEBIAN11)
297 tag_fixme_vpp_debug = create_tag_decorator(TestCaseTag.FIXME_VPP_DEBUG)
311 class CPUInterface(ABC):
313 skipped_due_to_cpu_lack = False
317 def get_cpus_required(cls):
321 def assign_cpus(cls, cpus):
326 class VppTestCase(CPUInterface, unittest.TestCase):
327 """This subclass is a base class for VPP test cases that are implemented as
328 classes. It provides methods to create and run test case.
331 extra_vpp_statseg_config = ""
332 extra_vpp_punt_config = []
333 extra_vpp_plugin_config = []
335 vapi_response_timeout = 5
336 remove_configured_vpp_objects_on_tear_down = True
339 def packet_infos(self):
340 """List of packet infos"""
341 return self._packet_infos
344 def get_packet_count_for_if_idx(cls, dst_if_index):
345 """Get the number of packet info for specified destination if index"""
346 if dst_if_index in cls._packet_count_for_dst_if_idx:
347 return cls._packet_count_for_dst_if_idx[dst_if_index]
352 def has_tag(cls, tag):
353 """if the test case has a given tag - return true"""
355 return tag in cls.test_tags
356 except AttributeError:
361 def is_tagged_run_solo(cls):
362 """if the test case class is timing-sensitive - return true"""
363 return cls.has_tag(TestCaseTag.RUN_SOLO)
366 def skip_fixme_asan(cls):
367 """if @tag_fixme_asan & ASan is enabled - mark for skip"""
368 if cls.has_tag(TestCaseTag.FIXME_ASAN):
369 vpp_extra_cmake_args = os.environ.get("VPP_EXTRA_CMAKE_ARGS", "")
370 if "DVPP_ENABLE_SANITIZE_ADDR=ON" in vpp_extra_cmake_args:
371 cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls)
374 def skip_fixme_ubuntu2204(cls):
375 """if distro is ubuntu 22.04 and @tag_fixme_ubuntu2204 mark for skip"""
376 if cls.has_tag(TestCaseTag.FIXME_UBUNTU2204):
377 cls = unittest.skip("Skipping @tag_fixme_ubuntu2204 tests")(cls)
380 def skip_fixme_debian11(cls):
381 """if distro is Debian-11 and @tag_fixme_debian11 mark for skip"""
382 if cls.has_tag(TestCaseTag.FIXME_DEBIAN11):
383 cls = unittest.skip("Skipping @tag_fixme_debian11 tests")(cls)
386 def skip_fixme_vpp_debug(cls):
387 cls = unittest.skip("Skipping @tag_fixme_vpp_debug tests")(cls)
391 """Return the instance of this testcase"""
392 return cls.test_instance
395 def set_debug_flags(cls, d):
396 cls.gdbserver_port = 7777
397 cls.debug_core = False
398 cls.debug_gdb = False
399 cls.debug_gdbserver = False
400 cls.debug_all = False
401 cls.debug_attach = False
406 cls.debug_core = True
407 elif dl == "gdb" or dl == "gdb-all":
409 elif dl == "gdbserver" or dl == "gdbserver-all":
410 cls.debug_gdbserver = True
412 cls.debug_attach = True
414 raise Exception("Unrecognized DEBUG option: '%s'" % d)
415 if dl == "gdb-all" or dl == "gdbserver-all":
419 def get_vpp_worker_count(cls):
420 if not hasattr(cls, "vpp_worker_count"):
421 if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
422 cls.vpp_worker_count = 0
424 cls.vpp_worker_count = config.vpp_worker_count
425 return cls.vpp_worker_count
428 def get_cpus_required(cls):
429 return 1 + cls.get_vpp_worker_count()
432 def setUpConstants(cls):
433 """Set-up the test case class based on environment variables"""
434 cls.step = config.step
435 cls.plugin_path = ":".join(config.vpp_plugin_dir)
436 cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir)
437 cls.extern_plugin_path = ":".join(config.extern_plugin_dir)
439 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
440 debug_cli = "cli-listen localhost:5002"
441 size = re.search(r"\d+[gG]", config.coredump_size)
443 coredump_size = f"coredump-size {config.coredump_size}".lower()
445 coredump_size = "coredump-size unlimited"
446 default_variant = config.variant
447 if default_variant is not None:
448 default_variant = "default { variant %s 100 }" % default_variant
452 api_fuzzing = config.api_fuzz
453 if api_fuzzing is None:
474 cls.get_api_segment_prefix(),
481 if cls.extern_plugin_path not in (None, ""):
482 cls.extra_vpp_plugin_config.append("add-path %s" % cls.extern_plugin_path)
483 if cls.get_vpp_worker_count():
484 cls.vpp_cmdline.extend(
485 ["corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])]
487 cls.vpp_cmdline.extend(
498 cls.get_stats_sock_path(),
499 cls.extra_vpp_statseg_config,
504 cls.get_api_sock_path(),
525 "lisp_unittest_plugin.so",
530 "unittest_plugin.so",
535 + cls.extra_vpp_plugin_config
541 if cls.extra_vpp_punt_config is not None:
542 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
544 if not cls.debug_attach:
545 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
546 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
549 def wait_for_enter(cls):
550 if cls.debug_gdbserver:
551 print(double_line_delim)
552 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
554 print(double_line_delim)
555 print("Spawned VPP with PID: %d" % cls.vpp.pid)
557 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
559 print(single_line_delim)
560 print("You can debug VPP using:")
561 if cls.debug_gdbserver:
563 f"sudo gdb {config.vpp} "
564 f"-ex 'target remote localhost:{cls.gdbserver_port}'"
567 "Now is the time to attach gdb by running the above "
568 "command, set up breakpoints etc., then resume VPP from "
569 "within gdb by issuing the 'continue' command"
571 cls.gdbserver_port += 1
573 print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'")
575 "Now is the time to attach gdb by running the above "
576 "command and set up breakpoints etc., then resume VPP from"
577 " within gdb by issuing the 'continue' command"
579 print(single_line_delim)
580 input("Press ENTER to continue running the testcase...")
589 is_distro_ubuntu2204 == True and cls.has_tag(TestCaseTag.FIXME_UBUNTU2204)
590 ) or (is_distro_debian11 == True and cls.has_tag(TestCaseTag.FIXME_DEBIAN11)):
592 cls.logger.debug(f"Assigned cpus: {cls.cpus}")
593 cmdline = cls.vpp_cmdline
595 if cls.debug_gdbserver:
596 gdbserver = "/usr/bin/gdbserver"
597 if not os.path.isfile(gdbserver) or not os.access(gdbserver, os.X_OK):
599 "gdbserver binary '%s' does not exist or is "
600 "not executable" % gdbserver
605 "localhost:{port}".format(port=cls.gdbserver_port),
607 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
610 cls.vpp = subprocess.Popen(
611 cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE
613 except subprocess.CalledProcessError as e:
615 "Subprocess returned with non-0 return code: (%s)", e.returncode
620 "Subprocess returned with OS error: (%s) %s", e.errno, e.strerror
623 except Exception as e:
624 cls.logger.exception("Subprocess returned unexpected from %s:", cmdline)
630 def wait_for_coredump(cls):
631 corefile = cls.tempdir + "/core"
632 if os.path.isfile(corefile):
633 cls.logger.error("Waiting for coredump to complete: %s", corefile)
634 curr_size = os.path.getsize(corefile)
635 deadline = time.time() + 60
637 while time.time() < deadline:
640 curr_size = os.path.getsize(corefile)
641 if size == curr_size:
646 "Timed out waiting for coredump to complete: %s", corefile
649 cls.logger.error("Coredump complete: %s, size %d", corefile, curr_size)
652 def get_stats_sock_path(cls):
653 return "%s/stats.sock" % cls.tempdir
656 def get_api_sock_path(cls):
657 return "%s/api.sock" % cls.tempdir
660 def get_api_segment_prefix(cls):
661 return os.path.basename(cls.tempdir) # Only used for VAPI
664 def get_tempdir(cls):
666 tmpdir = f"{config.tmp_dir}/unittest-attach-gdb"
668 tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}"
669 if config.wipe_tmp_dir:
670 shutil.rmtree(tmpdir, ignore_errors=True)
675 def create_file_handler(cls):
676 if config.log_dir is None:
677 cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
680 logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}"
681 if config.wipe_tmp_dir:
682 shutil.rmtree(logdir, ignore_errors=True)
684 cls.file_handler = FileHandler(f"{logdir}/log.txt")
689 Perform class setup before running the testcase
690 Remove shared memory files, start vpp and connect the vpp-api
692 super(VppTestCase, cls).setUpClass()
693 cls.logger = get_logger(cls.__name__)
694 random.seed(config.rnd_seed)
695 if hasattr(cls, "parallel_handler"):
696 cls.logger.addHandler(cls.parallel_handler)
697 cls.logger.propagate = False
698 cls.set_debug_flags(config.debug)
699 cls.tempdir = cls.get_tempdir()
700 cls.create_file_handler()
701 cls.file_handler.setFormatter(
702 Formatter(fmt="%(asctime)s,%(msecs)03d %(message)s", datefmt="%H:%M:%S")
704 cls.file_handler.setLevel(DEBUG)
705 cls.logger.addHandler(cls.file_handler)
706 cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
707 os.chdir(cls.tempdir)
709 "Temporary dir is %s, api socket is %s",
711 cls.get_api_sock_path(),
713 cls.logger.debug("Random seed is %s", config.rnd_seed)
715 cls.reset_packet_infos()
720 cls.registry = VppObjectRegistry()
721 cls.vpp_startup_failed = False
722 cls.reporter = KeepAliveReporter()
723 # need to catch exceptions here because if we raise, then the cleanup
724 # doesn't get called and we might end with a zombie vpp
730 if not hasattr(cls, "vpp"):
732 cls.reporter.send_keep_alive(cls, "setUpClass")
733 VppTestResult.current_test_case_info = TestCaseInfo(
734 cls.logger, cls.tempdir, cls.vpp.pid, config.vpp
736 cls.vpp_stdout_deque = deque()
737 cls.vpp_stderr_deque = deque()
738 # Pump thread in a non-debug-attached & not running-vpp
739 if not cls.debug_attach and not hasattr(cls, "running_vpp"):
740 cls.pump_thread_stop_flag = Event()
741 cls.pump_thread_wakeup_pipe = os.pipe()
742 cls.pump_thread = Thread(target=pump_output, args=(cls,))
743 cls.pump_thread.daemon = True
744 cls.pump_thread.start()
745 if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
746 cls.vapi_response_timeout = 0
747 cls.vapi = VppPapiProvider(cls.__name__, cls, cls.vapi_response_timeout)
749 hook = hookmodule.StepHook(cls)
751 hook = hookmodule.PollHook(cls)
752 cls.vapi.register_hook(hook)
753 cls.statistics = VPPStats(socketname=cls.get_stats_sock_path())
757 cls.vpp_startup_failed = True
759 "VPP died shortly after startup, check the"
760 " output to standard error for possible cause"
765 except (vpp_papi.VPPIOError, Exception) as e:
766 cls.logger.debug("Exception connecting to vapi: %s" % e)
767 cls.vapi.disconnect()
769 if cls.debug_gdbserver:
772 "You're running VPP inside gdbserver but "
773 "VPP-API connection failed, did you forget "
774 "to 'continue' VPP from within gdb?",
780 last_line = cls.vapi.cli("show thread").split("\n")[-2]
781 cls.vpp_worker_count = int(last_line.split(" ")[0])
782 print("Detected VPP with %s workers." % cls.vpp_worker_count)
783 except vpp_papi.VPPRuntimeError as e:
784 cls.logger.debug("%s" % e)
787 except Exception as e:
788 cls.logger.debug("Exception connecting to VPP: %s" % e)
793 def _debug_quit(cls):
794 if cls.debug_gdbserver or cls.debug_gdb:
798 if cls.vpp.returncode is None:
800 print(double_line_delim)
801 print("VPP or GDB server is still running")
802 print(single_line_delim)
804 "When done debugging, press ENTER to kill the "
805 "process and finish running the testcase..."
807 except AttributeError:
813 Disconnect vpp-api, kill vpp and cleanup shared memory files
816 if hasattr(cls, "running_vpp"):
819 # first signal that we want to stop the pump thread, then wake it up
820 if hasattr(cls, "pump_thread_stop_flag"):
821 cls.pump_thread_stop_flag.set()
822 if hasattr(cls, "pump_thread_wakeup_pipe"):
823 os.write(cls.pump_thread_wakeup_pipe[1], b"ding dong wake up")
824 if hasattr(cls, "pump_thread"):
825 cls.logger.debug("Waiting for pump thread to stop")
826 cls.pump_thread.join()
827 if hasattr(cls, "vpp_stderr_reader_thread"):
828 cls.logger.debug("Waiting for stderr pump to stop")
829 cls.vpp_stderr_reader_thread.join()
831 if hasattr(cls, "vpp"):
832 if hasattr(cls, "vapi"):
833 cls.logger.debug(cls.vapi.vpp.get_stats())
834 cls.logger.debug("Disconnecting class vapi client on %s", cls.__name__)
835 cls.vapi.disconnect()
836 cls.logger.debug("Deleting class vapi attribute on %s", cls.__name__)
839 if not cls.debug_attach and cls.vpp.returncode is None:
840 cls.wait_for_coredump()
841 cls.logger.debug("Sending TERM to vpp")
843 cls.logger.debug("Waiting for vpp to die")
845 outs, errs = cls.vpp.communicate(timeout=5)
846 except subprocess.TimeoutExpired:
848 outs, errs = cls.vpp.communicate()
849 cls.logger.debug("Deleting class vpp attribute on %s", cls.__name__)
850 if not cls.debug_attach and not hasattr(cls, "running_vpp"):
851 cls.vpp.stdout.close()
852 cls.vpp.stderr.close()
853 # If vpp is a dynamic attribute set by the func use_running,
854 # deletion will result in an AttributeError that we can
858 except AttributeError:
861 if cls.vpp_startup_failed:
862 stdout_log = cls.logger.info
863 stderr_log = cls.logger.critical
865 stdout_log = cls.logger.info
866 stderr_log = cls.logger.info
868 if hasattr(cls, "vpp_stdout_deque"):
869 stdout_log(single_line_delim)
870 stdout_log("VPP output to stdout while running %s:", cls.__name__)
871 stdout_log(single_line_delim)
872 vpp_output = "".join(cls.vpp_stdout_deque)
873 with open(cls.tempdir + "/vpp_stdout.txt", "w") as f:
875 stdout_log("\n%s", vpp_output)
876 stdout_log(single_line_delim)
878 if hasattr(cls, "vpp_stderr_deque"):
879 stderr_log(single_line_delim)
880 stderr_log("VPP output to stderr while running %s:", cls.__name__)
881 stderr_log(single_line_delim)
882 vpp_output = "".join(cls.vpp_stderr_deque)
883 with open(cls.tempdir + "/vpp_stderr.txt", "w") as f:
885 stderr_log("\n%s", vpp_output)
886 stderr_log(single_line_delim)
889 def tearDownClass(cls):
890 """Perform final cleanup after running all tests in this test-case"""
891 cls.logger.debug("--- tearDownClass() for %s called ---" % cls.__name__)
892 if not hasattr(cls, "vpp"):
894 cls.reporter.send_keep_alive(cls, "tearDownClass")
896 cls.file_handler.close()
897 cls.reset_packet_infos()
898 if config.debug_framework:
899 debug_internal.on_tear_down_class(cls)
901 def show_commands_at_teardown(self):
902 """Allow subclass specific teardown logging additions."""
903 self.logger.info("--- No test specific show commands provided. ---")
906 """Show various debug prints after each test"""
908 "--- tearDown() for %s.%s(%s) called ---"
909 % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
911 if not hasattr(self, "vpp"):
915 if not self.vpp_dead:
916 self.logger.debug(self.vapi.cli("show trace max 1000"))
917 self.logger.info(self.vapi.ppcli("show interface"))
918 self.logger.info(self.vapi.ppcli("show hardware"))
919 self.logger.info(self.statistics.set_errors_str())
920 self.logger.info(self.vapi.ppcli("show run"))
921 self.logger.info(self.vapi.ppcli("show log"))
922 self.logger.info(self.vapi.ppcli("show bihash"))
923 self.logger.info("Logging testcase specific show commands.")
924 self.show_commands_at_teardown()
925 if self.remove_configured_vpp_objects_on_tear_down:
926 self.registry.remove_vpp_config(self.logger)
927 # Save/Dump VPP api trace log
928 m = self._testMethodName
929 api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
930 tmp_api_trace = "/tmp/%s" % api_trace
931 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
932 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
933 self.logger.info("Moving %s to %s\n" % (tmp_api_trace, vpp_api_trace_log))
934 os.rename(tmp_api_trace, vpp_api_trace_log)
935 except VppTransportSocketIOError:
937 "VppTransportSocketIOError: Vpp dead. Cannot log show commands."
941 self.registry.unregister_all(self.logger)
944 """Clear trace before running each test"""
945 super(VppTestCase, self).setUp()
946 if not hasattr(self, "vpp"):
948 self.reporter.send_keep_alive(self)
952 testcase=self.__class__.__name__,
953 method_name=self._testMethodName,
955 self.sleep(0.1, "during setUp")
956 self.vpp_stdout_deque.append(
957 "--- test setUp() for %s.%s(%s) starts here ---\n"
958 % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
960 self.vpp_stderr_deque.append(
961 "--- test setUp() for %s.%s(%s) starts here ---\n"
962 % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
964 self.vapi.cli("clear trace")
965 # store the test instance inside the test class - so that objects
966 # holding the class can access instance methods (like assertEqual)
967 type(self).test_instance = self
970 def pg_enable_capture(cls, interfaces=None):
972 Enable capture on packet-generator interfaces
974 :param interfaces: iterable interface indexes (if None,
975 use self.pg_interfaces)
978 if interfaces is None:
979 interfaces = cls.pg_interfaces
984 def register_pcap(cls, intf, worker):
985 """Register a pcap in the testclass"""
986 # add to the list of captures with current timestamp
987 cls._pcaps.append((intf, worker))
990 def get_vpp_time(cls):
991 # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
992 # returns float("2.190522")
993 timestr = cls.vapi.cli("show clock")
994 head, sep, tail = timestr.partition(",")
995 head, sep, tail = head.partition("Time now")
999 def sleep_on_vpp_time(cls, sec):
1000 """Sleep according to time in VPP world"""
1001 # On a busy system with many processes
1002 # we might end up with VPP time being slower than real world
1003 # So take that into account when waiting for VPP to do something
1004 start_time = cls.get_vpp_time()
1005 while cls.get_vpp_time() - start_time < sec:
1009 def pg_start(cls, trace=True):
1010 """Enable the PG, wait till it is done, then clean up"""
1011 for (intf, worker) in cls._old_pcaps:
1012 intf.handle_old_pcap_file(intf.get_in_path(worker), intf.in_history_counter)
1015 cls.vapi.cli("clear trace")
1016 cls.vapi.cli("trace add pg-input 1000")
1017 cls.vapi.cli("packet-generator enable")
1018 # PG, when starts, runs to completion -
1019 # so let's avoid a race condition,
1020 # and wait a little till it's done.
1021 # Then clean it up - and then be gone.
1022 deadline = time.time() + 300
1023 while cls.vapi.cli("show packet-generator").find("Yes") != -1:
1024 cls.sleep(0.01) # yield
1025 if time.time() > deadline:
1026 cls.logger.error("Timeout waiting for pg to stop")
1028 for intf, worker in cls._pcaps:
1029 cls.vapi.cli("packet-generator delete %s" % intf.get_cap_name(worker))
1030 cls._old_pcaps = cls._pcaps
1034 def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0, mode=None):
1036 Create packet-generator interfaces.
1038 :param interfaces: iterable indexes of the interfaces.
1039 :returns: List of created interfaces.
1043 for i in interfaces:
1044 intf = VppPGInterface(cls, i, gso, gso_size, mode)
1045 setattr(cls, intf.name, intf)
1047 cls.pg_interfaces = result
1051 def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
1052 if not hasattr(cls, "vpp"):
1053 cls.pg_interfaces = []
1054 return cls.pg_interfaces
1055 pgmode = VppEnum.vl_api_pg_interface_mode_t
1056 return cls.create_pg_interfaces_internal(
1057 interfaces, gso, gso_size, pgmode.PG_API_MODE_IP4
1061 def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
1062 if not hasattr(cls, "vpp"):
1063 cls.pg_interfaces = []
1064 return cls.pg_interfaces
1065 pgmode = VppEnum.vl_api_pg_interface_mode_t
1066 return cls.create_pg_interfaces_internal(
1067 interfaces, gso, gso_size, pgmode.PG_API_MODE_IP6
1071 def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
1072 if not hasattr(cls, "vpp"):
1073 cls.pg_interfaces = []
1074 return cls.pg_interfaces
1075 pgmode = VppEnum.vl_api_pg_interface_mode_t
1076 return cls.create_pg_interfaces_internal(
1077 interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
1081 def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
1082 if not hasattr(cls, "vpp"):
1083 cls.pg_interfaces = []
1084 return cls.pg_interfaces
1085 pgmode = VppEnum.vl_api_pg_interface_mode_t
1086 return cls.create_pg_interfaces_internal(
1087 interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
1091 def create_loopback_interfaces(cls, count):
1093 Create loopback interfaces.
1095 :param count: number of interfaces created.
1096 :returns: List of created interfaces.
1098 if not hasattr(cls, "vpp"):
1099 cls.lo_interfaces = []
1100 return cls.lo_interfaces
1101 result = [VppLoInterface(cls) for i in range(count)]
1103 setattr(cls, intf.name, intf)
1104 cls.lo_interfaces = result
1108 def create_bvi_interfaces(cls, count):
1110 Create BVI interfaces.
1112 :param count: number of interfaces created.
1113 :returns: List of created interfaces.
1115 if not hasattr(cls, "vpp"):
1116 cls.bvi_interfaces = []
1117 return cls.bvi_interfaces
1118 result = [VppBviInterface(cls) for i in range(count)]
1120 setattr(cls, intf.name, intf)
1121 cls.bvi_interfaces = result
1125 def extend_packet(packet, size, padding=" "):
1127 Extend packet to given size by padding with spaces or custom padding
1128 NOTE: Currently works only when Raw layer is present.
1130 :param packet: packet
1131 :param size: target size
1132 :param padding: padding used to extend the payload
1135 packet_len = len(packet) + 4
1136 extend = size - packet_len
1138 num = (extend // len(padding)) + 1
1139 packet[Raw].load += (padding * num)[:extend].encode("ascii")
1142 def reset_packet_infos(cls):
1143 """Reset the list of packet info objects and packet counts to zero"""
1144 cls._packet_infos = {}
1145 cls._packet_count_for_dst_if_idx = {}
1148 def create_packet_info(cls, src_if, dst_if):
1150 Create packet info object containing the source and destination indexes
1151 and add it to the testcase's packet info list
1153 :param VppInterface src_if: source interface
1154 :param VppInterface dst_if: destination interface
1156 :returns: _PacketInfo object
1159 info = _PacketInfo()
1160 info.index = len(cls._packet_infos)
1161 info.src = src_if.sw_if_index
1162 info.dst = dst_if.sw_if_index
1163 if isinstance(dst_if, VppSubInterface):
1164 dst_idx = dst_if.parent.sw_if_index
1166 dst_idx = dst_if.sw_if_index
1167 if dst_idx in cls._packet_count_for_dst_if_idx:
1168 cls._packet_count_for_dst_if_idx[dst_idx] += 1
1170 cls._packet_count_for_dst_if_idx[dst_idx] = 1
1171 cls._packet_infos[info.index] = info
1175 def info_to_payload(info):
1177 Convert _PacketInfo object to packet payload
1179 :param info: _PacketInfo object
1181 :returns: string containing serialized data from packet info
1184 # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1185 return pack("iiiih", info.index, info.src, info.dst, info.ip, info.proto)
1188 def payload_to_info(payload, payload_field="load"):
1190 Convert packet payload to _PacketInfo object
1192 :param payload: packet payload
1193 :type payload: <class 'scapy.packet.Raw'>
1194 :param payload_field: packet fieldname of payload "load" for
1195 <class 'scapy.packet.Raw'>
1196 :type payload_field: str
1197 :returns: _PacketInfo object containing de-serialized data from payload
1201 # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1202 payload_b = getattr(payload, payload_field)[:18]
1204 info = _PacketInfo()
1205 info.index, info.src, info.dst, info.ip, info.proto = unpack("iiiih", payload_b)
1207 # some SRv6 TCs depend on get an exception if bad values are detected
1208 if info.index > 0x4000:
1209 raise ValueError("Index value is invalid")
1213 def get_next_packet_info(self, info):
1215 Iterate over the packet info list stored in the testcase
1216 Start iteration with first element if info is None
1217 Continue based on index in info if info is specified
1219 :param info: info or None
1220 :returns: next info in list or None if no more infos
1225 next_index = info.index + 1
1226 if next_index == len(self._packet_infos):
1229 return self._packet_infos[next_index]
1231 def get_next_packet_info_for_interface(self, src_index, info):
1233 Search the packet info list for the next packet info with same source
1236 :param src_index: source interface index to search for
1237 :param info: packet info - where to start the search
1238 :returns: packet info or None
1242 info = self.get_next_packet_info(info)
1245 if info.src == src_index:
1248 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
1250 Search the packet info list for the next packet info with same source
1251 and destination interface indexes
1253 :param src_index: source interface index to search for
1254 :param dst_index: destination interface index to search for
1255 :param info: packet info - where to start the search
1256 :returns: packet info or None
1260 info = self.get_next_packet_info_for_interface(src_index, info)
1263 if info.dst == dst_index:
1266 def assert_equal(self, real_value, expected_value, name_or_class=None):
1267 if name_or_class is None:
1268 self.assertEqual(real_value, expected_value)
1271 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
1273 getdoc(name_or_class).strip(),
1275 str(name_or_class(real_value)),
1277 str(name_or_class(expected_value)),
1280 msg = "Invalid %s: %s does not match expected value %s" % (
1286 self.assertEqual(real_value, expected_value, msg)
1288 def assert_in_range(self, real_value, expected_min, expected_max, name=None):
1292 msg = "Invalid %s: %s out of range <%s,%s>" % (
1298 self.assertTrue(expected_min <= real_value <= expected_max, msg)
1300 def assert_packet_checksums_valid(self, packet, ignore_zero_udp_checksums=True):
1301 received = packet.__class__(scapy.compat.raw(packet))
1302 udp_layers = ["UDP", "UDPerror"]
1303 checksum_fields = ["cksum", "chksum"]
1306 temp = received.__class__(scapy.compat.raw(received))
1308 layer = temp.getlayer(counter)
1310 layer = layer.copy()
1311 layer.remove_payload()
1312 for cf in checksum_fields:
1313 if hasattr(layer, cf):
1315 ignore_zero_udp_checksums
1316 and 0 == getattr(layer, cf)
1317 and layer.name in udp_layers
1320 delattr(temp.getlayer(counter), cf)
1321 checksums.append((counter, cf))
1324 counter = counter + 1
1325 if 0 == len(checksums):
1327 temp = temp.__class__(scapy.compat.raw(temp))
1328 for layer, cf in checksums:
1329 calc_sum = getattr(temp[layer], cf)
1331 getattr(received[layer], cf),
1333 "packet checksum on layer #%d: %s" % (layer, temp[layer].name),
1336 "Checksum field `%s` on `%s` layer has correct value `%s`"
1337 % (cf, temp[layer].name, calc_sum)
1340 def assert_checksum_valid(
1341 self, received_packet, layer, field_name="chksum", ignore_zero_checksum=False
1343 """Check checksum of received packet on given layer"""
1344 received_packet_checksum = getattr(received_packet[layer], field_name)
1345 if ignore_zero_checksum and 0 == received_packet_checksum:
1347 recalculated = received_packet.__class__(scapy.compat.raw(received_packet))
1348 delattr(recalculated[layer], field_name)
1349 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1351 received_packet_checksum,
1352 getattr(recalculated[layer], field_name),
1353 "packet checksum on layer: %s" % layer,
1356 def assert_ip_checksum_valid(self, received_packet, ignore_zero_checksum=False):
1357 self.assert_checksum_valid(
1358 received_packet, "IP", ignore_zero_checksum=ignore_zero_checksum
1361 def assert_tcp_checksum_valid(self, received_packet, ignore_zero_checksum=False):
1362 self.assert_checksum_valid(
1363 received_packet, "TCP", ignore_zero_checksum=ignore_zero_checksum
1366 def assert_udp_checksum_valid(self, received_packet, ignore_zero_checksum=True):
1367 self.assert_checksum_valid(
1368 received_packet, "UDP", ignore_zero_checksum=ignore_zero_checksum
1371 def assert_embedded_icmp_checksum_valid(self, received_packet):
1372 if received_packet.haslayer(IPerror):
1373 self.assert_checksum_valid(received_packet, "IPerror")
1374 if received_packet.haslayer(TCPerror):
1375 self.assert_checksum_valid(received_packet, "TCPerror")
1376 if received_packet.haslayer(UDPerror):
1377 self.assert_checksum_valid(
1378 received_packet, "UDPerror", ignore_zero_checksum=True
1380 if received_packet.haslayer(ICMPerror):
1381 self.assert_checksum_valid(received_packet, "ICMPerror")
1383 def assert_icmp_checksum_valid(self, received_packet):
1384 self.assert_checksum_valid(received_packet, "ICMP")
1385 self.assert_embedded_icmp_checksum_valid(received_packet)
1387 def assert_icmpv6_checksum_valid(self, pkt):
1388 if pkt.haslayer(ICMPv6DestUnreach):
1389 self.assert_checksum_valid(pkt, "ICMPv6DestUnreach", "cksum")
1390 self.assert_embedded_icmp_checksum_valid(pkt)
1391 if pkt.haslayer(ICMPv6EchoRequest):
1392 self.assert_checksum_valid(pkt, "ICMPv6EchoRequest", "cksum")
1393 if pkt.haslayer(ICMPv6EchoReply):
1394 self.assert_checksum_valid(pkt, "ICMPv6EchoReply", "cksum")
1396 def get_counter(self, counter):
1397 if counter.startswith("/"):
1398 counter_value = self.statistics.get_counter(counter)
1400 counters = self.vapi.cli("sh errors").split("\n")
1402 for i in range(1, len(counters) - 1):
1403 results = counters[i].split()
1404 if results[1] == counter:
1405 counter_value = int(results[0])
1407 return counter_value
1409 def assert_counter_equal(self, counter, expected_value, thread=None, index=0):
1410 c = self.get_counter(counter)
1411 if thread is not None:
1412 c = c[thread][index]
1414 c = sum(x[index] for x in c)
1415 self.assert_equal(c, expected_value, "counter `%s'" % counter)
1417 def assert_packet_counter_equal(self, counter, expected_value):
1418 counter_value = self.get_counter(counter)
1420 counter_value, expected_value, "packet counter `%s'" % counter
1423 def assert_error_counter_equal(self, counter, expected_value):
1424 counter_value = self.statistics[counter].sum()
1425 self.assert_equal(counter_value, expected_value, "error counter `%s'" % counter)
1428 def sleep(cls, timeout, remark=None):
1430 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1431 # * by Guido, only the main thread can be interrupted.
1433 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
1436 if hasattr(os, "sched_yield"):
1442 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1443 before = time.time()
1446 if after - before > 2 * timeout:
1448 "unexpected self.sleep() result - slept for %es instead of ~%es!",
1454 "Finished sleep (%s) - slept %es (wanted %es)",
1460 def virtual_sleep(self, timeout, remark=None):
1461 self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
1462 self.vapi.cli("set clock adjust %s" % timeout)
1464 def pg_send(self, intf, pkts, worker=None, trace=True):
1465 intf.add_stream(pkts, worker=worker)
1466 self.pg_enable_capture(self.pg_interfaces)
1467 self.pg_start(trace=trace)
1469 def snapshot_stats(self, stats_diff):
1470 """Return snapshot of interesting stats based on diff dictionary."""
1472 for sw_if_index in stats_diff:
1473 for counter in stats_diff[sw_if_index]:
1474 stats_snapshot[counter] = self.statistics[counter]
1475 self.logger.debug(f"Took statistics stats_snapshot: {stats_snapshot}")
1476 return stats_snapshot
1478 def compare_stats_with_snapshot(self, stats_diff, stats_snapshot):
1479 """Assert appropriate difference between current stats and snapshot."""
1480 for sw_if_index in stats_diff:
1481 for cntr, diff in stats_diff[sw_if_index].items():
1482 if sw_if_index == "err":
1484 self.statistics[cntr].sum(),
1485 stats_snapshot[cntr].sum() + diff,
1486 f"'{cntr}' counter value (previous value: "
1487 f"{stats_snapshot[cntr].sum()}, "
1488 f"expected diff: {diff})",
1493 self.statistics[cntr][:, sw_if_index].sum(),
1494 stats_snapshot[cntr][:, sw_if_index].sum() + diff,
1495 f"'{cntr}' counter value (previous value: "
1496 f"{stats_snapshot[cntr][:, sw_if_index].sum()}, "
1497 f"expected diff: {diff})",
1500 # if diff is 0, then this most probably a case where
1501 # test declares multiple interfaces but traffic hasn't
1502 # passed through this one yet - which means the counter
1503 # value is 0 and can be ignored
1507 def send_and_assert_no_replies(
1508 self, intf, pkts, remark="", timeout=None, stats_diff=None, trace=True, msg=None
1511 stats_snapshot = self.snapshot_stats(stats_diff)
1513 self.pg_send(intf, pkts)
1518 for i in self.pg_interfaces:
1519 i.assert_nothing_captured(timeout=timeout, remark=remark)
1524 self.logger.debug(f"send_and_assert_no_replies: {msg}")
1525 self.logger.debug(self.vapi.cli("show trace"))
1528 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1530 def send_and_expect(
1542 stats_snapshot = self.snapshot_stats(stats_diff)
1545 n_rx = 1 if isinstance(pkts, Packet) else len(pkts)
1546 self.pg_send(intf, pkts, worker=worker, trace=trace)
1547 rx = output.get_capture(n_rx)
1550 self.logger.debug(f"send_and_expect: {msg}")
1551 self.logger.debug(self.vapi.cli("show trace"))
1554 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1558 def send_and_expect_load_balancing(
1559 self, input, pkts, outputs, worker=None, trace=True
1561 self.pg_send(input, pkts, worker=worker, trace=trace)
1564 rx = oo._get_capture(1)
1565 self.assertNotEqual(0, len(rx))
1568 self.logger.debug(self.vapi.cli("show trace"))
1571 def send_and_expect_some(self, intf, pkts, output, worker=None, trace=True):
1572 self.pg_send(intf, pkts, worker=worker, trace=trace)
1573 rx = output._get_capture(1)
1575 self.logger.debug(self.vapi.cli("show trace"))
1576 self.assertTrue(len(rx) > 0)
1577 self.assertTrue(len(rx) < len(pkts))
1580 def send_and_expect_only(self, intf, pkts, output, timeout=None, stats_diff=None):
1582 stats_snapshot = self.snapshot_stats(stats_diff)
1584 self.pg_send(intf, pkts)
1585 rx = output.get_capture(len(pkts))
1589 for i in self.pg_interfaces:
1590 if i not in outputs:
1591 i.assert_nothing_captured(timeout=timeout)
1595 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1600 def get_testcase_doc_name(test):
1601 return getdoc(test.__class__).splitlines()[0]
1604 def get_test_description(descriptions, test):
1605 short_description = test.shortDescription()
1606 if descriptions and short_description:
1607 return short_description
1612 class TestCaseInfo(object):
1613 def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
1614 self.logger = logger
1615 self.tempdir = tempdir
1616 self.vpp_pid = vpp_pid
1617 self.vpp_bin_path = vpp_bin_path
1618 self.core_crash_test = None
1621 class VppTestResult(unittest.TestResult):
1623 @property result_string
1624 String variable to store the test case result string.
1626 List variable containing 2-tuples of TestCase instances and strings
1627 holding formatted tracebacks. Each tuple represents a test which
1628 raised an unexpected exception.
1630 List variable containing 2-tuples of TestCase instances and strings
1631 holding formatted tracebacks. Each tuple represents a test where
1632 a failure was explicitly signalled using the TestCase.assert*()
1636 failed_test_cases_info = set()
1637 core_crash_test_cases_info = set()
1638 current_test_case_info = None
1640 def __init__(self, stream=None, descriptions=None, verbosity=None, runner=None):
1642 :param stream File descriptor to store where to report test results.
1643 Set to the standard error stream by default.
1644 :param descriptions Boolean variable to store information if to use
1645 test case descriptions.
1646 :param verbosity Integer variable to store required verbosity level.
1648 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1649 self.stream = stream
1650 self.descriptions = descriptions
1651 self.verbosity = verbosity
1652 self.result_string = None
1653 self.runner = runner
1656 def addSuccess(self, test):
1658 Record a test succeeded result
1663 if self.current_test_case_info:
1664 self.current_test_case_info.logger.debug(
1665 "--- addSuccess() %s.%s(%s) called"
1666 % (test.__class__.__name__, test._testMethodName, test._testMethodDoc)
1668 unittest.TestResult.addSuccess(self, test)
1669 self.result_string = colorize("OK", GREEN)
1671 self.send_result_through_pipe(test, PASS)
1673 def addSkip(self, test, reason):
1675 Record a test skipped.
1681 if self.current_test_case_info:
1682 self.current_test_case_info.logger.debug(
1683 "--- addSkip() %s.%s(%s) called, reason is %s"
1685 test.__class__.__name__,
1686 test._testMethodName,
1687 test._testMethodDoc,
1691 unittest.TestResult.addSkip(self, test, reason)
1692 self.result_string = colorize("SKIP", YELLOW)
1694 if reason == "not enough cpus":
1695 self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE)
1697 self.send_result_through_pipe(test, SKIP)
1699 def symlink_failed(self):
1700 if self.current_test_case_info:
1702 failed_dir = config.failed_dir
1703 link_path = os.path.join(
1705 "%s-FAILED" % os.path.basename(self.current_test_case_info.tempdir),
1708 self.current_test_case_info.logger.debug(
1709 "creating a link to the failed test"
1711 self.current_test_case_info.logger.debug(
1712 "os.symlink(%s, %s)"
1713 % (self.current_test_case_info.tempdir, link_path)
1715 if os.path.exists(link_path):
1716 self.current_test_case_info.logger.debug("symlink already exists")
1718 os.symlink(self.current_test_case_info.tempdir, link_path)
1720 except Exception as e:
1721 self.current_test_case_info.logger.error(e)
1723 def send_result_through_pipe(self, test, result):
1724 if hasattr(self, "test_framework_result_pipe"):
1725 pipe = self.test_framework_result_pipe
1727 pipe.send((test.id(), result))
1729 def log_error(self, test, err, fn_name):
1730 if self.current_test_case_info:
1731 if isinstance(test, unittest.suite._ErrorHolder):
1732 test_name = test.description
1734 test_name = "%s.%s(%s)" % (
1735 test.__class__.__name__,
1736 test._testMethodName,
1737 test._testMethodDoc,
1739 self.current_test_case_info.logger.debug(
1740 "--- %s() %s called, err is %s" % (fn_name, test_name, err)
1742 self.current_test_case_info.logger.debug(
1743 "formatted exception is:\n%s" % "".join(format_exception(*err))
1746 def add_error(self, test, err, unittest_fn, error_type):
1747 if error_type == FAIL:
1748 self.log_error(test, err, "addFailure")
1749 error_type_str = colorize("FAIL", RED)
1750 elif error_type == ERROR:
1751 self.log_error(test, err, "addError")
1752 error_type_str = colorize("ERROR", RED)
1755 "Error type %s cannot be used to record an "
1756 "error or a failure" % error_type
1759 unittest_fn(self, test, err)
1760 if self.current_test_case_info:
1761 self.result_string = "%s [ temp dir used by test case: %s ]" % (
1763 self.current_test_case_info.tempdir,
1765 self.symlink_failed()
1766 self.failed_test_cases_info.add(self.current_test_case_info)
1767 if is_core_present(self.current_test_case_info.tempdir):
1768 if not self.current_test_case_info.core_crash_test:
1769 if isinstance(test, unittest.suite._ErrorHolder):
1770 test_name = str(test)
1772 test_name = "'{!s}' ({!s})".format(
1773 get_testcase_doc_name(test), test.id()
1775 self.current_test_case_info.core_crash_test = test_name
1776 self.core_crash_test_cases_info.add(self.current_test_case_info)
1778 self.result_string = "%s [no temp dir]" % error_type_str
1780 self.send_result_through_pipe(test, error_type)
1782 def addFailure(self, test, err):
1784 Record a test failed result
1787 :param err: error message
1790 self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1792 def addError(self, test, err):
1794 Record a test error result
1797 :param err: error message
1800 self.add_error(test, err, unittest.TestResult.addError, ERROR)
1802 def getDescription(self, test):
1804 Get test description
1807 :returns: test description
1810 return get_test_description(self.descriptions, test)
1812 def startTest(self, test):
1820 def print_header(test):
1821 if test.__class__ in self.printed:
1824 test_doc = getdoc(test)
1826 raise Exception("No doc string for test '%s'" % test.id())
1828 test_title = test_doc.splitlines()[0].rstrip()
1829 test_title = colorize(test_title, GREEN)
1830 if test.is_tagged_run_solo():
1831 test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)
1833 # This block may overwrite the colorized title above,
1834 # but we want this to stand out and be fixed
1835 if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
1836 test_title = colorize(f"FIXME with VPP workers: {test_title}", RED)
1838 if test.has_tag(TestCaseTag.FIXME_ASAN):
1839 test_title = colorize(f"FIXME with ASAN: {test_title}", RED)
1840 test.skip_fixme_asan()
1842 if is_distro_ubuntu2204 == True and test.has_tag(
1843 TestCaseTag.FIXME_UBUNTU2204
1845 test_title = colorize(f"FIXME on Ubuntu-22.04: {test_title}", RED)
1846 test.skip_fixme_ubuntu2204()
1848 if is_distro_debian11 == True and test.has_tag(TestCaseTag.FIXME_DEBIAN11):
1849 test_title = colorize(f"FIXME on Debian-11: {test_title}", RED)
1850 test.skip_fixme_debian11()
1852 if "debug" in config.vpp_tag and test.has_tag(TestCaseTag.FIXME_VPP_DEBUG):
1853 test_title = colorize(f"FIXME on VPP Debug: {test_title}", RED)
1854 test.skip_fixme_vpp_debug()
1856 if hasattr(test, "vpp_worker_count"):
1857 if test.vpp_worker_count == 0:
1858 test_title += " [main thread only]"
1859 elif test.vpp_worker_count == 1:
1860 test_title += " [1 worker thread]"
1862 test_title += f" [{test.vpp_worker_count} worker threads]"
1864 if test.__class__.skipped_due_to_cpu_lack:
1865 test_title = colorize(
1866 f"{test_title} [skipped - not enough cpus, "
1867 f"required={test.__class__.get_cpus_required()}, "
1868 f"available={max_vpp_cpus}]",
1872 print(double_line_delim)
1874 print(double_line_delim)
1875 self.printed.append(test.__class__)
1878 self.start_test = time.time()
1879 unittest.TestResult.startTest(self, test)
1880 if self.verbosity > 0:
1881 self.stream.writeln("Starting " + self.getDescription(test) + " ...")
1882 self.stream.writeln(single_line_delim)
1884 def stopTest(self, test):
1886 Called when the given test has been run
1891 unittest.TestResult.stopTest(self, test)
1893 if self.verbosity > 0:
1894 self.stream.writeln(single_line_delim)
1895 self.stream.writeln(
1896 "%-73s%s" % (self.getDescription(test), self.result_string)
1898 self.stream.writeln(single_line_delim)
1900 self.stream.writeln(
1903 self.getDescription(test),
1904 time.time() - self.start_test,
1909 self.send_result_through_pipe(test, TEST_RUN)
1911 def printErrors(self):
1913 Print errors from running the test case
1915 if len(self.errors) > 0 or len(self.failures) > 0:
1916 self.stream.writeln()
1917 self.printErrorList("ERROR", self.errors)
1918 self.printErrorList("FAIL", self.failures)
1920 # ^^ that is the last output from unittest before summary
1921 if not self.runner.print_summary:
1922 devnull = unittest.runner._WritelnDecorator(open(os.devnull, "w"))
1923 self.stream = devnull
1924 self.runner.stream = devnull
1926 def printErrorList(self, flavour, errors):
1928 Print error list to the output stream together with error type
1929 and test case description.
1931 :param flavour: error type
1932 :param errors: iterable errors
1935 for test, err in errors:
1936 self.stream.writeln(double_line_delim)
1937 self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
1938 self.stream.writeln(single_line_delim)
1939 self.stream.writeln("%s" % err)
1942 class VppTestRunner(unittest.TextTestRunner):
1944 A basic test runner implementation which prints results to standard error.
1948 def resultclass(self):
1949 """Class maintaining the results of the tests"""
1950 return VppTestResult
1954 keep_alive_pipe=None,
1964 # ignore stream setting here, use hard-coded stdout to be in sync
1965 # with prints from VppTestCase methods ...
1966 super(VppTestRunner, self).__init__(
1967 sys.stdout, descriptions, verbosity, failfast, buffer, resultclass, **kwargs
1969 KeepAliveReporter.pipe = keep_alive_pipe
1971 self.orig_stream = self.stream
1972 self.resultclass.test_framework_result_pipe = result_pipe
1974 self.print_summary = print_summary
1976 def _makeResult(self):
1977 return self.resultclass(self.stream, self.descriptions, self.verbosity, self)
1979 def run(self, test):
1986 faulthandler.enable() # emit stack trace to stderr if killed by signal
1988 result = super(VppTestRunner, self).run(test)
1989 if not self.print_summary:
1990 self.stream = self.orig_stream
1991 result.stream = self.orig_stream
1995 class Worker(Thread):
1996 def __init__(self, executable_args, logger, env=None, *args, **kwargs):
1997 super(Worker, self).__init__(*args, **kwargs)
1998 self.logger = logger
1999 self.args = executable_args
2000 if hasattr(self, "testcase") and self.testcase.debug_all:
2001 if self.testcase.debug_gdbserver:
2003 "/usr/bin/gdbserver",
2004 "localhost:{port}".format(port=self.testcase.gdbserver_port),
2006 elif self.testcase.debug_gdb and hasattr(self, "wait_for_gdb"):
2007 self.args.append(self.wait_for_gdb)
2008 self.app_bin = executable_args[0]
2009 self.app_name = os.path.basename(self.app_bin)
2010 if hasattr(self, "role"):
2011 self.app_name += " {role}".format(role=self.role)
2014 env = {} if env is None else env
2015 self.env = copy.deepcopy(env)
2017 def wait_for_enter(self):
2018 if not hasattr(self, "testcase"):
2020 if self.testcase.debug_all and self.testcase.debug_gdbserver:
2022 print(double_line_delim)
2024 "Spawned GDB Server for '{app}' with PID: {pid}".format(
2025 app=self.app_name, pid=self.process.pid
2028 elif self.testcase.debug_all and self.testcase.debug_gdb:
2030 print(double_line_delim)
2032 "Spawned '{app}' with PID: {pid}".format(
2033 app=self.app_name, pid=self.process.pid
2038 print(single_line_delim)
2039 print("You can debug '{app}' using:".format(app=self.app_name))
2040 if self.testcase.debug_gdbserver:
2044 + " -ex 'target remote localhost:{port}'".format(
2045 port=self.testcase.gdbserver_port
2049 "Now is the time to attach gdb by running the above "
2050 "command, set up breakpoints etc., then resume from "
2051 "within gdb by issuing the 'continue' command"
2053 self.testcase.gdbserver_port += 1
2054 elif self.testcase.debug_gdb:
2058 + " -ex 'attach {pid}'".format(pid=self.process.pid)
2061 "Now is the time to attach gdb by running the above "
2062 "command and set up breakpoints etc., then resume from"
2063 " within gdb by issuing the 'continue' command"
2065 print(single_line_delim)
2066 input("Press ENTER to continue running the testcase...")
2069 executable = self.args[0]
2070 if not os.path.exists(executable) or not os.access(
2071 executable, os.F_OK | os.X_OK
2073 # Exit code that means some system file did not exist,
2074 # could not be opened, or had some other kind of error.
2075 self.result = os.EX_OSFILE
2076 raise EnvironmentError(
2077 "executable '%s' is not found or executable." % executable
2080 "Running executable '{app}': '{cmd}'".format(
2081 app=self.app_name, cmd=" ".join(self.args)
2084 env = os.environ.copy()
2085 env.update(self.env)
2086 env["CK_LOG_FILE_NAME"] = "-"
2087 self.process = subprocess.Popen(
2088 ["stdbuf", "-o0", "-e0"] + self.args,
2091 preexec_fn=os.setpgrp,
2092 stdout=subprocess.PIPE,
2093 stderr=subprocess.PIPE,
2095 self.wait_for_enter()
2096 out, err = self.process.communicate()
2097 self.logger.debug("Finished running `{app}'".format(app=self.app_name))
2098 self.logger.info("Return code is `%s'" % self.process.returncode)
2099 self.logger.info(single_line_delim)
2101 "Executable `{app}' wrote to stdout:".format(app=self.app_name)
2103 self.logger.info(single_line_delim)
2104 self.logger.info(out.decode("utf-8"))
2105 self.logger.info(single_line_delim)
2107 "Executable `{app}' wrote to stderr:".format(app=self.app_name)
2109 self.logger.info(single_line_delim)
2110 self.logger.info(err.decode("utf-8"))
2111 self.logger.info(single_line_delim)
2112 self.result = self.process.returncode
2115 if __name__ == "__main__":