3 from __future__ import print_function
18 from collections import deque
19 from threading import Thread, Event
20 from inspect import getdoc, isclass
21 from traceback import format_exception
22 from logging import FileHandler, DEBUG, Formatter
24 from abc import ABC, abstractmethod
25 from struct import pack, unpack
28 from scapy.packet import Raw, Packet
29 from config import config, available_cpus, num_cpus, max_vpp_cpus
30 import hook as hookmodule
31 from vpp_pg_interface import VppPGInterface
32 from vpp_sub_interface import VppSubInterface
33 from vpp_lo_interface import VppLoInterface
34 from vpp_bvi_interface import VppBviInterface
35 from vpp_papi_provider import VppPapiProvider
36 from vpp_papi import VppEnum
38 from vpp_papi.vpp_stats import VPPStats
39 from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
49 from vpp_object import VppObjectRegistry
50 from util import ppp, is_core_present
51 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
52 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
53 from scapy.layers.inet6 import ICMPv6EchoReply
54 from vpp_running import use_running
57 logger = logging.getLogger(__name__)
59 # Set up an empty logger for the testcase that can be overridden as necessary
60 null_logger = logging.getLogger("VppTestCase")
61 null_logger.addHandler(logging.NullHandler())
71 if config.debug_framework:
75 Test framework module.
77 The module provides a set of tools for constructing and running tests and
78 representing the results.
82 class VppDiedError(Exception):
83 """exception for reporting that the subprocess has died."""
87 for k, v in signal.__dict__.items()
88 if k.startswith("SIG") and not k.startswith("SIG_")
91 def __init__(self, rv=None, testcase=None, method_name=None):
93 self.signal_name = None
94 self.testcase = testcase
95 self.method_name = method_name
98 self.signal_name = VppDiedError.signals_by_value[-rv]
99 except (KeyError, TypeError):
102 if testcase is None and method_name is None:
105 in_msg = " while running %s.%s" % (testcase, method_name)
108 msg = "VPP subprocess died unexpectedly%s with return code: %d%s." % (
111 " [%s]" % (self.signal_name if self.signal_name is not None else ""),
114 msg = "VPP subprocess died unexpectedly%s." % in_msg
116 super(VppDiedError, self).__init__(msg)
119 class _PacketInfo(object):
120 """Private class to create packet info object.
122 Help process information about the next packet.
123 Set variables to default values.
126 #: Store the index of the packet.
128 #: Store the index of the source packet generator interface of the packet.
130 #: Store the index of the destination packet generator interface
133 #: Store expected ip version
135 #: Store expected upper protocol
137 #: Store the copy of the former packet.
140 def __eq__(self, other):
141 index = self.index == other.index
142 src = self.src == other.src
143 dst = self.dst == other.dst
144 data = self.data == other.data
145 return index and src and dst and data
148 def pump_output(testclass):
149 """pump output from vpp stdout/stderr to proper queues"""
150 if not hasattr(testclass, "vpp"):
154 while not testclass.pump_thread_stop_flag.is_set():
155 readable = select.select(
157 testclass.vpp.stdout.fileno(),
158 testclass.vpp.stderr.fileno(),
159 testclass.pump_thread_wakeup_pipe[0],
164 if testclass.vpp.stdout.fileno() in readable:
165 read = os.read(testclass.vpp.stdout.fileno(), 102400)
167 split = read.decode("ascii", errors="backslashreplace").splitlines(True)
168 if len(stdout_fragment) > 0:
169 split[0] = "%s%s" % (stdout_fragment, split[0])
170 if len(split) > 0 and split[-1].endswith("\n"):
174 stdout_fragment = split[-1]
175 testclass.vpp_stdout_deque.extend(split[:limit])
176 if not config.cache_vpp_output:
177 for line in split[:limit]:
178 testclass.logger.info("VPP STDOUT: %s" % line.rstrip("\n"))
179 if testclass.vpp.stderr.fileno() in readable:
180 read = os.read(testclass.vpp.stderr.fileno(), 102400)
182 split = read.decode("ascii", errors="backslashreplace").splitlines(True)
183 if len(stderr_fragment) > 0:
184 split[0] = "%s%s" % (stderr_fragment, split[0])
185 if len(split) > 0 and split[-1].endswith("\n"):
189 stderr_fragment = split[-1]
191 testclass.vpp_stderr_deque.extend(split[:limit])
192 if not config.cache_vpp_output:
193 for line in split[:limit]:
194 testclass.logger.error("VPP STDERR: %s" % line.rstrip("\n"))
195 # ignoring the dummy pipe here intentionally - the
196 # flag will take care of properly terminating the loop
199 def _is_platform_aarch64():
200 return platform.machine() == "aarch64"
203 is_platform_aarch64 = _is_platform_aarch64()
206 def _is_distro_ubuntu2204():
207 with open("/etc/os-release") as f:
208 for line in f.readlines():
214 is_distro_ubuntu2204 = _is_distro_ubuntu2204()
217 def _is_distro_debian11():
218 with open("/etc/os-release") as f:
219 for line in f.readlines():
220 if "bullseye" in line:
225 is_distro_debian11 = _is_distro_debian11()
228 class KeepAliveReporter(object):
230 Singleton object which reports test start to parent process
236 self.__dict__ = self._shared_state
244 def pipe(self, pipe):
245 if self._pipe is not None:
246 raise Exception("Internal error - pipe should only be set once.")
249 def send_keep_alive(self, test, desc=None):
251 Write current test tmpdir & desc to keep-alive pipe to signal liveness
253 if not hasattr(test, "vpp") or self.pipe is None:
254 # if not running forked..
258 desc = "%s (%s)" % (desc, unittest.util.strclass(test))
262 self.pipe.send((desc, config.vpp, test.tempdir, test.vpp.pid))
265 class TestCaseTag(Enum):
266 # marks the suites that must run at the end
267 # using only a single test runner
269 # marks the suites broken on VPP multi-worker
270 FIXME_VPP_WORKERS = 2
271 # marks the suites broken when ASan is enabled
273 # marks suites broken on Ubuntu-22.04
275 # marks suites broken on Debian-11
277 # marks suites broken on debug vpp image
281 def create_tag_decorator(e):
284 cls.test_tags.append(e)
285 except AttributeError:
292 tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
293 tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
294 tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
295 tag_fixme_ubuntu2204 = create_tag_decorator(TestCaseTag.FIXME_UBUNTU2204)
296 tag_fixme_debian11 = create_tag_decorator(TestCaseTag.FIXME_DEBIAN11)
297 tag_fixme_vpp_debug = create_tag_decorator(TestCaseTag.FIXME_VPP_DEBUG)
311 class CPUInterface(ABC):
313 skipped_due_to_cpu_lack = False
317 def get_cpus_required(cls):
321 def assign_cpus(cls, cpus):
326 class VppTestCase(CPUInterface, unittest.TestCase):
327 """This subclass is a base class for VPP test cases that are implemented as
328 classes. It provides methods to create and run test case.
331 extra_vpp_statseg_config = ""
332 extra_vpp_punt_config = []
333 extra_vpp_plugin_config = []
335 vapi_response_timeout = 5
336 remove_configured_vpp_objects_on_tear_down = True
339 def packet_infos(self):
340 """List of packet infos"""
341 return self._packet_infos
344 def get_packet_count_for_if_idx(cls, dst_if_index):
345 """Get the number of packet info for specified destination if index"""
346 if dst_if_index in cls._packet_count_for_dst_if_idx:
347 return cls._packet_count_for_dst_if_idx[dst_if_index]
352 def has_tag(cls, tag):
353 """if the test case has a given tag - return true"""
355 return tag in cls.test_tags
356 except AttributeError:
361 def is_tagged_run_solo(cls):
362 """if the test case class is timing-sensitive - return true"""
363 return cls.has_tag(TestCaseTag.RUN_SOLO)
366 def skip_fixme_asan(cls):
367 """if @tag_fixme_asan & ASan is enabled - mark for skip"""
368 if cls.has_tag(TestCaseTag.FIXME_ASAN):
369 vpp_extra_cmake_args = os.environ.get("VPP_EXTRA_CMAKE_ARGS", "")
370 if "DVPP_ENABLE_SANITIZE_ADDR=ON" in vpp_extra_cmake_args:
371 cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls)
374 def skip_fixme_ubuntu2204(cls):
375 """if distro is ubuntu 22.04 and @tag_fixme_ubuntu2204 mark for skip"""
376 if cls.has_tag(TestCaseTag.FIXME_UBUNTU2204):
377 cls = unittest.skip("Skipping @tag_fixme_ubuntu2204 tests")(cls)
380 def skip_fixme_debian11(cls):
381 """if distro is Debian-11 and @tag_fixme_debian11 mark for skip"""
382 if cls.has_tag(TestCaseTag.FIXME_DEBIAN11):
383 cls = unittest.skip("Skipping @tag_fixme_debian11 tests")(cls)
386 def skip_fixme_vpp_debug(cls):
387 cls = unittest.skip("Skipping @tag_fixme_vpp_debug tests")(cls)
391 """Return the instance of this testcase"""
392 return cls.test_instance
395 def set_debug_flags(cls, d):
396 cls.gdbserver_port = 7777
397 cls.debug_core = False
398 cls.debug_gdb = False
399 cls.debug_gdbserver = False
400 cls.debug_all = False
401 cls.debug_attach = False
406 cls.debug_core = True
407 elif dl == "gdb" or dl == "gdb-all":
409 elif dl == "gdbserver" or dl == "gdbserver-all":
410 cls.debug_gdbserver = True
412 cls.debug_attach = True
414 raise Exception("Unrecognized DEBUG option: '%s'" % d)
415 if dl == "gdb-all" or dl == "gdbserver-all":
419 def get_vpp_worker_count(cls):
420 if not hasattr(cls, "vpp_worker_count"):
421 if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
422 cls.vpp_worker_count = 0
424 cls.vpp_worker_count = config.vpp_worker_count
425 return cls.vpp_worker_count
428 def get_cpus_required(cls):
429 return 1 + cls.get_vpp_worker_count()
432 def setUpConstants(cls):
433 """Set-up the test case class based on environment variables"""
434 cls.step = config.step
435 cls.plugin_path = ":".join(config.vpp_plugin_dir)
436 cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir)
437 cls.extern_plugin_path = ":".join(config.extern_plugin_dir)
439 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
440 debug_cli = "cli-listen localhost:5002"
441 size = re.search(r"\d+[gG]", config.coredump_size)
443 coredump_size = f"coredump-size {config.coredump_size}".lower()
445 coredump_size = "coredump-size unlimited"
446 default_variant = config.variant
447 if default_variant is not None:
448 default_variant = "default { variant %s 100 }" % default_variant
452 api_fuzzing = config.api_fuzz
453 if api_fuzzing is None:
474 cls.get_api_segment_prefix(),
481 if cls.extern_plugin_path not in (None, ""):
482 cls.extra_vpp_plugin_config.append("add-path %s" % cls.extern_plugin_path)
483 if cls.get_vpp_worker_count():
484 cls.vpp_cmdline.extend(
485 ["corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])]
487 cls.vpp_cmdline.extend(
498 cls.get_stats_sock_path(),
499 cls.extra_vpp_statseg_config,
504 cls.get_api_sock_path(),
525 "lisp_unittest_plugin.so",
530 "unittest_plugin.so",
535 + cls.extra_vpp_plugin_config
541 if cls.extra_vpp_punt_config is not None:
542 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
544 if not cls.debug_attach:
545 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
546 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
549 def wait_for_enter(cls):
550 if cls.debug_gdbserver:
551 print(double_line_delim)
552 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
554 print(double_line_delim)
555 print("Spawned VPP with PID: %d" % cls.vpp.pid)
557 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
559 print(single_line_delim)
560 print("You can debug VPP using:")
561 if cls.debug_gdbserver:
563 f"sudo gdb {config.vpp} "
564 f"-ex 'target remote localhost:{cls.gdbserver_port}'"
567 "Now is the time to attach gdb by running the above "
568 "command, set up breakpoints etc., then resume VPP from "
569 "within gdb by issuing the 'continue' command"
571 cls.gdbserver_port += 1
573 print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'")
575 "Now is the time to attach gdb by running the above "
576 "command and set up breakpoints etc., then resume VPP from"
577 " within gdb by issuing the 'continue' command"
579 print(single_line_delim)
580 input("Press ENTER to continue running the testcase...")
589 is_distro_ubuntu2204 == True and cls.has_tag(TestCaseTag.FIXME_UBUNTU2204)
590 ) or (is_distro_debian11 == True and cls.has_tag(TestCaseTag.FIXME_DEBIAN11)):
592 cls.logger.debug(f"Assigned cpus: {cls.cpus}")
593 cmdline = cls.vpp_cmdline
595 if cls.debug_gdbserver:
596 gdbserver = "/usr/bin/gdbserver"
597 if not os.path.isfile(gdbserver) or not os.access(gdbserver, os.X_OK):
599 "gdbserver binary '%s' does not exist or is "
600 "not executable" % gdbserver
605 "localhost:{port}".format(port=cls.gdbserver_port),
607 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
610 cls.vpp = subprocess.Popen(
611 cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE
613 except subprocess.CalledProcessError as e:
615 "Subprocess returned with non-0 return code: (%s)", e.returncode
620 "Subprocess returned with OS error: (%s) %s", e.errno, e.strerror
623 except Exception as e:
624 cls.logger.exception("Subprocess returned unexpected from %s:", cmdline)
630 def wait_for_coredump(cls):
631 corefile = cls.tempdir + "/core"
632 if os.path.isfile(corefile):
633 cls.logger.error("Waiting for coredump to complete: %s", corefile)
634 curr_size = os.path.getsize(corefile)
635 deadline = time.time() + 60
637 while time.time() < deadline:
640 curr_size = os.path.getsize(corefile)
641 if size == curr_size:
646 "Timed out waiting for coredump to complete: %s", corefile
649 cls.logger.error("Coredump complete: %s, size %d", corefile, curr_size)
652 def get_stats_sock_path(cls):
653 return "%s/stats.sock" % cls.tempdir
656 def get_api_sock_path(cls):
657 return "%s/api.sock" % cls.tempdir
660 def get_api_segment_prefix(cls):
661 return os.path.basename(cls.tempdir) # Only used for VAPI
664 def get_tempdir(cls):
666 tmpdir = f"{config.tmp_dir}/unittest-attach-gdb"
668 tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}"
669 if config.wipe_tmp_dir:
670 shutil.rmtree(tmpdir, ignore_errors=True)
675 def create_file_handler(cls):
676 if config.log_dir is None:
677 cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
680 logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}"
681 if config.wipe_tmp_dir:
682 shutil.rmtree(logdir, ignore_errors=True)
684 cls.file_handler = FileHandler(f"{logdir}/log.txt")
689 Perform class setup before running the testcase
690 Remove shared memory files, start vpp and connect the vpp-api
692 super(VppTestCase, cls).setUpClass()
693 cls.logger = get_logger(cls.__name__)
694 random.seed(config.rnd_seed)
695 if hasattr(cls, "parallel_handler"):
696 cls.logger.addHandler(cls.parallel_handler)
697 cls.logger.propagate = False
698 cls.set_debug_flags(config.debug)
699 cls.tempdir = cls.get_tempdir()
700 cls.create_file_handler()
701 cls.file_handler.setFormatter(
702 Formatter(fmt="%(asctime)s,%(msecs)03d %(message)s", datefmt="%H:%M:%S")
704 cls.file_handler.setLevel(DEBUG)
705 cls.logger.addHandler(cls.file_handler)
706 cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
707 os.chdir(cls.tempdir)
709 "Temporary dir is %s, api socket is %s",
711 cls.get_api_sock_path(),
713 cls.logger.debug("Random seed is %s", config.rnd_seed)
715 cls.reset_packet_infos()
720 cls.registry = VppObjectRegistry()
721 cls.vpp_startup_failed = False
722 cls.reporter = KeepAliveReporter()
723 # need to catch exceptions here because if we raise, then the cleanup
724 # doesn't get called and we might end with a zombie vpp
730 if not hasattr(cls, "vpp"):
732 cls.reporter.send_keep_alive(cls, "setUpClass")
733 VppTestResult.current_test_case_info = TestCaseInfo(
734 cls.logger, cls.tempdir, cls.vpp.pid, config.vpp
736 cls.vpp_stdout_deque = deque()
737 cls.vpp_stderr_deque = deque()
738 # Pump thread in a non-debug-attached & not running-vpp
739 if not cls.debug_attach and not hasattr(cls, "running_vpp"):
740 cls.pump_thread_stop_flag = Event()
741 cls.pump_thread_wakeup_pipe = os.pipe()
742 cls.pump_thread = Thread(target=pump_output, args=(cls,))
743 cls.pump_thread.daemon = True
744 cls.pump_thread.start()
745 if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
746 cls.vapi_response_timeout = 0
747 cls.vapi = VppPapiProvider(cls.__name__, cls, cls.vapi_response_timeout)
749 hook = hookmodule.StepHook(cls)
751 hook = hookmodule.PollHook(cls)
752 cls.vapi.register_hook(hook)
753 cls.statistics = VPPStats(socketname=cls.get_stats_sock_path())
757 cls.vpp_startup_failed = True
759 "VPP died shortly after startup, check the"
760 " output to standard error for possible cause"
765 except (vpp_papi.VPPIOError, Exception) as e:
766 cls.logger.debug("Exception connecting to vapi: %s" % e)
767 cls.vapi.disconnect()
769 if cls.debug_gdbserver:
772 "You're running VPP inside gdbserver but "
773 "VPP-API connection failed, did you forget "
774 "to 'continue' VPP from within gdb?",
780 last_line = cls.vapi.cli("show thread").split("\n")[-2]
781 cls.vpp_worker_count = int(last_line.split(" ")[0])
782 print("Detected VPP with %s workers." % cls.vpp_worker_count)
783 except vpp_papi.VPPRuntimeError as e:
784 cls.logger.debug("%s" % e)
787 except Exception as e:
788 cls.logger.debug("Exception connecting to VPP: %s" % e)
793 def _debug_quit(cls):
794 if cls.debug_gdbserver or cls.debug_gdb:
798 if cls.vpp.returncode is None:
800 print(double_line_delim)
801 print("VPP or GDB server is still running")
802 print(single_line_delim)
804 "When done debugging, press ENTER to kill the "
805 "process and finish running the testcase..."
807 except AttributeError:
813 Disconnect vpp-api, kill vpp and cleanup shared memory files
816 if hasattr(cls, "running_vpp"):
819 # first signal that we want to stop the pump thread, then wake it up
820 if hasattr(cls, "pump_thread_stop_flag"):
821 cls.pump_thread_stop_flag.set()
822 if hasattr(cls, "pump_thread_wakeup_pipe"):
823 os.write(cls.pump_thread_wakeup_pipe[1], b"ding dong wake up")
824 if hasattr(cls, "pump_thread"):
825 cls.logger.debug("Waiting for pump thread to stop")
826 cls.pump_thread.join()
827 if hasattr(cls, "vpp_stderr_reader_thread"):
828 cls.logger.debug("Waiting for stderr pump to stop")
829 cls.vpp_stderr_reader_thread.join()
831 if hasattr(cls, "vpp"):
832 if hasattr(cls, "vapi"):
833 cls.logger.debug(cls.vapi.vpp.get_stats())
834 cls.logger.debug("Disconnecting class vapi client on %s", cls.__name__)
835 cls.vapi.disconnect()
836 cls.logger.debug("Deleting class vapi attribute on %s", cls.__name__)
839 if not cls.debug_attach and cls.vpp.returncode is None:
840 cls.wait_for_coredump()
841 cls.logger.debug("Sending TERM to vpp")
843 cls.logger.debug("Waiting for vpp to die")
845 outs, errs = cls.vpp.communicate(timeout=5)
846 except subprocess.TimeoutExpired:
848 outs, errs = cls.vpp.communicate()
849 cls.logger.debug("Deleting class vpp attribute on %s", cls.__name__)
850 if not cls.debug_attach and not hasattr(cls, "running_vpp"):
851 cls.vpp.stdout.close()
852 cls.vpp.stderr.close()
853 # If vpp is a dynamic attribute set by the func use_running,
854 # deletion will result in an AttributeError that we can
858 except AttributeError:
861 if cls.vpp_startup_failed:
862 stdout_log = cls.logger.info
863 stderr_log = cls.logger.critical
865 stdout_log = cls.logger.info
866 stderr_log = cls.logger.info
868 if hasattr(cls, "vpp_stdout_deque"):
869 stdout_log(single_line_delim)
870 stdout_log("VPP output to stdout while running %s:", cls.__name__)
871 stdout_log(single_line_delim)
872 vpp_output = "".join(cls.vpp_stdout_deque)
873 with open(cls.tempdir + "/vpp_stdout.txt", "w") as f:
875 stdout_log("\n%s", vpp_output)
876 stdout_log(single_line_delim)
878 if hasattr(cls, "vpp_stderr_deque"):
879 stderr_log(single_line_delim)
880 stderr_log("VPP output to stderr while running %s:", cls.__name__)
881 stderr_log(single_line_delim)
882 vpp_output = "".join(cls.vpp_stderr_deque)
883 with open(cls.tempdir + "/vpp_stderr.txt", "w") as f:
885 stderr_log("\n%s", vpp_output)
886 stderr_log(single_line_delim)
889 def tearDownClass(cls):
890 """Perform final cleanup after running all tests in this test-case"""
891 cls.logger.debug("--- tearDownClass() for %s called ---" % cls.__name__)
892 if not hasattr(cls, "vpp"):
894 cls.reporter.send_keep_alive(cls, "tearDownClass")
896 cls.file_handler.close()
897 cls.reset_packet_infos()
898 if config.debug_framework:
899 debug_internal.on_tear_down_class(cls)
901 def show_commands_at_teardown(self):
902 """Allow subclass specific teardown logging additions."""
903 self.logger.info("--- No test specific show commands provided. ---")
906 """Show various debug prints after each test"""
908 "--- tearDown() for %s.%s(%s) called ---"
909 % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
911 if not hasattr(self, "vpp"):
915 if not self.vpp_dead:
916 self.logger.debug(self.vapi.cli("show trace max 1000"))
917 self.logger.info(self.vapi.ppcli("show interface"))
918 self.logger.info(self.vapi.ppcli("show hardware"))
919 self.logger.info(self.statistics.set_errors_str())
920 self.logger.info(self.vapi.ppcli("show run"))
921 self.logger.info(self.vapi.ppcli("show log"))
922 self.logger.info(self.vapi.ppcli("show bihash"))
923 self.logger.info("Logging testcase specific show commands.")
924 self.show_commands_at_teardown()
925 if self.remove_configured_vpp_objects_on_tear_down:
926 self.registry.remove_vpp_config(self.logger)
927 # Save/Dump VPP api trace log
928 m = self._testMethodName
929 api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
930 tmp_api_trace = "/tmp/%s" % api_trace
931 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
932 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
933 self.logger.info("Moving %s to %s\n" % (tmp_api_trace, vpp_api_trace_log))
934 shutil.move(tmp_api_trace, vpp_api_trace_log)
935 except VppTransportSocketIOError:
937 "VppTransportSocketIOError: Vpp dead. Cannot log show commands."
941 self.registry.unregister_all(self.logger)
944 """Clear trace before running each test"""
945 super(VppTestCase, self).setUp()
946 if not hasattr(self, "vpp"):
948 self.reporter.send_keep_alive(self)
952 testcase=self.__class__.__name__,
953 method_name=self._testMethodName,
955 self.sleep(0.1, "during setUp")
956 self.vpp_stdout_deque.append(
957 "--- test setUp() for %s.%s(%s) starts here ---\n"
958 % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
960 self.vpp_stderr_deque.append(
961 "--- test setUp() for %s.%s(%s) starts here ---\n"
962 % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
964 self.vapi.cli("clear trace")
965 # store the test instance inside the test class - so that objects
966 # holding the class can access instance methods (like assertEqual)
967 type(self).test_instance = self
970 def pg_enable_capture(cls, interfaces=None):
972 Enable capture on packet-generator interfaces
974 :param interfaces: iterable interface indexes (if None,
975 use self.pg_interfaces)
978 if interfaces is None:
979 interfaces = cls.pg_interfaces
984 def register_pcap(cls, intf, worker):
985 """Register a pcap in the testclass"""
986 # add to the list of captures with current timestamp
987 cls._pcaps.append((intf, worker))
990 def get_vpp_time(cls):
991 # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
992 # returns float("2.190522")
993 timestr = cls.vapi.cli("show clock")
994 head, sep, tail = timestr.partition(",")
995 head, sep, tail = head.partition("Time now")
999 def sleep_on_vpp_time(cls, sec):
1000 """Sleep according to time in VPP world"""
1001 # On a busy system with many processes
1002 # we might end up with VPP time being slower than real world
1003 # So take that into account when waiting for VPP to do something
1004 start_time = cls.get_vpp_time()
1005 while cls.get_vpp_time() - start_time < sec:
1009 def pg_start(cls, trace=True):
1010 """Enable the PG, wait till it is done, then clean up"""
1011 for (intf, worker) in cls._old_pcaps:
1012 intf.handle_old_pcap_file(intf.get_in_path(worker), intf.in_history_counter)
1015 cls.vapi.cli("clear trace")
1016 cls.vapi.cli("trace add pg-input 1000")
1017 cls.vapi.cli("packet-generator enable")
1018 # PG, when starts, runs to completion -
1019 # so let's avoid a race condition,
1020 # and wait a little till it's done.
1021 # Then clean it up - and then be gone.
1022 deadline = time.time() + 300
1023 while cls.vapi.cli("show packet-generator").find("Yes") != -1:
1024 cls.sleep(0.01) # yield
1025 if time.time() > deadline:
1026 cls.logger.error("Timeout waiting for pg to stop")
1028 for intf, worker in cls._pcaps:
1029 cls.vapi.cli("packet-generator delete %s" % intf.get_cap_name(worker))
1030 cls._old_pcaps = cls._pcaps
1034 def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0, mode=None):
1036 Create packet-generator interfaces.
1038 :param interfaces: iterable indexes of the interfaces.
1039 :returns: List of created interfaces.
1043 for i in interfaces:
1044 intf = VppPGInterface(cls, i, gso, gso_size, mode)
1045 setattr(cls, intf.name, intf)
1047 cls.pg_interfaces = result
1051 def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
1052 if not hasattr(cls, "vpp"):
1053 cls.pg_interfaces = []
1054 return cls.pg_interfaces
1055 pgmode = VppEnum.vl_api_pg_interface_mode_t
1056 return cls.create_pg_interfaces_internal(
1057 interfaces, gso, gso_size, pgmode.PG_API_MODE_IP4
1061 def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
1062 if not hasattr(cls, "vpp"):
1063 cls.pg_interfaces = []
1064 return cls.pg_interfaces
1065 pgmode = VppEnum.vl_api_pg_interface_mode_t
1066 return cls.create_pg_interfaces_internal(
1067 interfaces, gso, gso_size, pgmode.PG_API_MODE_IP6
1071 def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
1072 if not hasattr(cls, "vpp"):
1073 cls.pg_interfaces = []
1074 return cls.pg_interfaces
1075 pgmode = VppEnum.vl_api_pg_interface_mode_t
1076 return cls.create_pg_interfaces_internal(
1077 interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
1081 def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
1082 if not hasattr(cls, "vpp"):
1083 cls.pg_interfaces = []
1084 return cls.pg_interfaces
1085 pgmode = VppEnum.vl_api_pg_interface_mode_t
1086 return cls.create_pg_interfaces_internal(
1087 interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
1091 def create_loopback_interfaces(cls, count):
1093 Create loopback interfaces.
1095 :param count: number of interfaces created.
1096 :returns: List of created interfaces.
1098 if not hasattr(cls, "vpp"):
1099 cls.lo_interfaces = []
1100 return cls.lo_interfaces
1101 result = [VppLoInterface(cls) for i in range(count)]
1103 setattr(cls, intf.name, intf)
1104 cls.lo_interfaces = result
1108 def create_bvi_interfaces(cls, count):
1110 Create BVI interfaces.
1112 :param count: number of interfaces created.
1113 :returns: List of created interfaces.
1115 if not hasattr(cls, "vpp"):
1116 cls.bvi_interfaces = []
1117 return cls.bvi_interfaces
1118 result = [VppBviInterface(cls) for i in range(count)]
1120 setattr(cls, intf.name, intf)
1121 cls.bvi_interfaces = result
1125 def extend_packet(packet, size, padding=" "):
1127 Extend packet to given size by padding with spaces or custom padding
1128 NOTE: Currently works only when Raw layer is present.
1130 :param packet: packet
1131 :param size: target size
1132 :param padding: padding used to extend the payload
1135 packet_len = len(packet) + 4
1136 extend = size - packet_len
1138 num = (extend // len(padding)) + 1
1139 packet[Raw].load += (padding * num)[:extend].encode("ascii")
1142 def reset_packet_infos(cls):
1143 """Reset the list of packet info objects and packet counts to zero"""
1144 cls._packet_infos = {}
1145 cls._packet_count_for_dst_if_idx = {}
1148 def create_packet_info(cls, src_if, dst_if):
1150 Create packet info object containing the source and destination indexes
1151 and add it to the testcase's packet info list
1153 :param VppInterface src_if: source interface
1154 :param VppInterface dst_if: destination interface
1156 :returns: _PacketInfo object
1159 info = _PacketInfo()
1160 info.index = len(cls._packet_infos)
1161 info.src = src_if.sw_if_index
1162 info.dst = dst_if.sw_if_index
1163 if isinstance(dst_if, VppSubInterface):
1164 dst_idx = dst_if.parent.sw_if_index
1166 dst_idx = dst_if.sw_if_index
1167 if dst_idx in cls._packet_count_for_dst_if_idx:
1168 cls._packet_count_for_dst_if_idx[dst_idx] += 1
1170 cls._packet_count_for_dst_if_idx[dst_idx] = 1
1171 cls._packet_infos[info.index] = info
1175 def info_to_payload(info):
1177 Convert _PacketInfo object to packet payload
1179 :param info: _PacketInfo object
1181 :returns: string containing serialized data from packet info
1184 # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1185 return pack("iiiih", info.index, info.src, info.dst, info.ip, info.proto)
1188 def payload_to_info(payload, payload_field="load"):
1190 Convert packet payload to _PacketInfo object
1192 :param payload: packet payload
1193 :type payload: <class 'scapy.packet.Raw'>
1194 :param payload_field: packet fieldname of payload "load" for
1195 <class 'scapy.packet.Raw'>
1196 :type payload_field: str
1197 :returns: _PacketInfo object containing de-serialized data from payload
1201 # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1202 payload_b = getattr(payload, payload_field)[:18]
1204 info = _PacketInfo()
1205 info.index, info.src, info.dst, info.ip, info.proto = unpack("iiiih", payload_b)
1207 # some SRv6 TCs depend on get an exception if bad values are detected
1208 if info.index > 0x4000:
1209 raise ValueError("Index value is invalid")
1213 def get_next_packet_info(self, info):
1215 Iterate over the packet info list stored in the testcase
1216 Start iteration with first element if info is None
1217 Continue based on index in info if info is specified
1219 :param info: info or None
1220 :returns: next info in list or None if no more infos
1225 next_index = info.index + 1
1226 if next_index == len(self._packet_infos):
1229 return self._packet_infos[next_index]
1231 def get_next_packet_info_for_interface(self, src_index, info):
1233 Search the packet info list for the next packet info with same source
1236 :param src_index: source interface index to search for
1237 :param info: packet info - where to start the search
1238 :returns: packet info or None
1242 info = self.get_next_packet_info(info)
1245 if info.src == src_index:
1248 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
1250 Search the packet info list for the next packet info with same source
1251 and destination interface indexes
1253 :param src_index: source interface index to search for
1254 :param dst_index: destination interface index to search for
1255 :param info: packet info - where to start the search
1256 :returns: packet info or None
1260 info = self.get_next_packet_info_for_interface(src_index, info)
1263 if info.dst == dst_index:
1266 def assert_equal(self, real_value, expected_value, name_or_class=None):
1267 if name_or_class is None:
1268 self.assertEqual(real_value, expected_value)
1271 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
1273 getdoc(name_or_class).strip(),
1275 str(name_or_class(real_value)),
1277 str(name_or_class(expected_value)),
1280 msg = "Invalid %s: %s does not match expected value %s" % (
1286 self.assertEqual(real_value, expected_value, msg)
1288 def assert_in_range(self, real_value, expected_min, expected_max, name=None):
1292 msg = "Invalid %s: %s out of range <%s,%s>" % (
1298 self.assertTrue(expected_min <= real_value <= expected_max, msg)
1300 def assert_packet_checksums_valid(self, packet, ignore_zero_udp_checksums=True):
1301 received = packet.__class__(scapy.compat.raw(packet))
1302 udp_layers = ["UDP", "UDPerror"]
1303 checksum_fields = ["cksum", "chksum"]
1306 temp = received.__class__(scapy.compat.raw(received))
1308 layer = temp.getlayer(counter)
1310 layer = layer.copy()
1311 layer.remove_payload()
1312 for cf in checksum_fields:
1313 if hasattr(layer, cf):
1315 ignore_zero_udp_checksums
1316 and 0 == getattr(layer, cf)
1317 and layer.name in udp_layers
1320 delattr(temp.getlayer(counter), cf)
1321 checksums.append((counter, cf))
1324 counter = counter + 1
1325 if 0 == len(checksums):
1327 temp = temp.__class__(scapy.compat.raw(temp))
1328 for layer, cf in reversed(checksums):
1329 calc_sum = getattr(temp[layer], cf)
1331 getattr(received[layer], cf),
1333 "packet checksum on layer #%d: %s" % (layer, temp[layer].name),
1336 "Checksum field `%s` on `%s` layer has correct value `%s`"
1337 % (cf, temp[layer].name, calc_sum)
1340 def assert_checksum_valid(
1344 checksum_field_names=["chksum", "cksum"],
1345 ignore_zero_checksum=False,
1347 """Check checksum of received packet on given layer"""
1348 layer_copy = received_packet[layer].copy()
1349 layer_copy.remove_payload()
1351 for f in checksum_field_names:
1352 if hasattr(layer_copy, f):
1355 if field_name is None:
1357 f"Layer `{layer}` has none of checksum fields: `{checksum_field_names}`."
1359 received_packet_checksum = getattr(received_packet[layer], field_name)
1360 if ignore_zero_checksum and 0 == received_packet_checksum:
1362 recalculated = received_packet.__class__(scapy.compat.raw(received_packet))
1363 delattr(recalculated[layer], field_name)
1364 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1366 received_packet_checksum,
1367 getattr(recalculated[layer], field_name),
1368 f"packet checksum (field: {field_name}) on layer: %s" % layer,
1371 def assert_ip_checksum_valid(self, received_packet, ignore_zero_checksum=False):
1372 self.assert_checksum_valid(
1373 received_packet, "IP", ignore_zero_checksum=ignore_zero_checksum
1376 def assert_tcp_checksum_valid(self, received_packet, ignore_zero_checksum=False):
1377 self.assert_checksum_valid(
1378 received_packet, "TCP", ignore_zero_checksum=ignore_zero_checksum
1381 def assert_udp_checksum_valid(self, received_packet, ignore_zero_checksum=True):
1382 self.assert_checksum_valid(
1383 received_packet, "UDP", ignore_zero_checksum=ignore_zero_checksum
1386 def assert_embedded_icmp_checksum_valid(self, received_packet):
1387 if received_packet.haslayer(IPerror):
1388 self.assert_checksum_valid(received_packet, "IPerror")
1389 if received_packet.haslayer(TCPerror):
1390 self.assert_checksum_valid(received_packet, "TCPerror")
1391 if received_packet.haslayer(UDPerror):
1392 self.assert_checksum_valid(
1393 received_packet, "UDPerror", ignore_zero_checksum=True
1395 if received_packet.haslayer(ICMPerror):
1396 self.assert_checksum_valid(received_packet, "ICMPerror")
1398 def assert_icmp_checksum_valid(self, received_packet):
1399 self.assert_checksum_valid(received_packet, "ICMP")
1400 self.assert_embedded_icmp_checksum_valid(received_packet)
1402 def assert_icmpv6_checksum_valid(self, pkt):
1403 if pkt.haslayer(ICMPv6DestUnreach):
1404 self.assert_checksum_valid(pkt, "ICMPv6DestUnreach")
1405 self.assert_embedded_icmp_checksum_valid(pkt)
1406 if pkt.haslayer(ICMPv6EchoRequest):
1407 self.assert_checksum_valid(pkt, "ICMPv6EchoRequest")
1408 if pkt.haslayer(ICMPv6EchoReply):
1409 self.assert_checksum_valid(pkt, "ICMPv6EchoReply")
1411 def get_counter(self, counter):
1412 if counter.startswith("/"):
1413 counter_value = self.statistics.get_counter(counter)
1415 counters = self.vapi.cli("sh errors").split("\n")
1417 for i in range(1, len(counters) - 1):
1418 results = counters[i].split()
1419 if results[1] == counter:
1420 counter_value = int(results[0])
1422 return counter_value
1424 def assert_counter_equal(self, counter, expected_value, thread=None, index=0):
1425 c = self.get_counter(counter)
1426 if thread is not None:
1427 c = c[thread][index]
1429 c = sum(x[index] for x in c)
1430 self.assert_equal(c, expected_value, "counter `%s'" % counter)
1432 def assert_packet_counter_equal(self, counter, expected_value):
1433 counter_value = self.get_counter(counter)
1435 counter_value, expected_value, "packet counter `%s'" % counter
1438 def assert_error_counter_equal(self, counter, expected_value):
1439 counter_value = self.statistics[counter].sum()
1440 self.assert_equal(counter_value, expected_value, "error counter `%s'" % counter)
1443 def sleep(cls, timeout, remark=None):
1445 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1446 # * by Guido, only the main thread can be interrupted.
1448 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
1451 if hasattr(os, "sched_yield"):
1457 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1458 before = time.time()
1461 if after - before > 2 * timeout:
1463 "unexpected self.sleep() result - slept for %es instead of ~%es!",
1469 "Finished sleep (%s) - slept %es (wanted %es)",
1475 def virtual_sleep(self, timeout, remark=None):
1476 self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
1477 self.vapi.cli("set clock adjust %s" % timeout)
1479 def pg_send(self, intf, pkts, worker=None, trace=True):
1480 intf.add_stream(pkts, worker=worker)
1481 self.pg_enable_capture(self.pg_interfaces)
1482 self.pg_start(trace=trace)
1484 def snapshot_stats(self, stats_diff):
1485 """Return snapshot of interesting stats based on diff dictionary."""
1487 for sw_if_index in stats_diff:
1488 for counter in stats_diff[sw_if_index]:
1489 stats_snapshot[counter] = self.statistics[counter]
1490 self.logger.debug(f"Took statistics stats_snapshot: {stats_snapshot}")
1491 return stats_snapshot
1493 def compare_stats_with_snapshot(self, stats_diff, stats_snapshot):
1494 """Assert appropriate difference between current stats and snapshot."""
1495 for sw_if_index in stats_diff:
1496 for cntr, diff in stats_diff[sw_if_index].items():
1497 if sw_if_index == "err":
1499 self.statistics[cntr].sum(),
1500 stats_snapshot[cntr].sum() + diff,
1501 f"'{cntr}' counter value (previous value: "
1502 f"{stats_snapshot[cntr].sum()}, "
1503 f"expected diff: {diff})",
1508 self.statistics[cntr][:, sw_if_index].sum(),
1509 stats_snapshot[cntr][:, sw_if_index].sum() + diff,
1510 f"'{cntr}' counter value (previous value: "
1511 f"{stats_snapshot[cntr][:, sw_if_index].sum()}, "
1512 f"expected diff: {diff})",
1515 # if diff is 0, then this most probably a case where
1516 # test declares multiple interfaces but traffic hasn't
1517 # passed through this one yet - which means the counter
1518 # value is 0 and can be ignored
1522 def send_and_assert_no_replies(
1523 self, intf, pkts, remark="", timeout=None, stats_diff=None, trace=True, msg=None
1526 stats_snapshot = self.snapshot_stats(stats_diff)
1528 self.pg_send(intf, pkts)
1533 for i in self.pg_interfaces:
1534 i.assert_nothing_captured(timeout=timeout, remark=remark)
1539 self.logger.debug(f"send_and_assert_no_replies: {msg}")
1540 self.logger.debug(self.vapi.cli("show trace"))
1543 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1545 def send_and_expect(
1557 stats_snapshot = self.snapshot_stats(stats_diff)
1560 n_rx = 1 if isinstance(pkts, Packet) else len(pkts)
1561 self.pg_send(intf, pkts, worker=worker, trace=trace)
1562 rx = output.get_capture(n_rx)
1565 self.logger.debug(f"send_and_expect: {msg}")
1566 self.logger.debug(self.vapi.cli("show trace"))
1569 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1573 def send_and_expect_load_balancing(
1574 self, input, pkts, outputs, worker=None, trace=True
1576 self.pg_send(input, pkts, worker=worker, trace=trace)
1579 rx = oo._get_capture(1)
1580 self.assertNotEqual(0, len(rx))
1583 self.logger.debug(self.vapi.cli("show trace"))
1586 def send_and_expect_some(self, intf, pkts, output, worker=None, trace=True):
1587 self.pg_send(intf, pkts, worker=worker, trace=trace)
1588 rx = output._get_capture(1)
1590 self.logger.debug(self.vapi.cli("show trace"))
1591 self.assertTrue(len(rx) > 0)
1592 self.assertTrue(len(rx) < len(pkts))
1595 def send_and_expect_only(self, intf, pkts, output, timeout=None, stats_diff=None):
1597 stats_snapshot = self.snapshot_stats(stats_diff)
1599 self.pg_send(intf, pkts)
1600 rx = output.get_capture(len(pkts))
1604 for i in self.pg_interfaces:
1605 if i not in outputs:
1606 i.assert_nothing_captured(timeout=timeout)
1610 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1615 def get_testcase_doc_name(test):
1616 return getdoc(test.__class__).splitlines()[0]
1619 def get_test_description(descriptions, test):
1620 short_description = test.shortDescription()
1621 if descriptions and short_description:
1622 return short_description
1627 class TestCaseInfo(object):
1628 def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
1629 self.logger = logger
1630 self.tempdir = tempdir
1631 self.vpp_pid = vpp_pid
1632 self.vpp_bin_path = vpp_bin_path
1633 self.core_crash_test = None
1636 class VppTestResult(unittest.TestResult):
1638 @property result_string
1639 String variable to store the test case result string.
1641 List variable containing 2-tuples of TestCase instances and strings
1642 holding formatted tracebacks. Each tuple represents a test which
1643 raised an unexpected exception.
1645 List variable containing 2-tuples of TestCase instances and strings
1646 holding formatted tracebacks. Each tuple represents a test where
1647 a failure was explicitly signalled using the TestCase.assert*()
1651 failed_test_cases_info = set()
1652 core_crash_test_cases_info = set()
1653 current_test_case_info = None
1655 def __init__(self, stream=None, descriptions=None, verbosity=None, runner=None):
1657 :param stream File descriptor to store where to report test results.
1658 Set to the standard error stream by default.
1659 :param descriptions Boolean variable to store information if to use
1660 test case descriptions.
1661 :param verbosity Integer variable to store required verbosity level.
1663 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1664 self.stream = stream
1665 self.descriptions = descriptions
1666 self.verbosity = verbosity
1667 self.result_string = None
1668 self.runner = runner
1671 def addSuccess(self, test):
1673 Record a test succeeded result
1678 if self.current_test_case_info:
1679 self.current_test_case_info.logger.debug(
1680 "--- addSuccess() %s.%s(%s) called"
1681 % (test.__class__.__name__, test._testMethodName, test._testMethodDoc)
1683 unittest.TestResult.addSuccess(self, test)
1684 self.result_string = colorize("OK", GREEN)
1686 self.send_result_through_pipe(test, PASS)
1688 def addSkip(self, test, reason):
1690 Record a test skipped.
1696 if self.current_test_case_info:
1697 self.current_test_case_info.logger.debug(
1698 "--- addSkip() %s.%s(%s) called, reason is %s"
1700 test.__class__.__name__,
1701 test._testMethodName,
1702 test._testMethodDoc,
1706 unittest.TestResult.addSkip(self, test, reason)
1707 self.result_string = colorize("SKIP", YELLOW)
1709 if reason == "not enough cpus":
1710 self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE)
1712 self.send_result_through_pipe(test, SKIP)
1714 def symlink_failed(self):
1715 if self.current_test_case_info:
1717 failed_dir = config.failed_dir
1718 link_path = os.path.join(
1720 "%s-FAILED" % os.path.basename(self.current_test_case_info.tempdir),
1723 self.current_test_case_info.logger.debug(
1724 "creating a link to the failed test"
1726 self.current_test_case_info.logger.debug(
1727 "os.symlink(%s, %s)"
1728 % (self.current_test_case_info.tempdir, link_path)
1730 if os.path.exists(link_path):
1731 self.current_test_case_info.logger.debug("symlink already exists")
1733 os.symlink(self.current_test_case_info.tempdir, link_path)
1735 except Exception as e:
1736 self.current_test_case_info.logger.error(e)
1738 def send_result_through_pipe(self, test, result):
1739 if hasattr(self, "test_framework_result_pipe"):
1740 pipe = self.test_framework_result_pipe
1742 pipe.send((test.id(), result))
1744 def log_error(self, test, err, fn_name):
1745 if self.current_test_case_info:
1746 if isinstance(test, unittest.suite._ErrorHolder):
1747 test_name = test.description
1749 test_name = "%s.%s(%s)" % (
1750 test.__class__.__name__,
1751 test._testMethodName,
1752 test._testMethodDoc,
1754 self.current_test_case_info.logger.debug(
1755 "--- %s() %s called, err is %s" % (fn_name, test_name, err)
1757 self.current_test_case_info.logger.debug(
1758 "formatted exception is:\n%s" % "".join(format_exception(*err))
1761 def add_error(self, test, err, unittest_fn, error_type):
1762 if error_type == FAIL:
1763 self.log_error(test, err, "addFailure")
1764 error_type_str = colorize("FAIL", RED)
1765 elif error_type == ERROR:
1766 self.log_error(test, err, "addError")
1767 error_type_str = colorize("ERROR", RED)
1770 "Error type %s cannot be used to record an "
1771 "error or a failure" % error_type
1774 unittest_fn(self, test, err)
1775 if self.current_test_case_info:
1776 self.result_string = "%s [ temp dir used by test case: %s ]" % (
1778 self.current_test_case_info.tempdir,
1780 self.symlink_failed()
1781 self.failed_test_cases_info.add(self.current_test_case_info)
1782 if is_core_present(self.current_test_case_info.tempdir):
1783 if not self.current_test_case_info.core_crash_test:
1784 if isinstance(test, unittest.suite._ErrorHolder):
1785 test_name = str(test)
1787 test_name = "'{!s}' ({!s})".format(
1788 get_testcase_doc_name(test), test.id()
1790 self.current_test_case_info.core_crash_test = test_name
1791 self.core_crash_test_cases_info.add(self.current_test_case_info)
1793 self.result_string = "%s [no temp dir]" % error_type_str
1795 self.send_result_through_pipe(test, error_type)
1797 def addFailure(self, test, err):
1799 Record a test failed result
1802 :param err: error message
1805 self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1807 def addError(self, test, err):
1809 Record a test error result
1812 :param err: error message
1815 self.add_error(test, err, unittest.TestResult.addError, ERROR)
1817 def getDescription(self, test):
1819 Get test description
1822 :returns: test description
1825 return get_test_description(self.descriptions, test)
1827 def startTest(self, test):
1835 def print_header(test):
1836 if test.__class__ in self.printed:
1839 test_doc = getdoc(test)
1841 raise Exception("No doc string for test '%s'" % test.id())
1843 test_title = test_doc.splitlines()[0].rstrip()
1844 test_title = colorize(test_title, GREEN)
1845 if test.is_tagged_run_solo():
1846 test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)
1848 # This block may overwrite the colorized title above,
1849 # but we want this to stand out and be fixed
1850 if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
1851 test_title = colorize(f"FIXME with VPP workers: {test_title}", RED)
1853 if test.has_tag(TestCaseTag.FIXME_ASAN):
1854 test_title = colorize(f"FIXME with ASAN: {test_title}", RED)
1855 test.skip_fixme_asan()
1857 if is_distro_ubuntu2204 == True and test.has_tag(
1858 TestCaseTag.FIXME_UBUNTU2204
1860 test_title = colorize(f"FIXME on Ubuntu-22.04: {test_title}", RED)
1861 test.skip_fixme_ubuntu2204()
1863 if is_distro_debian11 == True and test.has_tag(TestCaseTag.FIXME_DEBIAN11):
1864 test_title = colorize(f"FIXME on Debian-11: {test_title}", RED)
1865 test.skip_fixme_debian11()
1867 if "debug" in config.vpp_tag and test.has_tag(TestCaseTag.FIXME_VPP_DEBUG):
1868 test_title = colorize(f"FIXME on VPP Debug: {test_title}", RED)
1869 test.skip_fixme_vpp_debug()
1871 if hasattr(test, "vpp_worker_count"):
1872 if test.vpp_worker_count == 0:
1873 test_title += " [main thread only]"
1874 elif test.vpp_worker_count == 1:
1875 test_title += " [1 worker thread]"
1877 test_title += f" [{test.vpp_worker_count} worker threads]"
1879 if test.__class__.skipped_due_to_cpu_lack:
1880 test_title = colorize(
1881 f"{test_title} [skipped - not enough cpus, "
1882 f"required={test.__class__.get_cpus_required()}, "
1883 f"available={max_vpp_cpus}]",
1887 print(double_line_delim)
1889 print(double_line_delim)
1890 self.printed.append(test.__class__)
1893 self.start_test = time.time()
1894 unittest.TestResult.startTest(self, test)
1895 if self.verbosity > 0:
1896 self.stream.writeln("Starting " + self.getDescription(test) + " ...")
1897 self.stream.writeln(single_line_delim)
1899 def stopTest(self, test):
1901 Called when the given test has been run
1906 unittest.TestResult.stopTest(self, test)
1908 if self.verbosity > 0:
1909 self.stream.writeln(single_line_delim)
1910 self.stream.writeln(
1911 "%-73s%s" % (self.getDescription(test), self.result_string)
1913 self.stream.writeln(single_line_delim)
1915 self.stream.writeln(
1918 self.getDescription(test),
1919 time.time() - self.start_test,
1924 self.send_result_through_pipe(test, TEST_RUN)
1926 def printErrors(self):
1928 Print errors from running the test case
1930 if len(self.errors) > 0 or len(self.failures) > 0:
1931 self.stream.writeln()
1932 self.printErrorList("ERROR", self.errors)
1933 self.printErrorList("FAIL", self.failures)
1935 # ^^ that is the last output from unittest before summary
1936 if not self.runner.print_summary:
1937 devnull = unittest.runner._WritelnDecorator(open(os.devnull, "w"))
1938 self.stream = devnull
1939 self.runner.stream = devnull
1941 def printErrorList(self, flavour, errors):
1943 Print error list to the output stream together with error type
1944 and test case description.
1946 :param flavour: error type
1947 :param errors: iterable errors
1950 for test, err in errors:
1951 self.stream.writeln(double_line_delim)
1952 self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
1953 self.stream.writeln(single_line_delim)
1954 self.stream.writeln("%s" % err)
1957 class VppTestRunner(unittest.TextTestRunner):
1959 A basic test runner implementation which prints results to standard error.
1963 def resultclass(self):
1964 """Class maintaining the results of the tests"""
1965 return VppTestResult
1969 keep_alive_pipe=None,
1979 # ignore stream setting here, use hard-coded stdout to be in sync
1980 # with prints from VppTestCase methods ...
1981 super(VppTestRunner, self).__init__(
1982 sys.stdout, descriptions, verbosity, failfast, buffer, resultclass, **kwargs
1984 KeepAliveReporter.pipe = keep_alive_pipe
1986 self.orig_stream = self.stream
1987 self.resultclass.test_framework_result_pipe = result_pipe
1989 self.print_summary = print_summary
1991 def _makeResult(self):
1992 return self.resultclass(self.stream, self.descriptions, self.verbosity, self)
1994 def run(self, test):
2001 faulthandler.enable() # emit stack trace to stderr if killed by signal
2003 result = super(VppTestRunner, self).run(test)
2004 if not self.print_summary:
2005 self.stream = self.orig_stream
2006 result.stream = self.orig_stream
2010 class Worker(Thread):
2011 def __init__(self, executable_args, logger, env=None, *args, **kwargs):
2012 super(Worker, self).__init__(*args, **kwargs)
2013 self.logger = logger
2014 self.args = executable_args
2015 if hasattr(self, "testcase") and self.testcase.debug_all:
2016 if self.testcase.debug_gdbserver:
2018 "/usr/bin/gdbserver",
2019 "localhost:{port}".format(port=self.testcase.gdbserver_port),
2021 elif self.testcase.debug_gdb and hasattr(self, "wait_for_gdb"):
2022 self.args.append(self.wait_for_gdb)
2023 self.app_bin = executable_args[0]
2024 self.app_name = os.path.basename(self.app_bin)
2025 if hasattr(self, "role"):
2026 self.app_name += " {role}".format(role=self.role)
2029 env = {} if env is None else env
2030 self.env = copy.deepcopy(env)
2032 def wait_for_enter(self):
2033 if not hasattr(self, "testcase"):
2035 if self.testcase.debug_all and self.testcase.debug_gdbserver:
2037 print(double_line_delim)
2039 "Spawned GDB Server for '{app}' with PID: {pid}".format(
2040 app=self.app_name, pid=self.process.pid
2043 elif self.testcase.debug_all and self.testcase.debug_gdb:
2045 print(double_line_delim)
2047 "Spawned '{app}' with PID: {pid}".format(
2048 app=self.app_name, pid=self.process.pid
2053 print(single_line_delim)
2054 print("You can debug '{app}' using:".format(app=self.app_name))
2055 if self.testcase.debug_gdbserver:
2059 + " -ex 'target remote localhost:{port}'".format(
2060 port=self.testcase.gdbserver_port
2064 "Now is the time to attach gdb by running the above "
2065 "command, set up breakpoints etc., then resume from "
2066 "within gdb by issuing the 'continue' command"
2068 self.testcase.gdbserver_port += 1
2069 elif self.testcase.debug_gdb:
2073 + " -ex 'attach {pid}'".format(pid=self.process.pid)
2076 "Now is the time to attach gdb by running the above "
2077 "command and set up breakpoints etc., then resume from"
2078 " within gdb by issuing the 'continue' command"
2080 print(single_line_delim)
2081 input("Press ENTER to continue running the testcase...")
2084 executable = self.args[0]
2085 if not os.path.exists(executable) or not os.access(
2086 executable, os.F_OK | os.X_OK
2088 # Exit code that means some system file did not exist,
2089 # could not be opened, or had some other kind of error.
2090 self.result = os.EX_OSFILE
2091 raise EnvironmentError(
2092 "executable '%s' is not found or executable." % executable
2095 "Running executable '{app}': '{cmd}'".format(
2096 app=self.app_name, cmd=" ".join(self.args)
2099 env = os.environ.copy()
2100 env.update(self.env)
2101 env["CK_LOG_FILE_NAME"] = "-"
2102 self.process = subprocess.Popen(
2103 ["stdbuf", "-o0", "-e0"] + self.args,
2106 preexec_fn=os.setpgrp,
2107 stdout=subprocess.PIPE,
2108 stderr=subprocess.PIPE,
2110 self.wait_for_enter()
2111 out, err = self.process.communicate()
2112 self.logger.debug("Finished running `{app}'".format(app=self.app_name))
2113 self.logger.info("Return code is `%s'" % self.process.returncode)
2114 self.logger.info(single_line_delim)
2116 "Executable `{app}' wrote to stdout:".format(app=self.app_name)
2118 self.logger.info(single_line_delim)
2119 self.logger.info(out.decode("utf-8"))
2120 self.logger.info(single_line_delim)
2122 "Executable `{app}' wrote to stderr:".format(app=self.app_name)
2124 self.logger.info(single_line_delim)
2125 self.logger.info(err.decode("utf-8"))
2126 self.logger.info(single_line_delim)
2127 self.result = self.process.returncode
2130 if __name__ == "__main__":