3 from __future__ import print_function
18 from collections import deque
19 from threading import Thread, Event
20 from inspect import getdoc, isclass
21 from traceback import format_exception
22 from logging import FileHandler, DEBUG, Formatter
24 from abc import ABC, abstractmethod
25 from struct import pack, unpack
28 from scapy.packet import Raw, Packet
29 from config import config, available_cpus, num_cpus, max_vpp_cpus
30 import hook as hookmodule
31 from vpp_pg_interface import VppPGInterface
32 from vpp_sub_interface import VppSubInterface
33 from vpp_lo_interface import VppLoInterface
34 from vpp_bvi_interface import VppBviInterface
35 from vpp_papi_provider import VppPapiProvider
36 from vpp_papi import VppEnum
38 from vpp_papi.vpp_stats import VPPStats
39 from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
49 from vpp_object import VppObjectRegistry
50 from util import ppp, is_core_present
51 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
52 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
53 from scapy.layers.inet6 import ICMPv6EchoReply
54 from vpp_running import use_running
57 logger = logging.getLogger(__name__)
59 # Set up an empty logger for the testcase that can be overridden as necessary
60 null_logger = logging.getLogger("VppTestCase")
61 null_logger.addHandler(logging.NullHandler())
71 if config.debug_framework:
75 Test framework module.
77 The module provides a set of tools for constructing and running tests and
78 representing the results.
class VppDiedError(Exception):
    """Exception raised when the VPP subprocess dies unexpectedly."""

    # Map signal numbers -> names (e.g. 15 -> "SIGTERM") for nicer messages.
    # SIG_* entries (SIG_DFL, SIG_IGN, ...) are handler constants, not
    # signals, hence the second filter.
    signals_by_value = {
        v: k
        for k, v in signal.__dict__.items()
        if k.startswith("SIG") and not k.startswith("SIG_")
    }

    def __init__(self, rv=None, testcase=None, method_name=None):
        """
        :param rv: return code of the dead VPP process; negative means
            "killed by signal -rv" (subprocess convention)
        :param testcase: name of the test case that was running, if any
        :param method_name: name of the test method that was running, if any
        """
        self.rv = rv
        self.signal_name = None
        self.testcase = testcase
        self.method_name = method_name

        try:
            # a negative return code encodes the killing signal
            self.signal_name = VppDiedError.signals_by_value[-rv]
        except (KeyError, TypeError):
            # rv is None, positive, or not a known signal - no name to report
            pass

        if testcase is None and method_name is None:
            in_msg = ""
        else:
            in_msg = " while running %s.%s" % (testcase, method_name)

        if self.rv:
            msg = "VPP subprocess died unexpectedly%s with return code: %d%s." % (
                in_msg,
                self.rv,
                " [%s]" % (self.signal_name if self.signal_name is not None else ""),
            )
        else:
            msg = "VPP subprocess died unexpectedly%s." % in_msg

        super(VppDiedError, self).__init__(msg)
class _PacketInfo(object):
    """Private class to create packet info object.

    Help process information about the next packet.
    Set variables to default values.
    """

    #: Store the index of the packet.
    index = -1
    #: Store the index of the source packet generator interface of the packet.
    src = -1
    #: Store the index of the destination packet generator interface
    #: of the packet.
    dst = -1
    #: Store expected ip version
    ip = -1
    #: Store expected upper protocol
    proto = -1
    #: Store the copy of the former packet.
    data = None

    def __eq__(self, other):
        """Infos are equal when index, src, dst and data all match.

        NOTE: ip and proto are not part of the comparison (matches the
        original behavior).
        """
        # Returning NotImplemented for foreign types lets Python fall back
        # to the reflected comparison instead of raising AttributeError.
        if not isinstance(other, _PacketInfo):
            return NotImplemented
        return (self.index, self.src, self.dst, self.data) == (
            other.index,
            other.src,
            other.dst,
            other.data,
        )
148 def pump_output(testclass):
149 """pump output from vpp stdout/stderr to proper queues"""
150 if not hasattr(testclass, "vpp"):
154 while not testclass.pump_thread_stop_flag.is_set():
155 readable = select.select(
157 testclass.vpp.stdout.fileno(),
158 testclass.vpp.stderr.fileno(),
159 testclass.pump_thread_wakeup_pipe[0],
164 if testclass.vpp.stdout.fileno() in readable:
165 read = os.read(testclass.vpp.stdout.fileno(), 102400)
167 split = read.decode("ascii", errors="backslashreplace").splitlines(True)
168 if len(stdout_fragment) > 0:
169 split[0] = "%s%s" % (stdout_fragment, split[0])
170 if len(split) > 0 and split[-1].endswith("\n"):
174 stdout_fragment = split[-1]
175 testclass.vpp_stdout_deque.extend(split[:limit])
176 if not config.cache_vpp_output:
177 for line in split[:limit]:
178 testclass.logger.info("VPP STDOUT: %s" % line.rstrip("\n"))
179 if testclass.vpp.stderr.fileno() in readable:
180 read = os.read(testclass.vpp.stderr.fileno(), 102400)
182 split = read.decode("ascii", errors="backslashreplace").splitlines(True)
183 if len(stderr_fragment) > 0:
184 split[0] = "%s%s" % (stderr_fragment, split[0])
185 if len(split) > 0 and split[-1].endswith("\n"):
189 stderr_fragment = split[-1]
191 testclass.vpp_stderr_deque.extend(split[:limit])
192 if not config.cache_vpp_output:
193 for line in split[:limit]:
194 testclass.logger.error("VPP STDERR: %s" % line.rstrip("\n"))
195 # ignoring the dummy pipe here intentionally - the
196 # flag will take care of properly terminating the loop
def _is_platform_aarch64():
    """Return True when the test host reports an aarch64 machine type."""
    machine_type = platform.machine()
    return machine_type == "aarch64"
203 is_platform_aarch64 = _is_platform_aarch64()
def _is_distro_ubuntu2204():
    """Return True when /etc/os-release identifies Ubuntu 22.04.

    NOTE(review): body was truncated in the garbled source; reconstructed
    by analogy with _is_distro_debian11 below - confirm the codename check.
    """
    with open("/etc/os-release") as f:
        for line in f.readlines():
            # "jammy" is the Ubuntu-22.04 release codename
            if "jammy" in line:
                return True
    return False
214 is_distro_ubuntu2204 = _is_distro_ubuntu2204()
def _is_distro_debian11():
    """Return True when /etc/os-release identifies Debian 11.

    The truncated return statements were restored: True on the first
    matching line, False when no line matches.
    """
    with open("/etc/os-release") as f:
        for line in f.readlines():
            # "bullseye" is the Debian-11 release codename
            if "bullseye" in line:
                return True
    return False
225 is_distro_debian11 = _is_distro_debian11()
228 class KeepAliveReporter(object):
230 Singleton object which reports test start to parent process
236 self.__dict__ = self._shared_state
244 def pipe(self, pipe):
245 if self._pipe is not None:
246 raise Exception("Internal error - pipe should only be set once.")
249 def send_keep_alive(self, test, desc=None):
251 Write current test tmpdir & desc to keep-alive pipe to signal liveness
253 if not hasattr(test, "vpp") or self.pipe is None:
254 # if not running forked..
258 desc = "%s (%s)" % (desc, unittest.util.strclass(test))
262 self.pipe.send((desc, config.vpp, test.tempdir, test.vpp.pid))
class TestCaseTag(Enum):
    """Tags that alter how (or whether) a test suite is run.

    Member values were reconstructed sequentially from the one surviving
    assignment (FIXME_VPP_WORKERS = 2) and the decorator names below.
    """

    # marks the suites that must run at the end
    # using only a single test runner
    RUN_SOLO = 1
    # marks the suites broken on VPP multi-worker
    FIXME_VPP_WORKERS = 2
    # marks the suites broken when ASan is enabled
    FIXME_ASAN = 3
    # marks suites broken on Ubuntu-22.04
    FIXME_UBUNTU2204 = 4
    # marks suites broken on Debian-11
    FIXME_DEBIAN11 = 5
    # marks suites broken on debug vpp image
    FIXME_VPP_DEBUG = 6
def create_tag_decorator(e):
    """Return a class decorator that appends tag *e* to cls.test_tags.

    :param e: TestCaseTag member to attach (see decorator instances below)
    :returns: decorator that tags the class and returns it unchanged
    """

    def decorator(cls):
        try:
            cls.test_tags.append(e)
        except AttributeError:
            # first tag on this class - create the list
            cls.test_tags = [e]
        return cls

    return decorator
# Ready-made class decorators, one per TestCaseTag value; apply as e.g.
# @tag_run_solo on a test case class to tag it.
tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
tag_fixme_ubuntu2204 = create_tag_decorator(TestCaseTag.FIXME_UBUNTU2204)
tag_fixme_debian11 = create_tag_decorator(TestCaseTag.FIXME_DEBIAN11)
tag_fixme_vpp_debug = create_tag_decorator(TestCaseTag.FIXME_VPP_DEBUG)
311 class CPUInterface(ABC):
313 skipped_due_to_cpu_lack = False
317 def get_cpus_required(cls):
321 def assign_cpus(cls, cpus):
326 class VppTestCase(CPUInterface, unittest.TestCase):
327 """This subclass is a base class for VPP test cases that are implemented as
328 classes. It provides methods to create and run test case.
331 extra_vpp_statseg_config = ""
332 extra_vpp_config = []
333 extra_vpp_plugin_config = []
335 vapi_response_timeout = 5
336 remove_configured_vpp_objects_on_tear_down = True
def packet_infos(self):
    """Packet info records accumulated by this test case."""
    infos = self._packet_infos
    return infos
def get_packet_count_for_if_idx(cls, dst_if_index):
    """Get the number of packet info for specified destination if index

    :param dst_if_index: sw_if_index of the destination interface
    :returns: recorded packet count, or 0 when none was recorded
    """
    # dict.get collapses the original membership-test-then-index (whose
    # fallback branch was truncated in the garbled source) into one lookup.
    return cls._packet_count_for_dst_if_idx.get(dst_if_index, 0)
def has_tag(cls, tag):
    """if the test case has a given tag - return true

    :param tag: TestCaseTag member to look for
    :returns: True when the class carries *tag*, False otherwise
    """
    try:
        return tag in cls.test_tags
    except AttributeError:
        # class was never decorated with any tag
        pass
    return False
def is_tagged_run_solo(cls):
    """Return True when this suite must run alone (timing-sensitive)."""
    solo_tag = TestCaseTag.RUN_SOLO
    return cls.has_tag(solo_tag)
366 def skip_fixme_asan(cls):
367 """if @tag_fixme_asan & ASan is enabled - mark for skip"""
368 if cls.has_tag(TestCaseTag.FIXME_ASAN):
369 vpp_extra_cmake_args = os.environ.get("VPP_EXTRA_CMAKE_ARGS", "")
370 if "DVPP_ENABLE_SANITIZE_ADDR=ON" in vpp_extra_cmake_args:
371 cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls)
374 def skip_fixme_ubuntu2204(cls):
375 """if distro is ubuntu 22.04 and @tag_fixme_ubuntu2204 mark for skip"""
376 if cls.has_tag(TestCaseTag.FIXME_UBUNTU2204):
377 cls = unittest.skip("Skipping @tag_fixme_ubuntu2204 tests")(cls)
380 def skip_fixme_debian11(cls):
381 """if distro is Debian-11 and @tag_fixme_debian11 mark for skip"""
382 if cls.has_tag(TestCaseTag.FIXME_DEBIAN11):
383 cls = unittest.skip("Skipping @tag_fixme_debian11 tests")(cls)
386 def skip_fixme_vpp_debug(cls):
387 cls = unittest.skip("Skipping @tag_fixme_vpp_debug tests")(cls)
391 """Return the instance of this testcase"""
392 return cls.test_instance
395 def set_debug_flags(cls, d):
396 cls.gdbserver_port = 7777
397 cls.debug_core = False
398 cls.debug_gdb = False
399 cls.debug_gdbserver = False
400 cls.debug_all = False
401 cls.debug_attach = False
406 cls.debug_core = True
407 elif dl == "gdb" or dl == "gdb-all":
409 elif dl == "gdbserver" or dl == "gdbserver-all":
410 cls.debug_gdbserver = True
412 cls.debug_attach = True
414 raise Exception("Unrecognized DEBUG option: '%s'" % d)
415 if dl == "gdb-all" or dl == "gdbserver-all":
def get_vpp_worker_count(cls):
    """Return (and lazily cache) the number of VPP worker threads.

    Suites tagged FIXME_VPP_WORKERS are broken multi-worker, so they are
    forced to 0 workers; all others use the configured count.
    """
    if not hasattr(cls, "vpp_worker_count"):
        if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
            cls.vpp_worker_count = 0
        else:
            # 'else:' line restored - it was lost in the garbled source
            cls.vpp_worker_count = config.vpp_worker_count
    return cls.vpp_worker_count
def get_cpus_required(cls):
    """Number of CPUs this suite needs: the main thread plus one per worker."""
    workers = cls.get_vpp_worker_count()
    return workers + 1
432 def setUpConstants(cls):
433 """Set-up the test case class based on environment variables"""
434 cls.step = config.step
435 cls.plugin_path = ":".join(config.vpp_plugin_dir)
436 cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir)
437 cls.extern_plugin_path = ":".join(config.extern_plugin_dir)
439 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
440 debug_cli = "cli-listen localhost:5002"
441 size = re.search(r"\d+[gG]", config.coredump_size)
443 coredump_size = f"coredump-size {config.coredump_size}".lower()
445 coredump_size = "coredump-size unlimited"
446 default_variant = config.variant
447 if default_variant is not None:
448 default_variant = "default { variant %s 100 }" % default_variant
452 api_fuzzing = config.api_fuzz
453 if api_fuzzing is None:
474 cls.get_api_segment_prefix(),
481 if cls.extern_plugin_path not in (None, ""):
482 cls.extra_vpp_plugin_config.append("add-path %s" % cls.extern_plugin_path)
483 if cls.get_vpp_worker_count():
484 cls.vpp_cmdline.extend(
485 ["corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])]
487 cls.vpp_cmdline.extend(
498 cls.get_stats_sock_path(),
499 cls.extra_vpp_statseg_config,
504 cls.get_api_sock_path(),
525 "lisp_unittest_plugin.so",
530 "unittest_plugin.so",
535 + cls.extra_vpp_plugin_config
541 if cls.extra_vpp_config is not None:
542 cls.vpp_cmdline.extend(cls.extra_vpp_config)
544 if not cls.debug_attach:
545 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
546 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
549 def wait_for_enter(cls):
550 if cls.debug_gdbserver:
551 print(double_line_delim)
552 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
554 print(double_line_delim)
555 print("Spawned VPP with PID: %d" % cls.vpp.pid)
557 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
559 print(single_line_delim)
560 print("You can debug VPP using:")
561 if cls.debug_gdbserver:
563 f"sudo gdb {config.vpp} "
564 f"-ex 'target remote localhost:{cls.gdbserver_port}'"
567 "Now is the time to attach gdb by running the above "
568 "command, set up breakpoints etc., then resume VPP from "
569 "within gdb by issuing the 'continue' command"
571 cls.gdbserver_port += 1
573 print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'")
575 "Now is the time to attach gdb by running the above "
576 "command and set up breakpoints etc., then resume VPP from"
577 " within gdb by issuing the 'continue' command"
579 print(single_line_delim)
580 input("Press ENTER to continue running the testcase...")
589 is_distro_ubuntu2204 == True and cls.has_tag(TestCaseTag.FIXME_UBUNTU2204)
590 ) or (is_distro_debian11 == True and cls.has_tag(TestCaseTag.FIXME_DEBIAN11)):
592 cls.logger.debug(f"Assigned cpus: {cls.cpus}")
593 cmdline = cls.vpp_cmdline
595 if cls.debug_gdbserver:
596 gdbserver = "/usr/bin/gdbserver"
597 if not os.path.isfile(gdbserver) or not os.access(gdbserver, os.X_OK):
599 "gdbserver binary '%s' does not exist or is "
600 "not executable" % gdbserver
605 "localhost:{port}".format(port=cls.gdbserver_port),
607 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
610 cls.vpp = subprocess.Popen(
611 cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE
613 except subprocess.CalledProcessError as e:
615 "Subprocess returned with non-0 return code: (%s)", e.returncode
620 "Subprocess returned with OS error: (%s) %s", e.errno, e.strerror
623 except Exception as e:
624 cls.logger.exception("Subprocess returned unexpected from %s:", cmdline)
630 def wait_for_coredump(cls):
631 corefile = cls.tempdir + "/core"
632 if os.path.isfile(corefile):
633 cls.logger.error("Waiting for coredump to complete: %s", corefile)
634 curr_size = os.path.getsize(corefile)
635 deadline = time.time() + 60
637 while time.time() < deadline:
640 curr_size = os.path.getsize(corefile)
641 if size == curr_size:
646 "Timed out waiting for coredump to complete: %s", corefile
649 cls.logger.error("Coredump complete: %s, size %d", corefile, curr_size)
def get_stats_sock_path(cls):
    """Path of the VPP statistics socket inside this test's tempdir."""
    return "{}/stats.sock".format(cls.tempdir)
def get_api_sock_path(cls):
    """Path of the VPP binary-API socket inside this test's tempdir."""
    return "{}/api.sock".format(cls.tempdir)
def get_api_segment_prefix(cls):
    """Basename of the tempdir, used as the API segment prefix (VAPI only)."""
    tempdir = cls.tempdir
    return os.path.basename(tempdir)
664 def get_tempdir(cls):
666 tmpdir = f"{config.tmp_dir}/unittest-attach-gdb"
668 tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}"
669 if config.wipe_tmp_dir:
670 shutil.rmtree(tmpdir, ignore_errors=True)
675 def create_file_handler(cls):
676 if config.log_dir is None:
677 cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
680 logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}"
681 if config.wipe_tmp_dir:
682 shutil.rmtree(logdir, ignore_errors=True)
684 cls.file_handler = FileHandler(f"{logdir}/log.txt")
689 Perform class setup before running the testcase
690 Remove shared memory files, start vpp and connect the vpp-api
692 super(VppTestCase, cls).setUpClass()
693 cls.logger = get_logger(cls.__name__)
694 random.seed(config.rnd_seed)
695 if hasattr(cls, "parallel_handler"):
696 cls.logger.addHandler(cls.parallel_handler)
697 cls.logger.propagate = False
698 cls.set_debug_flags(config.debug)
699 cls.tempdir = cls.get_tempdir()
700 cls.create_file_handler()
701 cls.file_handler.setFormatter(
702 Formatter(fmt="%(asctime)s,%(msecs)03d %(message)s", datefmt="%H:%M:%S")
704 cls.file_handler.setLevel(DEBUG)
705 cls.logger.addHandler(cls.file_handler)
706 cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
707 os.chdir(cls.tempdir)
709 "Temporary dir is %s, api socket is %s",
711 cls.get_api_sock_path(),
713 cls.logger.debug("Random seed is %s", config.rnd_seed)
715 cls.reset_packet_infos()
720 cls.registry = VppObjectRegistry()
721 cls.vpp_startup_failed = False
722 cls.reporter = KeepAliveReporter()
723 # need to catch exceptions here because if we raise, then the cleanup
724 # doesn't get called and we might end with a zombie vpp
730 if not hasattr(cls, "vpp"):
732 cls.reporter.send_keep_alive(cls, "setUpClass")
733 VppTestResult.current_test_case_info = TestCaseInfo(
734 cls.logger, cls.tempdir, cls.vpp.pid, config.vpp
736 cls.vpp_stdout_deque = deque()
737 cls.vpp_stderr_deque = deque()
738 # Pump thread in a non-debug-attached & not running-vpp
739 if not cls.debug_attach and not hasattr(cls, "running_vpp"):
740 cls.pump_thread_stop_flag = Event()
741 cls.pump_thread_wakeup_pipe = os.pipe()
742 cls.pump_thread = Thread(target=pump_output, args=(cls,))
743 cls.pump_thread.daemon = True
744 cls.pump_thread.start()
745 if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
746 cls.vapi_response_timeout = 0
747 cls.vapi = VppPapiProvider(cls.__name__, cls, cls.vapi_response_timeout)
749 hook = hookmodule.StepHook(cls)
751 hook = hookmodule.PollHook(cls)
752 cls.vapi.register_hook(hook)
753 cls.statistics = VPPStats(socketname=cls.get_stats_sock_path())
757 cls.vpp_startup_failed = True
759 "VPP died shortly after startup, check the"
760 " output to standard error for possible cause"
765 except (vpp_papi.VPPIOError, Exception) as e:
766 cls.logger.debug("Exception connecting to vapi: %s" % e)
767 cls.vapi.disconnect()
769 if cls.debug_gdbserver:
772 "You're running VPP inside gdbserver but "
773 "VPP-API connection failed, did you forget "
774 "to 'continue' VPP from within gdb?",
780 last_line = cls.vapi.cli("show thread").split("\n")[-2]
781 cls.vpp_worker_count = int(last_line.split(" ")[0])
782 print("Detected VPP with %s workers." % cls.vpp_worker_count)
783 except vpp_papi.VPPRuntimeError as e:
784 cls.logger.debug("%s" % e)
787 except Exception as e:
788 cls.logger.debug("Exception connecting to VPP: %s" % e)
793 def _debug_quit(cls):
794 if cls.debug_gdbserver or cls.debug_gdb:
798 if cls.vpp.returncode is None:
800 print(double_line_delim)
801 print("VPP or GDB server is still running")
802 print(single_line_delim)
804 "When done debugging, press ENTER to kill the "
805 "process and finish running the testcase..."
807 except AttributeError:
813 Disconnect vpp-api, kill vpp and cleanup shared memory files
816 if hasattr(cls, "running_vpp"):
819 # first signal that we want to stop the pump thread, then wake it up
820 if hasattr(cls, "pump_thread_stop_flag"):
821 cls.pump_thread_stop_flag.set()
822 if hasattr(cls, "pump_thread_wakeup_pipe"):
823 os.write(cls.pump_thread_wakeup_pipe[1], b"ding dong wake up")
824 if hasattr(cls, "pump_thread"):
825 cls.logger.debug("Waiting for pump thread to stop")
826 cls.pump_thread.join()
827 if hasattr(cls, "vpp_stderr_reader_thread"):
828 cls.logger.debug("Waiting for stderr pump to stop")
829 cls.vpp_stderr_reader_thread.join()
831 if hasattr(cls, "vpp"):
832 if hasattr(cls, "vapi"):
833 cls.logger.debug(cls.vapi.vpp.get_stats())
834 cls.logger.debug("Disconnecting class vapi client on %s", cls.__name__)
835 cls.vapi.disconnect()
836 cls.logger.debug("Deleting class vapi attribute on %s", cls.__name__)
839 if not cls.debug_attach and cls.vpp.returncode is None:
840 cls.wait_for_coredump()
841 cls.logger.debug("Sending TERM to vpp")
843 cls.logger.debug("Waiting for vpp to die")
845 outs, errs = cls.vpp.communicate(timeout=5)
846 except subprocess.TimeoutExpired:
848 outs, errs = cls.vpp.communicate()
849 cls.logger.debug("Deleting class vpp attribute on %s", cls.__name__)
850 if not cls.debug_attach and not hasattr(cls, "running_vpp"):
851 cls.vpp.stdout.close()
852 cls.vpp.stderr.close()
853 # If vpp is a dynamic attribute set by the func use_running,
854 # deletion will result in an AttributeError that we can
858 except AttributeError:
861 if cls.vpp_startup_failed:
862 stdout_log = cls.logger.info
863 stderr_log = cls.logger.critical
865 stdout_log = cls.logger.info
866 stderr_log = cls.logger.info
868 if hasattr(cls, "vpp_stdout_deque"):
869 stdout_log(single_line_delim)
870 stdout_log("VPP output to stdout while running %s:", cls.__name__)
871 stdout_log(single_line_delim)
872 vpp_output = "".join(cls.vpp_stdout_deque)
873 with open(cls.tempdir + "/vpp_stdout.txt", "w") as f:
875 stdout_log("\n%s", vpp_output)
876 stdout_log(single_line_delim)
878 if hasattr(cls, "vpp_stderr_deque"):
879 stderr_log(single_line_delim)
880 stderr_log("VPP output to stderr while running %s:", cls.__name__)
881 stderr_log(single_line_delim)
882 vpp_output = "".join(cls.vpp_stderr_deque)
883 with open(cls.tempdir + "/vpp_stderr.txt", "w") as f:
885 stderr_log("\n%s", vpp_output)
886 stderr_log(single_line_delim)
889 def tearDownClass(cls):
890 """Perform final cleanup after running all tests in this test-case"""
891 cls.logger.debug("--- tearDownClass() for %s called ---" % cls.__name__)
892 if not hasattr(cls, "vpp"):
894 cls.reporter.send_keep_alive(cls, "tearDownClass")
896 cls.file_handler.close()
897 cls.reset_packet_infos()
898 if config.debug_framework:
899 debug_internal.on_tear_down_class(cls)
def show_commands_at_teardown(self):
    """Hook: subclasses override this to log extra show-command output."""
    message = "--- No test specific show commands provided. ---"
    self.logger.info(message)
906 """Show various debug prints after each test"""
908 "--- tearDown() for %s.%s(%s) called ---"
909 % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
911 if not hasattr(self, "vpp"):
915 if not self.vpp_dead:
916 self.logger.debug(self.vapi.cli("show trace max 1000"))
917 self.logger.info(self.vapi.ppcli("show interface"))
918 self.logger.info(self.vapi.ppcli("show hardware"))
919 self.logger.info(self.statistics.set_errors_str())
920 self.logger.info(self.vapi.ppcli("show run"))
921 self.logger.info(self.vapi.ppcli("show log"))
922 self.logger.info(self.vapi.ppcli("show bihash"))
923 self.logger.info("Logging testcase specific show commands.")
924 self.show_commands_at_teardown()
925 if self.remove_configured_vpp_objects_on_tear_down:
926 self.registry.remove_vpp_config(self.logger)
927 # Save/Dump VPP api trace log
928 m = self._testMethodName
929 api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
930 tmp_api_trace = "/tmp/%s" % api_trace
931 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
932 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
933 self.logger.info("Moving %s to %s\n" % (tmp_api_trace, vpp_api_trace_log))
934 shutil.move(tmp_api_trace, vpp_api_trace_log)
935 except VppTransportSocketIOError:
937 "VppTransportSocketIOError: Vpp dead. Cannot log show commands."
941 self.registry.unregister_all(self.logger)
944 """Clear trace before running each test"""
945 super(VppTestCase, self).setUp()
946 if not hasattr(self, "vpp"):
948 self.reporter.send_keep_alive(self)
952 testcase=self.__class__.__name__,
953 method_name=self._testMethodName,
955 self.sleep(0.1, "during setUp")
956 self.vpp_stdout_deque.append(
957 "--- test setUp() for %s.%s(%s) starts here ---\n"
958 % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
960 self.vpp_stderr_deque.append(
961 "--- test setUp() for %s.%s(%s) starts here ---\n"
962 % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
964 self.vapi.cli("clear trace")
965 # store the test instance inside the test class - so that objects
966 # holding the class can access instance methods (like assertEqual)
967 type(self).test_instance = self
970 def pg_enable_capture(cls, interfaces=None):
972 Enable capture on packet-generator interfaces
974 :param interfaces: iterable interface indexes (if None,
975 use self.pg_interfaces)
978 if interfaces is None:
979 interfaces = cls.pg_interfaces
def register_pcap(cls, intf, worker):
    """Register a pcap in the testclass"""
    # record the (interface, worker) capture pair for later management
    entry = (intf, worker)
    cls._pcaps.append(entry)
def get_vpp_time(cls):
    """Read VPP's internal clock via the 'show clock' CLI.

    NOTE(review): the tail of this function (conversion/return of the
    parsed value) is not visible in this fragment - confirm it returns
    the parsed number as a float, per the comments below.
    """
    # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
    # returns float("2.190522")
    timestr = cls.vapi.cli("show clock")
    # keep only the part before the first comma: "Time now 2.190522"
    head, sep, tail = timestr.partition(",")
    # split at "Time now"; 'tail' becomes the bare number string
    head, sep, tail = head.partition("Time now")
999 def sleep_on_vpp_time(cls, sec):
1000 """Sleep according to time in VPP world"""
1001 # On a busy system with many processes
1002 # we might end up with VPP time being slower than real world
1003 # So take that into account when waiting for VPP to do something
1004 start_time = cls.get_vpp_time()
1005 while cls.get_vpp_time() - start_time < sec:
1009 def pg_start(cls, trace=True):
1010 """Enable the PG, wait till it is done, then clean up"""
1011 for (intf, worker) in cls._old_pcaps:
1012 intf.handle_old_pcap_file(intf.get_in_path(worker), intf.in_history_counter)
1015 cls.vapi.cli("clear trace")
1016 cls.vapi.cli("trace add pg-input 1000")
1017 cls.vapi.cli("packet-generator enable")
1018 # PG, when starts, runs to completion -
1019 # so let's avoid a race condition,
1020 # and wait a little till it's done.
1021 # Then clean it up - and then be gone.
1022 deadline = time.time() + 300
1023 while cls.vapi.cli("show packet-generator").find("Yes") != -1:
1024 cls.sleep(0.01) # yield
1025 if time.time() > deadline:
1026 cls.logger.error("Timeout waiting for pg to stop")
1028 for intf, worker in cls._pcaps:
1029 cls.vapi.cli("packet-generator delete %s" % intf.get_cap_name(worker))
1030 cls._old_pcaps = cls._pcaps
1034 def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0, mode=None):
1036 Create packet-generator interfaces.
1038 :param interfaces: iterable indexes of the interfaces.
1039 :returns: List of created interfaces.
1043 for i in interfaces:
1044 intf = VppPGInterface(cls, i, gso, gso_size, mode)
1045 setattr(cls, intf.name, intf)
1047 cls.pg_interfaces = result
def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
    """Create packet-generator interfaces operating in IP4 mode."""
    if not hasattr(cls, "vpp"):
        # no VPP process attached - nothing to create
        cls.pg_interfaces = []
        return cls.pg_interfaces
    ip4_mode = VppEnum.vl_api_pg_interface_mode_t.PG_API_MODE_IP4
    return cls.create_pg_interfaces_internal(interfaces, gso, gso_size, ip4_mode)
def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
    """Create packet-generator interfaces operating in IP6 mode."""
    if not hasattr(cls, "vpp"):
        # no VPP process attached - nothing to create
        cls.pg_interfaces = []
        return cls.pg_interfaces
    ip6_mode = VppEnum.vl_api_pg_interface_mode_t.PG_API_MODE_IP6
    return cls.create_pg_interfaces_internal(interfaces, gso, gso_size, ip6_mode)
def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
    """Create packet-generator interfaces in (default) ethernet mode."""
    if not hasattr(cls, "vpp"):
        # no VPP process attached - nothing to create
        cls.pg_interfaces = []
        return cls.pg_interfaces
    eth_mode = VppEnum.vl_api_pg_interface_mode_t.PG_API_MODE_ETHERNET
    return cls.create_pg_interfaces_internal(interfaces, gso, gso_size, eth_mode)
def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
    """Create packet-generator interfaces explicitly in ethernet mode."""
    if not hasattr(cls, "vpp"):
        # no VPP process attached - nothing to create
        cls.pg_interfaces = []
        return cls.pg_interfaces
    eth_mode = VppEnum.vl_api_pg_interface_mode_t.PG_API_MODE_ETHERNET
    return cls.create_pg_interfaces_internal(interfaces, gso, gso_size, eth_mode)
1091 def create_loopback_interfaces(cls, count):
1093 Create loopback interfaces.
1095 :param count: number of interfaces created.
1096 :returns: List of created interfaces.
1098 if not hasattr(cls, "vpp"):
1099 cls.lo_interfaces = []
1100 return cls.lo_interfaces
1101 result = [VppLoInterface(cls) for i in range(count)]
1103 setattr(cls, intf.name, intf)
1104 cls.lo_interfaces = result
1108 def create_bvi_interfaces(cls, count):
1110 Create BVI interfaces.
1112 :param count: number of interfaces created.
1113 :returns: List of created interfaces.
1115 if not hasattr(cls, "vpp"):
1116 cls.bvi_interfaces = []
1117 return cls.bvi_interfaces
1118 result = [VppBviInterface(cls) for i in range(count)]
1120 setattr(cls, intf.name, intf)
1121 cls.bvi_interfaces = result
def extend_packet(packet, size, padding=" "):
    """
    Extend packet to given size by padding with spaces or custom padding
    NOTE: Currently works only when Raw layer is present.

    :param packet: packet
    :param size: target size
    :param padding: padding used to extend the payload
    """
    # +4 accounts for bytes appended after the payload
    # (presumably the Ethernet FCS - confirm against pg behavior)
    packet_len = len(packet) + 4
    extend = size - packet_len
    # guard restored from the truncated source: do nothing (and do not
    # touch the Raw layer) when the packet already meets the target size
    if extend > 0:
        # repeat the padding enough times, then trim to the exact length
        num = (extend // len(padding)) + 1
        packet[Raw].load += (padding * num)[:extend].encode("ascii")
def reset_packet_infos(cls):
    """Drop all recorded packet infos and zero the per-interface counts."""
    cls._packet_infos = dict()
    cls._packet_count_for_dst_if_idx = dict()
def create_packet_info(cls, src_if, dst_if):
    """
    Create packet info object containing the source and destination indexes
    and add it to the testcase's packet info list

    :param VppInterface src_if: source interface
    :param VppInterface dst_if: destination interface

    :returns: _PacketInfo object
    """
    info = _PacketInfo()
    info.index = len(cls._packet_infos)
    info.src = src_if.sw_if_index
    info.dst = dst_if.sw_if_index
    # packets destined to a sub-interface are counted against its parent
    if isinstance(dst_if, VppSubInterface):
        dst_idx = dst_if.parent.sw_if_index
    else:
        dst_idx = dst_if.sw_if_index
    # increment-or-initialize (truncated branches restored as one .get())
    cls._packet_count_for_dst_if_idx[dst_idx] = (
        cls._packet_count_for_dst_if_idx.get(dst_idx, 0) + 1
    )
    cls._packet_infos[info.index] = info
    return info
def info_to_payload(info):
    """
    Convert _PacketInfo object to packet payload

    :param info: _PacketInfo object

    :returns: string containing serialized data from packet info
    """
    # payload is currently 18 bytes: 4 x 32-bit ints + 1 16-bit short
    fields = (info.index, info.src, info.dst, info.ip, info.proto)
    return pack("iiiih", *fields)
def payload_to_info(payload, payload_field="load"):
    """
    Convert packet payload to _PacketInfo object

    :param payload: packet payload
    :type payload: <class 'scapy.packet.Raw'>
    :param payload_field: packet fieldname of payload "load" for
        <class 'scapy.packet.Raw'>
    :type payload_field: str
    :returns: _PacketInfo object containing de-serialized data from payload
    :raises ValueError: when the decoded index is out of sane range
    """
    # retrieve payload, currently 18 bytes (4 x ints + 1 short)
    payload_b = getattr(payload, payload_field)[:18]

    info = _PacketInfo()
    info.index, info.src, info.dst, info.ip, info.proto = unpack("iiiih", payload_b)

    # some SRv6 TCs depend on get an exception if bad values are detected
    if info.index > 0x4000:
        raise ValueError("Index value is invalid")

    # final return was truncated in the garbled source - restored
    return info
def get_next_packet_info(self, info):
    """
    Iterate over the packet info list stored in the testcase
    Start iteration with first element if info is None
    Continue based on index in info if info is specified

    :param info: info or None
    :returns: next info in list or None if no more infos
    """
    if info is None:
        next_index = 0
    else:
        next_index = info.index + 1
    # indexes are assigned densely from 0, so reaching len() means the end
    if next_index == len(self._packet_infos):
        return None
    return self._packet_infos[next_index]
def get_next_packet_info_for_interface(self, src_index, info):
    """
    Search the packet info list for the next packet info with same source
    interface index

    :param src_index: source interface index to search for
    :param info: packet info - where to start the search
    :returns: packet info or None
    """
    # loop skeleton restored from the truncated source: advance through
    # the list until a matching src is found or the list is exhausted
    while True:
        info = self.get_next_packet_info(info)
        if info is None:
            return None
        if info.src == src_index:
            return info
def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
    """
    Search the packet info list for the next packet info with same source
    and destination interface indexes

    :param src_index: source interface index to search for
    :param dst_index: destination interface index to search for
    :param info: packet info - where to start the search
    :returns: packet info or None
    """
    # loop skeleton restored from the truncated source: filter by src via
    # the sibling helper, then by dst here
    while True:
        info = self.get_next_packet_info_for_interface(src_index, info)
        if info is None:
            return None
        if info.dst == dst_index:
            return info
def assert_equal(self, real_value, expected_value, name_or_class=None):
    """assertEqual with a descriptive message built from name_or_class.

    :param real_value: observed value
    :param expected_value: expected value
    :param name_or_class: None for a plain assertEqual; a class to render
        both values through it (its docstring labels the message);
        otherwise a plain name string used in the message

    NOTE(review): branch skeleton reconstructed from the truncated source.
    """
    if name_or_class is None:
        self.assertEqual(real_value, expected_value)
        return
    if isclass(name_or_class):
        msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
        msg = msg % (
            getdoc(name_or_class).strip(),
            real_value,
            str(name_or_class(real_value)),
            expected_value,
            str(name_or_class(expected_value)),
        )
    else:
        msg = "Invalid %s: %s does not match expected value %s" % (
            name_or_class,
            real_value,
            expected_value,
        )
    self.assertEqual(real_value, expected_value, msg)
def assert_in_range(self, real_value, expected_min, expected_max, name=None):
    """assertTrue that expected_min <= real_value <= expected_max.

    :param name: optional label used to build the failure message
    """
    if name is None:
        msg = None
    else:
        msg = "Invalid %s: %s out of range <%s,%s>" % (
            name,
            real_value,
            expected_min,
            expected_max,
        )
    self.assertTrue(expected_min <= real_value <= expected_max, msg)
    def assert_packet_checksums_valid(self, packet, ignore_zero_udp_checksums=True):
        """
        Re-serialize the packet and verify that every checksum field scapy
        can recompute matches the value carried in the received packet.

        :param packet: received packet to verify
        :param ignore_zero_udp_checksums: if True, skip UDP(-error) layers
            whose checksum field is 0 (zero means "checksum not computed")
        """
        received = packet.__class__(scapy.compat.raw(packet))
        udp_layers = ["UDP", "UDPerror"]
        checksum_fields = ["cksum", "chksum"]
        # NOTE(review): lines elided in this listing -- initialization of
        # `checksums`/`counter` and the loop header walking each layer.
        temp = received.__class__(scapy.compat.raw(received))
        layer = temp.getlayer(counter)
        # work on a payload-less copy so only this layer's own fields count
        layer = layer.copy()
        layer.remove_payload()
        for cf in checksum_fields:
            if hasattr(layer, cf):
                # NOTE(review): the `if (...)`/`continue` wrapper around this
                # zero-UDP-checksum skip condition is elided.
                ignore_zero_udp_checksums
                and 0 == getattr(layer, cf)
                and layer.name in udp_layers
                # deleting the field forces scapy to recompute it on rebuild
                delattr(temp.getlayer(counter), cf)
                checksums.append((counter, cf))
        counter = counter + 1
        # nothing to verify if no checksum fields were found
        if 0 == len(checksums):
        # rebuild so scapy fills in the deleted checksum fields
        temp = temp.__class__(scapy.compat.raw(temp))
        for layer, cf in reversed(checksums):
            calc_sum = getattr(temp[layer], cf)
            # NOTE(review): the self.assert_equal( / logger call wrappers
            # around the argument fragments below are elided in this listing.
            getattr(received[layer], cf),
            "packet checksum on layer #%d: %s" % (layer, temp[layer].name),
            "Checksum field `%s` on `%s` layer has correct value `%s`"
            % (cf, temp[layer].name, calc_sum)
    def assert_checksum_valid(
        # NOTE(review): the self/received_packet/layer parameter lines are
        # elided in this listing.
        checksum_field_names=["chksum", "cksum"],
        # NOTE(review): mutable default list above -- harmless while only
        # read, but a tuple would be safer; confirm before changing.
        ignore_zero_checksum=False,
        """Check checksum of received packet on given layer"""
        layer_copy = received_packet[layer].copy()
        layer_copy.remove_payload()
        # find which checksum field name this layer actually uses
        for f in checksum_field_names:
            if hasattr(layer_copy, f):
        # NOTE(review): field_name assignment/break lines are elided above
        if field_name is None:
            # NOTE(review): the raise wrapper around this message is elided
            f"Layer `{layer}` has none of checksum fields: `{checksum_field_names}`."
        received_packet_checksum = getattr(received_packet[layer], field_name)
        # a zero checksum may mean "not computed"; optionally skip it
        if ignore_zero_checksum and 0 == received_packet_checksum:
        # delete the field and rebuild so scapy recomputes the checksum
        recalculated = received_packet.__class__(scapy.compat.raw(received_packet))
        delattr(recalculated[layer], field_name)
        recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
        # NOTE(review): the self.assert_equal( wrapper is elided here
        received_packet_checksum,
        getattr(recalculated[layer], field_name),
        f"packet checksum (field: {field_name}) on layer: %s" % layer,
    def assert_ip_checksum_valid(self, received_packet, ignore_zero_checksum=False):
        """Verify the IPv4 header checksum of the received packet."""
        self.assert_checksum_valid(
            received_packet, "IP", ignore_zero_checksum=ignore_zero_checksum
        # NOTE(review): the closing parenthesis line is elided in this listing
    def assert_tcp_checksum_valid(self, received_packet, ignore_zero_checksum=False):
        """Verify the TCP checksum of the received packet."""
        self.assert_checksum_valid(
            received_packet, "TCP", ignore_zero_checksum=ignore_zero_checksum
        # NOTE(review): the closing parenthesis line is elided in this listing
    def assert_udp_checksum_valid(self, received_packet, ignore_zero_checksum=True):
        """Verify the UDP checksum; zero checksums are ignored by default
        because a zero UDP checksum means "not computed"."""
        self.assert_checksum_valid(
            received_packet, "UDP", ignore_zero_checksum=ignore_zero_checksum
        # NOTE(review): the closing parenthesis line is elided in this listing
    def assert_embedded_icmp_checksum_valid(self, received_packet):
        """Verify the checksums of the original packet quoted inside an ICMP
        error payload (IPerror/TCPerror/UDPerror/ICMPerror layers)."""
        if received_packet.haslayer(IPerror):
            self.assert_checksum_valid(received_packet, "IPerror")
        if received_packet.haslayer(TCPerror):
            self.assert_checksum_valid(received_packet, "TCPerror")
        if received_packet.haslayer(UDPerror):
            self.assert_checksum_valid(
                received_packet, "UDPerror", ignore_zero_checksum=True
            # NOTE(review): the closing parenthesis line is elided here
        if received_packet.haslayer(ICMPerror):
            self.assert_checksum_valid(received_packet, "ICMPerror")
1398 def assert_icmp_checksum_valid(self, received_packet):
1399 self.assert_checksum_valid(received_packet, "ICMP")
1400 self.assert_embedded_icmp_checksum_valid(received_packet)
1402 def assert_icmpv6_checksum_valid(self, pkt):
1403 if pkt.haslayer(ICMPv6DestUnreach):
1404 self.assert_checksum_valid(pkt, "ICMPv6DestUnreach")
1405 self.assert_embedded_icmp_checksum_valid(pkt)
1406 if pkt.haslayer(ICMPv6EchoRequest):
1407 self.assert_checksum_valid(pkt, "ICMPv6EchoRequest")
1408 if pkt.haslayer(ICMPv6EchoReply):
1409 self.assert_checksum_valid(pkt, "ICMPv6EchoReply")
    def get_counter(self, counter):
        """
        Return a counter value: a stats-segment path when `counter` starts
        with "/", otherwise a named row from `show errors` CLI output.
        """
        if counter.startswith("/"):
            counter_value = self.statistics.get_counter(counter)
        # NOTE(review): the `else:` header and counter_value initialization
        # are elided in this listing.
        counters = self.vapi.cli("sh errors").split("\n")
        # skip the header line and the trailing line of the CLI output
        for i in range(1, len(counters) - 1):
            results = counters[i].split()
            if results[1] == counter:
                counter_value = int(results[0])
        return counter_value
    def assert_counter_equal(self, counter, expected_value, thread=None, index=0):
        """
        Assert a counter equals expected_value: with `thread` given, read
        that thread's value at `index`; otherwise sum `index` across threads.
        """
        c = self.get_counter(counter)
        if thread is not None:
            c = c[thread][index]
        # NOTE(review): the `else:` line is elided in this listing.
        c = sum(x[index] for x in c)
        self.assert_equal(c, expected_value, "counter `%s'" % counter)
    def assert_packet_counter_equal(self, counter, expected_value):
        """Assert a packet counter equals the expected value."""
        counter_value = self.get_counter(counter)
        # NOTE(review): the opening `self.assert_equal(` line is elided here;
        # the line below is its argument list.
        counter_value, expected_value, "packet counter `%s'" % counter
1438 def assert_error_counter_equal(self, counter, expected_value):
1439 counter_value = self.statistics[counter].sum()
1440 self.assert_equal(counter_value, expected_value, "error counter `%s'" % counter)
    def sleep(cls, timeout, remark=None):
        """
        Sleep for `timeout` seconds (yielding the CPU for a zero timeout)
        and log a warning if the actual sleep overran the request.

        NOTE(review): the @classmethod decorator, the time.sleep() call and
        the `after` assignment are elided in this listing.
        """
        # /* Allow sleep(0) to maintain win32 semantics, and as decreed
        # * by Guido, only the main thread can be interrupted.
        # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
        if hasattr(os, "sched_yield"):
        cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
        before = time.time()
        # warn when the host was too busy to honor the requested timeout
        if after - before > 2 * timeout:
            # NOTE(review): the logger call wrapping this message is elided
            "unexpected self.sleep() result - slept for %es instead of ~%es!",
        # NOTE(review): the debug-log wrapper around this message is elided
        "Finished sleep (%s) - slept %es (wanted %es)",
1475 def virtual_sleep(self, timeout, remark=None):
1476 self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
1477 self.vapi.cli("set clock adjust %s" % timeout)
1479 def pg_send(self, intf, pkts, worker=None, trace=True):
1480 intf.add_stream(pkts, worker=worker)
1481 self.pg_enable_capture(self.pg_interfaces)
1482 self.pg_start(trace=trace)
    def snapshot_stats(self, stats_diff):
        """Return snapshot of interesting stats based on diff dictionary."""
        # NOTE(review): the initialization of stats_snapshot (an empty dict)
        # is elided in this listing.
        for sw_if_index in stats_diff:
            for counter in stats_diff[sw_if_index]:
                stats_snapshot[counter] = self.statistics[counter]
        self.logger.debug(f"Took statistics stats_snapshot: {stats_snapshot}")
        return stats_snapshot
    def compare_stats_with_snapshot(self, stats_diff, stats_snapshot):
        """Assert appropriate difference between current stats and snapshot."""
        for sw_if_index in stats_diff:
            for cntr, diff in stats_diff[sw_if_index].items():
                if sw_if_index == "err":
                    # "err" entries compare the summed error counter
                    # NOTE(review): the self.assert_equal( wrappers around
                    # the argument fragments below are elided in this listing
                    self.statistics[cntr].sum(),
                    stats_snapshot[cntr].sum() + diff,
                    f"'{cntr}' counter value (previous value: "
                    f"{stats_snapshot[cntr].sum()}, "
                    f"expected diff: {diff})",
                    # per-interface counters are summed over the first axis
                    # (workers) for the given sw_if_index column
                    self.statistics[cntr][:, sw_if_index].sum(),
                    stats_snapshot[cntr][:, sw_if_index].sum() + diff,
                    f"'{cntr}' counter value (previous value: "
                    f"{stats_snapshot[cntr][:, sw_if_index].sum()}, "
                    f"expected diff: {diff})",
                # NOTE(review): the `try:` paired with this handler is elided
                except IndexError as e:
                    # if diff is 0, then this most probably a case where
                    # test declares multiple interfaces but traffic hasn't
                    # passed through this one yet - which means the counter
                    # value is 0 and can be ignored
                    # NOTE(review): the guard re-raising when diff != 0 and
                    # the logging wrapper around this message are elided
                    f"Couldn't sum counter: {cntr} on sw_if_index: {sw_if_index}"
    def send_and_assert_no_replies(
        self, intf, pkts, remark="", timeout=None, stats_diff=None, trace=True, msg=None
        # NOTE(review): the closing `):` of the signature and several guard
        # lines (e.g. `if stats_diff:` / `if trace:`) are elided below.
        # Send pkts on intf and assert that no interface captured anything.
        stats_snapshot = self.snapshot_stats(stats_diff)
        self.pg_send(intf, pkts)
        for i in self.pg_interfaces:
            i.assert_nothing_captured(timeout=timeout, remark=remark)
        self.logger.debug(f"send_and_assert_no_replies: {msg}")
        self.logger.debug(self.vapi.cli("show trace"))
        self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
    def send_and_expect(
        # NOTE(review): the parameter list and closing `):` are elided in
        # this listing (intf, pkts, output plus options such as worker,
        # trace, stats_diff and msg appear in the body below).
        stats_snapshot = self.snapshot_stats(stats_diff)
        # a single Packet counts as one expected reply
        n_rx = 1 if isinstance(pkts, Packet) else len(pkts)
        self.pg_send(intf, pkts, worker=worker, trace=trace)
        rx = output.get_capture(n_rx)
        self.logger.debug(f"send_and_expect: {msg}")
        self.logger.debug(self.vapi.cli("show trace"))
        self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
    def send_and_expect_load_balancing(
        self, input, pkts, outputs, worker=None, trace=True
        # NOTE(review): the closing `):` and the loop over `outputs` are
        # elided in this listing.
        self.pg_send(input, pkts, worker=worker, trace=trace)
        # every output interface must have captured at least one packet
        rx = oo._get_capture(1)
        self.assertNotEqual(0, len(rx))
        self.logger.debug(self.vapi.cli("show trace"))
    def send_and_expect_some(self, intf, pkts, output, worker=None, trace=True):
        """Send pkts and expect some -- but not all -- to arrive on output."""
        self.pg_send(intf, pkts, worker=worker, trace=trace)
        rx = output._get_capture(1)
        # NOTE(review): one line is elided here (likely an `if trace:` guard)
        self.logger.debug(self.vapi.cli("show trace"))
        self.assertTrue(len(rx) > 0)
        self.assertTrue(len(rx) < len(pkts))
    def send_and_expect_only(self, intf, pkts, output, timeout=None, stats_diff=None):
        """Send pkts, expect all of them on `output` and nothing anywhere
        else.

        NOTE(review): guard lines (`if stats_diff:`), the assignment of
        `outputs` and the final return are elided in this listing.
        """
        stats_snapshot = self.snapshot_stats(stats_diff)
        self.pg_send(intf, pkts)
        rx = output.get_capture(len(pkts))
        for i in self.pg_interfaces:
            if i not in outputs:
                i.assert_nothing_captured(timeout=timeout)
        self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
def get_testcase_doc_name(test):
    """Return the first line of the docstring of *test*'s class."""
    doc_lines = getdoc(test.__class__).splitlines()
    return doc_lines[0]
def get_test_description(descriptions, test):
    """Return the test's short description when enabled and present."""
    short_description = test.shortDescription()
    if descriptions and short_description:
        return short_description
    # NOTE(review): the fallback return (used when descriptions are disabled
    # or absent) is elided in this listing.
class TestCaseInfo(object):
    """Per-test-case bookkeeping: the test's logger, its temp directory,
    the VPP process under test and whether a core file was attributed to
    this test."""

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        self.logger = logger  # logger dedicated to the test case
        self.tempdir = tempdir  # scratch directory used by the test
        self.vpp_pid = vpp_pid  # pid of the VPP process under test
        self.vpp_bin_path = vpp_bin_path  # path to the VPP executable
        # name of the test that produced a core file, if any (set later)
        self.core_crash_test = None

    def __repr__(self):
        # concise form to aid debugging of result bookkeeping
        return "%s(tempdir=%r, vpp_pid=%r, core_crash_test=%r)" % (
            type(self).__name__,
            self.tempdir,
            self.vpp_pid,
            self.core_crash_test,
        )
class VppTestResult(unittest.TestResult):
    """
    @property result_string
       String variable to store the test case result string.

       List variable containing 2-tuples of TestCase instances and strings
       holding formatted tracebacks. Each tuple represents a test which
       raised an unexpected exception.

       List variable containing 2-tuples of TestCase instances and strings
       holding formatted tracebacks. Each tuple represents a test where
       a failure was explicitly signalled using the TestCase.assert*()
    """

    # NOTE(review): parts of the class docstring (the property headers for
    # the two traceback lists above) are elided in this listing, as are many
    # interior lines of the methods below -- elisions are marked inline.

    # TestCaseInfo of tests that failed / that left a core file behind
    failed_test_cases_info = set()
    core_crash_test_cases_info = set()
    # TestCaseInfo of the test currently being run (None outside a test)
    current_test_case_info = None

    def __init__(self, stream=None, descriptions=None, verbosity=None, runner=None):
        """
        :param stream File descriptor to store where to report test results.
            Set to the standard error stream by default.
        :param descriptions Boolean variable to store information if to use
            test case descriptions.
        :param verbosity Integer variable to store required verbosity level.
        """
        super(VppTestResult, self).__init__(stream, descriptions, verbosity)
        self.stream = stream
        self.descriptions = descriptions
        self.verbosity = verbosity
        # human-readable outcome of the most recent test ("OK", "FAIL", ...)
        self.result_string = None
        self.runner = runner

    def addSuccess(self, test):
        """
        Record a test succeeded result
        """
        if self.current_test_case_info:
            self.current_test_case_info.logger.debug(
                "--- addSuccess() %s.%s(%s) called"
                % (test.__class__.__name__, test._testMethodName, test._testMethodDoc)
            # NOTE(review): the closing parenthesis line is elided here
        unittest.TestResult.addSuccess(self, test)
        self.result_string = colorize("OK", GREEN)
        self.send_result_through_pipe(test, PASS)

    def addSkip(self, test, reason):
        """
        Record a test skipped.
        """
        if self.current_test_case_info:
            self.current_test_case_info.logger.debug(
                "--- addSkip() %s.%s(%s) called, reason is %s"
                # NOTE(review): the `% (` line of this format is elided
                test.__class__.__name__,
                test._testMethodName,
                test._testMethodDoc,
        unittest.TestResult.addSkip(self, test, reason)
        self.result_string = colorize("SKIP", YELLOW)

        # CPU-shortage skips are reported distinctly so the runner can
        # account for them separately
        if reason == "not enough cpus":
            self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE)
        # NOTE(review): the `else:` line is elided in this listing
        self.send_result_through_pipe(test, SKIP)

    def symlink_failed(self):
        """Create a <tempdir>-FAILED symlink pointing at the failed test's
        temp dir (best effort; errors are only logged)."""
        if self.current_test_case_info:
            # NOTE(review): the `try:` paired with the handler below is
            # elided, as are several closing-parenthesis lines
            failed_dir = config.failed_dir
            link_path = os.path.join(
                "%s-FAILED" % os.path.basename(self.current_test_case_info.tempdir),
            self.current_test_case_info.logger.debug(
                "creating a link to the failed test"
            self.current_test_case_info.logger.debug(
                "os.symlink(%s, %s)"
                % (self.current_test_case_info.tempdir, link_path)
            if os.path.exists(link_path):
                self.current_test_case_info.logger.debug("symlink already exists")
            # NOTE(review): `else:` elided -- the link is only created when
            # it does not already exist
            os.symlink(self.current_test_case_info.tempdir, link_path)
        except Exception as e:
            self.current_test_case_info.logger.error(e)

    def send_result_through_pipe(self, test, result):
        """Report (test id, result) to the parent process when a result
        pipe has been installed on the class."""
        if hasattr(self, "test_framework_result_pipe"):
            pipe = self.test_framework_result_pipe
            # NOTE(review): a guard line (`if pipe:`) is elided here
            pipe.send((test.id(), result))

    def log_error(self, test, err, fn_name):
        """Log `err` (an exc_info tuple) for `test` under the current test
        case's logger; `fn_name` names the unittest hook that fired."""
        if self.current_test_case_info:
            if isinstance(test, unittest.suite._ErrorHolder):
                test_name = test.description
            # NOTE(review): the `else:` line is elided in this listing
            test_name = "%s.%s(%s)" % (
                test.__class__.__name__,
                test._testMethodName,
                test._testMethodDoc,
            self.current_test_case_info.logger.debug(
                "--- %s() %s called, err is %s" % (fn_name, test_name, err)
            self.current_test_case_info.logger.debug(
                "formatted exception is:\n%s" % "".join(format_exception(*err))

    def add_error(self, test, err, unittest_fn, error_type):
        """Shared implementation of addFailure/addError: log the error,
        record the result string, symlink the temp dir and detect cores."""
        if error_type == FAIL:
            self.log_error(test, err, "addFailure")
            error_type_str = colorize("FAIL", RED)
        elif error_type == ERROR:
            self.log_error(test, err, "addError")
            error_type_str = colorize("ERROR", RED)
        # NOTE(review): the `else:`/`raise` wrapper lines are elided here
            "Error type %s cannot be used to record an "
            "error or a failure" % error_type

        unittest_fn(self, test, err)
        if self.current_test_case_info:
            self.result_string = "%s [ temp dir used by test case: %s ]" % (
                # NOTE(review): the error_type_str argument line is elided
                self.current_test_case_info.tempdir,
            self.symlink_failed()
            self.failed_test_cases_info.add(self.current_test_case_info)
            # remember the first test that left a core file in this temp dir
            if is_core_present(self.current_test_case_info.tempdir):
                if not self.current_test_case_info.core_crash_test:
                    if isinstance(test, unittest.suite._ErrorHolder):
                        test_name = str(test)
                    # NOTE(review): the `else:` line is elided here
                    test_name = "'{!s}' ({!s})".format(
                        get_testcase_doc_name(test), test.id()
                self.current_test_case_info.core_crash_test = test_name
                self.core_crash_test_cases_info.add(self.current_test_case_info)
        # NOTE(review): the `else:` line is elided here
            self.result_string = "%s [no temp dir]" % error_type_str

        self.send_result_through_pipe(test, error_type)

    def addFailure(self, test, err):
        """
        Record a test failed result

        :param test: the failing test
        :param err: error message
        """
        self.add_error(test, err, unittest.TestResult.addFailure, FAIL)

    def addError(self, test, err):
        """
        Record a test error result

        :param test: the erroring test
        :param err: error message
        """
        self.add_error(test, err, unittest.TestResult.addError, ERROR)

    def getDescription(self, test):
        """
        Get test description

        :param test: the test
        :returns: test description
        """
        return get_test_description(self.descriptions, test)

    def startTest(self, test):
        """Print a per-class header (once per class) and start timing."""

        def print_header(test):
            # print the title block only once per test class
            if test.__class__ in self.printed:
            # NOTE(review): `return` and the lazy creation of self.printed
            # are elided in this listing
            test_doc = getdoc(test)
            # NOTE(review): the `if not test_doc:` guard is elided here
            raise Exception("No doc string for test '%s'" % test.id())
            test_title = test_doc.splitlines()[0].rstrip()
            test_title = colorize(test_title, GREEN)
            if test.is_tagged_run_solo():
                test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)

            # This block may overwrite the colorized title above,
            # but we want this to stand out and be fixed
            if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
                test_title = colorize(f"FIXME with VPP workers: {test_title}", RED)

            if test.has_tag(TestCaseTag.FIXME_ASAN):
                test_title = colorize(f"FIXME with ASAN: {test_title}", RED)
                test.skip_fixme_asan()

            if is_distro_ubuntu2204 == True and test.has_tag(
                TestCaseTag.FIXME_UBUNTU2204
            # NOTE(review): the closing `):` line is elided here
                test_title = colorize(f"FIXME on Ubuntu-22.04: {test_title}", RED)
                test.skip_fixme_ubuntu2204()

            if is_distro_debian11 == True and test.has_tag(TestCaseTag.FIXME_DEBIAN11):
                test_title = colorize(f"FIXME on Debian-11: {test_title}", RED)
                test.skip_fixme_debian11()

            if "debug" in config.vpp_tag and test.has_tag(TestCaseTag.FIXME_VPP_DEBUG):
                test_title = colorize(f"FIXME on VPP Debug: {test_title}", RED)
                test.skip_fixme_vpp_debug()

            if hasattr(test, "vpp_worker_count"):
                if test.vpp_worker_count == 0:
                    test_title += " [main thread only]"
                elif test.vpp_worker_count == 1:
                    test_title += " [1 worker thread]"
                # NOTE(review): the `else:` line is elided here
                test_title += f" [{test.vpp_worker_count} worker threads]"

            if test.__class__.skipped_due_to_cpu_lack:
                test_title = colorize(
                    f"{test_title} [skipped - not enough cpus, "
                    f"required={test.__class__.get_cpus_required()}, "
                    f"available={max_vpp_cpus}]",
                # NOTE(review): the color argument and closing `)` are elided

            print(double_line_delim)
            # NOTE(review): the print(test_title) line is elided here
            print(double_line_delim)
            self.printed.append(test.__class__)

        # NOTE(review): the print_header(test) invocation line is elided
        self.start_test = time.time()
        unittest.TestResult.startTest(self, test)
        if self.verbosity > 0:
            self.stream.writeln("Starting " + self.getDescription(test) + " ...")
            self.stream.writeln(single_line_delim)

    def stopTest(self, test):
        """
        Called when the given test has been run
        """
        unittest.TestResult.stopTest(self, test)

        if self.verbosity > 0:
            self.stream.writeln(single_line_delim)
            self.stream.writeln(
                "%-73s%s" % (self.getDescription(test), self.result_string)
            self.stream.writeln(single_line_delim)
        # NOTE(review): `else:` and the compact one-line format string of
        # this writeln call are elided in this listing
            self.stream.writeln(
                self.getDescription(test),
                time.time() - self.start_test,

        self.send_result_through_pipe(test, TEST_RUN)

    def printErrors(self):
        """
        Print errors from running the test case
        """
        if len(self.errors) > 0 or len(self.failures) > 0:
            self.stream.writeln()
            self.printErrorList("ERROR", self.errors)
            self.printErrorList("FAIL", self.failures)

        # ^^ that is the last output from unittest before summary
        if not self.runner.print_summary:
            # silence any further unittest output by swapping in /dev/null
            devnull = unittest.runner._WritelnDecorator(open(os.devnull, "w"))
            self.stream = devnull
            self.runner.stream = devnull

    def printErrorList(self, flavour, errors):
        """
        Print error list to the output stream together with error type
        and test case description.

        :param flavour: error type
        :param errors: iterable errors
        """
        for test, err in errors:
            self.stream.writeln(double_line_delim)
            self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
            self.stream.writeln(single_line_delim)
            self.stream.writeln("%s" % err)
class VppTestRunner(unittest.TextTestRunner):
    """
    A basic test runner implementation which prints results to standard error.
    """

    # NOTE(review): an `@property` decorator line is elided in this listing.
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    # NOTE(review): the `def __init__(self, ...)` line and most of the
    # parameter list (result_pipe, descriptions, verbosity, failfast,
    # buffer, resultclass, print_summary, **kwargs) are elided below.
        keep_alive_pipe=None,
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        super(VppTestRunner, self).__init__(
            sys.stdout, descriptions, verbosity, failfast, buffer, resultclass, **kwargs
        KeepAliveReporter.pipe = keep_alive_pipe

        self.orig_stream = self.stream
        # hand the result pipe to the result class so outcomes can be
        # streamed to the parent process
        self.resultclass.test_framework_result_pipe = result_pipe

        self.print_summary = print_summary

    def _makeResult(self):
        # pass the runner along so the result can consult print_summary
        return self.resultclass(self.stream, self.descriptions, self.verbosity, self)

    def run(self, test):
        """Run the given test case or test suite, restoring the original
        stream afterwards when the summary is suppressed.

        NOTE(review): the `return result` line is elided in this listing.
        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        result = super(VppTestRunner, self).run(test)
        if not self.print_summary:
            self.stream = self.orig_stream
            result.stream = self.orig_stream
class Worker(Thread):
    # Thread that runs an external executable and logs its output.

    def __init__(self, executable_args, logger, env=None, *args, **kwargs):
        """
        :param executable_args: argv list of the program to execute
        :param logger: logger used for the child's stdout/stderr
        :param env: extra environment variables for the child process
        """
        super(Worker, self).__init__(*args, **kwargs)
        self.logger = logger
        self.args = executable_args
        # optionally wrap the program in gdbserver / gdb wait hooks
        if hasattr(self, "testcase") and self.testcase.debug_all:
            if self.testcase.debug_gdbserver:
                # NOTE(review): the list assignment wrapping these gdbserver
                # argv elements is elided in this listing
                "/usr/bin/gdbserver",
                "localhost:{port}".format(port=self.testcase.gdbserver_port),
            elif self.testcase.debug_gdb and hasattr(self, "wait_for_gdb"):
                self.args.append(self.wait_for_gdb)
        self.app_bin = executable_args[0]
        self.app_name = os.path.basename(self.app_bin)
        if hasattr(self, "role"):
            self.app_name += " {role}".format(role=self.role)
        # NOTE(review): lines are elided here in this listing
        env = {} if env is None else env
        # deep copy so later caller-side mutation cannot leak into the child
        self.env = copy.deepcopy(env)

    def wait_for_enter(self):
        """When a debug mode is active, announce the child's PID, show gdb
        attach instructions and block until the user presses ENTER.

        NOTE(review): many wrapper lines (print(...) calls, gdb command
        string assembly) are elided in this listing; fragments remain below.
        """
        if not hasattr(self, "testcase"):
        if self.testcase.debug_all and self.testcase.debug_gdbserver:
            print(double_line_delim)
                "Spawned GDB Server for '{app}' with PID: {pid}".format(
                    app=self.app_name, pid=self.process.pid
        elif self.testcase.debug_all and self.testcase.debug_gdb:
            print(double_line_delim)
                "Spawned '{app}' with PID: {pid}".format(
                    app=self.app_name, pid=self.process.pid
        print(single_line_delim)
        print("You can debug '{app}' using:".format(app=self.app_name))
        if self.testcase.debug_gdbserver:
                + " -ex 'target remote localhost:{port}'".format(
                    port=self.testcase.gdbserver_port
                "Now is the time to attach gdb by running the above "
                "command, set up breakpoints etc., then resume from "
                "within gdb by issuing the 'continue' command"
            # each gdbserver-debugged child gets a fresh port
            self.testcase.gdbserver_port += 1
        elif self.testcase.debug_gdb:
                + " -ex 'attach {pid}'".format(pid=self.process.pid)
                "Now is the time to attach gdb by running the above "
                "command and set up breakpoints etc., then resume from"
                " within gdb by issuing the 'continue' command"
        print(single_line_delim)
        input("Press ENTER to continue running the testcase...")

    # NOTE(review): the `def run(self):` header is elided in this listing;
    # the body below runs the executable and logs its output.
        executable = self.args[0]
        if not os.path.exists(executable) or not os.access(
            executable, os.F_OK | os.X_OK
        # NOTE(review): the closing `):` line is elided here
            # Exit code that means some system file did not exist,
            # could not be opened, or had some other kind of error.
            self.result = os.EX_OSFILE
            raise EnvironmentError(
                "executable '%s' is not found or executable." % executable
            # NOTE(review): the logger call wrapping this message is elided
                "Running executable '{app}': '{cmd}'".format(
                    app=self.app_name, cmd=" ".join(self.args)
        env = os.environ.copy()
        env.update(self.env)
        env["CK_LOG_FILE_NAME"] = "-"
        # stdbuf disables stdio buffering so output is captured promptly
        self.process = subprocess.Popen(
            ["stdbuf", "-o0", "-e0"] + self.args,
            # NOTE(review): shell/env keyword argument lines are elided here
            preexec_fn=os.setpgrp,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        self.wait_for_enter()
        out, err = self.process.communicate()
        self.logger.debug("Finished running `{app}'".format(app=self.app_name))
        self.logger.info("Return code is `%s'" % self.process.returncode)
        self.logger.info(single_line_delim)
            # NOTE(review): the logger call wrapping this message is elided
            "Executable `{app}' wrote to stdout:".format(app=self.app_name)
        self.logger.info(single_line_delim)
        self.logger.info(out.decode("utf-8"))
        self.logger.info(single_line_delim)
            # NOTE(review): the logger call wrapping this message is elided
            "Executable `{app}' wrote to stderr:".format(app=self.app_name)
        self.logger.info(single_line_delim)
        self.logger.info(err.decode("utf-8"))
        self.logger.info(single_line_delim)
        self.result = self.process.returncode
2132 if __name__ == "__main__":