3 from __future__ import print_function
18 from collections import deque
19 from threading import Thread, Event
20 from inspect import getdoc, isclass
21 from traceback import format_exception
22 from logging import FileHandler, DEBUG, Formatter
24 from abc import ABC, abstractmethod
25 from struct import pack, unpack
28 from scapy.packet import Raw, Packet
29 from config import config, available_cpus, num_cpus, max_vpp_cpus
30 import hook as hookmodule
31 from vpp_pg_interface import VppPGInterface
32 from vpp_sub_interface import VppSubInterface
33 from vpp_lo_interface import VppLoInterface
34 from vpp_bvi_interface import VppBviInterface
35 from vpp_papi_provider import VppPapiProvider
36 from vpp_papi import VppEnum
38 from vpp_papi.vpp_stats import VPPStats
39 from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
49 from vpp_object import VppObjectRegistry
50 from util import ppp, is_core_present
51 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
52 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
53 from scapy.layers.inet6 import ICMPv6EchoReply
54 from vpp_running import use_running
55 from test_result_code import TestResultCode
58 logger = logging.getLogger(__name__)
60 # Set up an empty logger for the testcase that can be overridden as necessary
61 null_logger = logging.getLogger("VppTestCase")
62 null_logger.addHandler(logging.NullHandler())
65 if config.debug_framework:
69 Test framework module.
71 The module provides a set of tools for constructing and running tests and
72 representing the results.
76 class VppDiedError(Exception):
77 """exception for reporting that the subprocess has died."""
81 for k, v in signal.__dict__.items()
82 if k.startswith("SIG") and not k.startswith("SIG_")
85 def __init__(self, rv=None, testcase=None, method_name=None):
87 self.signal_name = None
88 self.testcase = testcase
89 self.method_name = method_name
92 self.signal_name = VppDiedError.signals_by_value[-rv]
93 except (KeyError, TypeError):
96 if testcase is None and method_name is None:
99 in_msg = " while running %s.%s" % (testcase, method_name)
102 msg = "VPP subprocess died unexpectedly%s with return code: %d%s." % (
105 " [%s]" % (self.signal_name if self.signal_name is not None else ""),
108 msg = "VPP subprocess died unexpectedly%s." % in_msg
110 super(VppDiedError, self).__init__(msg)
113 class _PacketInfo(object):
114 """Private class to create packet info object.
116 Help process information about the next packet.
117 Set variables to default values.
120 #: Store the index of the packet.
122 #: Store the index of the source packet generator interface of the packet.
124 #: Store the index of the destination packet generator interface
127 #: Store expected ip version
129 #: Store expected upper protocol
131 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare two packet info objects field by field.

    Two objects are considered equal when their ``index``, ``src``,
    ``dst`` and ``data`` attributes all compare equal.

    :param other: object to compare against
    :returns: True or False for comparable objects, NotImplemented when
        ``other`` lacks any of the compared attributes
    """
    try:
        theirs = (other.index, other.src, other.dst, other.data)
    except AttributeError:
        # not a packet-info-like object (e.g. None) - hand the decision
        # back to Python instead of raising from inside a comparison
        return NotImplemented
    mine = (self.index, self.src, self.dst, self.data)
    return all(a == b for a, b in zip(mine, theirs))
142 def pump_output(testclass):
143 """pump output from vpp stdout/stderr to proper queues"""
144 if not hasattr(testclass, "vpp"):
148 while not testclass.pump_thread_stop_flag.is_set():
149 readable = select.select(
151 testclass.vpp.stdout.fileno(),
152 testclass.vpp.stderr.fileno(),
153 testclass.pump_thread_wakeup_pipe[0],
158 if testclass.vpp.stdout.fileno() in readable:
159 read = os.read(testclass.vpp.stdout.fileno(), 102400)
161 split = read.decode("ascii", errors="backslashreplace").splitlines(True)
162 if len(stdout_fragment) > 0:
163 split[0] = "%s%s" % (stdout_fragment, split[0])
164 if len(split) > 0 and split[-1].endswith("\n"):
168 stdout_fragment = split[-1]
169 testclass.vpp_stdout_deque.extend(split[:limit])
170 if not config.cache_vpp_output:
171 for line in split[:limit]:
172 testclass.logger.info("VPP STDOUT: %s" % line.rstrip("\n"))
173 if testclass.vpp.stderr.fileno() in readable:
174 read = os.read(testclass.vpp.stderr.fileno(), 102400)
176 split = read.decode("ascii", errors="backslashreplace").splitlines(True)
177 if len(stderr_fragment) > 0:
178 split[0] = "%s%s" % (stderr_fragment, split[0])
179 if len(split) > 0 and split[-1].endswith("\n"):
183 stderr_fragment = split[-1]
185 testclass.vpp_stderr_deque.extend(split[:limit])
186 if not config.cache_vpp_output:
187 for line in split[:limit]:
188 testclass.logger.error("VPP STDERR: %s" % line.rstrip("\n"))
189 # ignoring the dummy pipe here intentionally - the
190 # flag will take care of properly terminating the loop
def _is_platform_aarch64():
    """Tell whether the machine type reported by ``platform`` is aarch64.

    :returns: True if ``platform.machine()`` equals "aarch64"
    """
    machine = platform.machine()
    return machine == "aarch64"
197 is_platform_aarch64 = _is_platform_aarch64()
200 def _is_distro_ubuntu2204():
201 with open("/etc/os-release") as f:
202 for line in f.readlines():
208 is_distro_ubuntu2204 = _is_distro_ubuntu2204()
211 def _is_distro_debian11():
212 with open("/etc/os-release") as f:
213 for line in f.readlines():
214 if "bullseye" in line:
219 is_distro_debian11 = _is_distro_debian11()
222 class KeepAliveReporter(object):
224 Singleton object which reports test start to parent process
230 self.__dict__ = self._shared_state
238 def pipe(self, pipe):
239 if self._pipe is not None:
240 raise Exception("Internal error - pipe should only be set once.")
243 def send_keep_alive(self, test, desc=None):
245 Write current test tmpdir & desc to keep-alive pipe to signal liveness
247 if not hasattr(test, "vpp") or self.pipe is None:
248 # if not running forked..
252 desc = "%s (%s)" % (desc, unittest.util.strclass(test))
256 self.pipe.send((desc, config.vpp, test.tempdir, test.vpp.pid))
259 class TestCaseTag(Enum):
260 # marks the suites that must run at the end
261 # using only a single test runner
263 # marks the suites broken on VPP multi-worker
264 FIXME_VPP_WORKERS = 2
265 # marks the suites broken when ASan is enabled
267 # marks suites broken on Ubuntu-22.04
269 # marks suites broken on Debian-11
271 # marks suites broken on debug vpp image
275 def create_tag_decorator(e):
278 cls.test_tags.append(e)
279 except AttributeError:
286 tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
287 tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
288 tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
289 tag_fixme_ubuntu2204 = create_tag_decorator(TestCaseTag.FIXME_UBUNTU2204)
290 tag_fixme_debian11 = create_tag_decorator(TestCaseTag.FIXME_DEBIAN11)
291 tag_fixme_vpp_debug = create_tag_decorator(TestCaseTag.FIXME_VPP_DEBUG)
305 class CPUInterface(ABC):
307 skipped_due_to_cpu_lack = False
311 def get_cpus_required(cls):
315 def assign_cpus(cls, cpus):
320 class VppTestCase(CPUInterface, unittest.TestCase):
321 """This subclass is a base class for VPP test cases that are implemented as
322 classes. It provides methods to create and run test case.
325 extra_vpp_statseg_config = ""
326 extra_vpp_config = []
327 extra_vpp_plugin_config = []
329 vapi_response_timeout = 5
330 remove_configured_vpp_objects_on_tear_down = True
def packet_infos(self):
    """Packet infos stored on this testcase (dict keyed by packet index)"""
    infos = self._packet_infos
    return infos
338 def get_packet_count_for_if_idx(cls, dst_if_index):
339 """Get the number of packet info for specified destination if index"""
340 if dst_if_index in cls._packet_count_for_dst_if_idx:
341 return cls._packet_count_for_dst_if_idx[dst_if_index]
346 def has_tag(cls, tag):
347 """if the test case has a given tag - return true"""
349 return tag in cls.test_tags
350 except AttributeError:
def is_tagged_run_solo(cls):
    """Report whether the test case class carries the RUN_SOLO tag

    :returns: result of ``cls.has_tag(TestCaseTag.RUN_SOLO)``
    """
    solo_tag = TestCaseTag.RUN_SOLO
    return cls.has_tag(solo_tag)
def skip_fixme_asan(cls):
    """Mark the class for skip if it is tagged @tag_fixme_asan and the
    VPP_EXTRA_CMAKE_ARGS environment variable enables the address sanitizer"""
    if not cls.has_tag(TestCaseTag.FIXME_ASAN):
        return
    cmake_args = os.environ.get("VPP_EXTRA_CMAKE_ARGS", "")
    if "DVPP_ENABLE_SANITIZE_ADDR=ON" not in cmake_args:
        return
    # unittest.skip() flags the class object itself; its return value
    # (the very same class) does not need to be kept
    unittest.skip("Skipping @tag_fixme_asan tests")(cls)
def skip_fixme_ubuntu2204(cls):
    """Mark the class for skip if it is tagged @tag_fixme_ubuntu2204.

    Only the tag is checked here; no distro detection is done in this
    method.
    """
    if not cls.has_tag(TestCaseTag.FIXME_UBUNTU2204):
        return
    # unittest.skip() flags the class object itself; its return value
    # (the very same class) does not need to be kept
    unittest.skip("Skipping @tag_fixme_ubuntu2204 tests")(cls)
def skip_fixme_debian11(cls):
    """Mark the class for skip if it is tagged @tag_fixme_debian11.

    Only the tag is checked here; no distro detection is done in this
    method.
    """
    if not cls.has_tag(TestCaseTag.FIXME_DEBIAN11):
        return
    # unittest.skip() flags the class object itself; its return value
    # (the very same class) does not need to be kept
    unittest.skip("Skipping @tag_fixme_debian11 tests")(cls)
def skip_fixme_vpp_debug(cls):
    """Unconditionally mark the class for skip (@tag_fixme_vpp_debug)"""
    reason = "Skipping @tag_fixme_vpp_debug tests"
    # unittest.skip() flags the class object itself; its return value
    # (the very same class) does not need to be kept
    unittest.skip(reason)(cls)
385 """Return the instance of this testcase"""
386 return cls.test_instance
389 def set_debug_flags(cls, d):
390 cls.gdbserver_port = 7777
391 cls.debug_core = False
392 cls.debug_gdb = False
393 cls.debug_gdbserver = False
394 cls.debug_all = False
395 cls.debug_attach = False
400 cls.debug_core = True
401 elif dl == "gdb" or dl == "gdb-all":
403 elif dl == "gdbserver" or dl == "gdbserver-all":
404 cls.debug_gdbserver = True
406 cls.debug_attach = True
408 raise Exception("Unrecognized DEBUG option: '%s'" % d)
409 if dl == "gdb-all" or dl == "gdbserver-all":
413 def get_vpp_worker_count(cls):
414 if not hasattr(cls, "vpp_worker_count"):
415 if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
416 cls.vpp_worker_count = 0
418 cls.vpp_worker_count = config.vpp_worker_count
419 return cls.vpp_worker_count
def get_cpus_required(cls):
    """Number of cpus needed by the test case class

    :returns: one plus the value of ``cls.get_vpp_worker_count()``
    """
    workers = cls.get_vpp_worker_count()
    return 1 + workers
426 def setUpConstants(cls):
427 """Set-up the test case class based on environment variables"""
428 cls.step = config.step
429 cls.plugin_path = ":".join(config.vpp_plugin_dir)
430 cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir)
431 cls.extern_plugin_path = ":".join(config.extern_plugin_dir)
433 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
434 debug_cli = "cli-listen localhost:5002"
435 size = re.search(r"\d+[gG]", config.coredump_size)
437 coredump_size = f"coredump-size {config.coredump_size}".lower()
439 coredump_size = "coredump-size unlimited"
440 default_variant = config.variant
441 if default_variant is not None:
442 default_variant = "default { variant %s 100 }" % default_variant
446 api_fuzzing = config.api_fuzz
447 if api_fuzzing is None:
468 cls.get_api_segment_prefix(),
475 if cls.extern_plugin_path not in (None, ""):
476 cls.extra_vpp_plugin_config.append("add-path %s" % cls.extern_plugin_path)
477 if cls.get_vpp_worker_count():
478 cls.vpp_cmdline.extend(
479 ["corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])]
481 cls.vpp_cmdline.extend(
492 cls.get_stats_sock_path(),
493 cls.extra_vpp_statseg_config,
498 cls.get_api_sock_path(),
519 "lisp_unittest_plugin.so",
524 "unittest_plugin.so",
529 + cls.extra_vpp_plugin_config
535 if cls.extra_vpp_config is not None:
536 cls.vpp_cmdline.extend(cls.extra_vpp_config)
538 if not cls.debug_attach:
539 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
540 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
543 def wait_for_enter(cls):
544 if cls.debug_gdbserver:
545 print(double_line_delim)
546 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
548 print(double_line_delim)
549 print("Spawned VPP with PID: %d" % cls.vpp.pid)
551 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
553 print(single_line_delim)
554 print("You can debug VPP using:")
555 if cls.debug_gdbserver:
557 f"sudo gdb {config.vpp} "
558 f"-ex 'target remote localhost:{cls.gdbserver_port}'"
561 "Now is the time to attach gdb by running the above "
562 "command, set up breakpoints etc., then resume VPP from "
563 "within gdb by issuing the 'continue' command"
565 cls.gdbserver_port += 1
567 print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'")
569 "Now is the time to attach gdb by running the above "
570 "command and set up breakpoints etc., then resume VPP from"
571 " within gdb by issuing the 'continue' command"
573 print(single_line_delim)
574 input("Press ENTER to continue running the testcase...")
583 is_distro_ubuntu2204 == True and cls.has_tag(TestCaseTag.FIXME_UBUNTU2204)
584 ) or (is_distro_debian11 == True and cls.has_tag(TestCaseTag.FIXME_DEBIAN11)):
586 cls.logger.debug(f"Assigned cpus: {cls.cpus}")
587 cmdline = cls.vpp_cmdline
589 if cls.debug_gdbserver:
590 gdbserver = "/usr/bin/gdbserver"
591 if not os.path.isfile(gdbserver) or not os.access(gdbserver, os.X_OK):
593 "gdbserver binary '%s' does not exist or is "
594 "not executable" % gdbserver
599 "localhost:{port}".format(port=cls.gdbserver_port),
601 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
604 cls.vpp = subprocess.Popen(
605 cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE
607 except subprocess.CalledProcessError as e:
609 "Subprocess returned with non-0 return code: (%s)", e.returncode
614 "Subprocess returned with OS error: (%s) %s", e.errno, e.strerror
617 except Exception as e:
618 cls.logger.exception("Subprocess returned unexpected from %s:", cmdline)
624 def wait_for_coredump(cls):
625 corefile = cls.tempdir + "/core"
626 if os.path.isfile(corefile):
627 cls.logger.error("Waiting for coredump to complete: %s", corefile)
628 curr_size = os.path.getsize(corefile)
629 deadline = time.time() + 60
631 while time.time() < deadline:
634 curr_size = os.path.getsize(corefile)
635 if size == curr_size:
640 "Timed out waiting for coredump to complete: %s", corefile
643 cls.logger.error("Coredump complete: %s, size %d", corefile, curr_size)
def get_stats_sock_path(cls):
    """Path of the ``stats.sock`` file inside the class tempdir"""
    return f"{cls.tempdir}/stats.sock"
def get_api_sock_path(cls):
    """Path of the ``api.sock`` file inside the class tempdir"""
    return f"{cls.tempdir}/api.sock"
def get_api_segment_prefix(cls):
    """Last path component of the class tempdir (only used for VAPI)"""
    tempdir = cls.tempdir
    return os.path.basename(tempdir)
658 def get_tempdir(cls):
660 tmpdir = f"{config.tmp_dir}/unittest-attach-gdb"
662 tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}"
663 if config.wipe_tmp_dir:
664 shutil.rmtree(tmpdir, ignore_errors=True)
669 def create_file_handler(cls):
670 if config.log_dir is None:
671 cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
674 logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}"
675 if config.wipe_tmp_dir:
676 shutil.rmtree(logdir, ignore_errors=True)
678 cls.file_handler = FileHandler(f"{logdir}/log.txt")
683 Perform class setup before running the testcase
684 Remove shared memory files, start vpp and connect the vpp-api
686 super(VppTestCase, cls).setUpClass()
687 cls.logger = get_logger(cls.__name__)
688 random.seed(config.rnd_seed)
689 if hasattr(cls, "parallel_handler"):
690 cls.logger.addHandler(cls.parallel_handler)
691 cls.logger.propagate = False
692 cls.set_debug_flags(config.debug)
693 cls.tempdir = cls.get_tempdir()
694 cls.create_file_handler()
695 cls.file_handler.setFormatter(
696 Formatter(fmt="%(asctime)s,%(msecs)03d %(message)s", datefmt="%H:%M:%S")
698 cls.file_handler.setLevel(DEBUG)
699 cls.logger.addHandler(cls.file_handler)
700 cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
701 os.chdir(cls.tempdir)
703 "Temporary dir is %s, api socket is %s",
705 cls.get_api_sock_path(),
707 cls.logger.debug("Random seed is %s", config.rnd_seed)
709 cls.reset_packet_infos()
714 cls.registry = VppObjectRegistry()
715 cls.vpp_startup_failed = False
716 cls.reporter = KeepAliveReporter()
717 # need to catch exceptions here because if we raise, then the cleanup
718 # doesn't get called and we might end with a zombie vpp
724 if not hasattr(cls, "vpp"):
726 cls.reporter.send_keep_alive(cls, "setUpClass")
727 VppTestResult.current_test_case_info = TestCaseInfo(
728 cls.logger, cls.tempdir, cls.vpp.pid, config.vpp
730 cls.vpp_stdout_deque = deque()
731 cls.vpp_stderr_deque = deque()
732 # Pump thread in a non-debug-attached & not running-vpp
733 if not cls.debug_attach and not hasattr(cls, "running_vpp"):
734 cls.pump_thread_stop_flag = Event()
735 cls.pump_thread_wakeup_pipe = os.pipe()
736 cls.pump_thread = Thread(target=pump_output, args=(cls,))
737 cls.pump_thread.daemon = True
738 cls.pump_thread.start()
739 if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
740 cls.vapi_response_timeout = 0
741 cls.vapi = VppPapiProvider(cls.__name__, cls, cls.vapi_response_timeout)
743 hook = hookmodule.StepHook(cls)
745 hook = hookmodule.PollHook(cls)
746 cls.vapi.register_hook(hook)
747 cls.statistics = VPPStats(socketname=cls.get_stats_sock_path())
751 cls.vpp_startup_failed = True
753 "VPP died shortly after startup, check the"
754 " output to standard error for possible cause"
759 except (vpp_papi.VPPIOError, Exception) as e:
760 cls.logger.debug("Exception connecting to vapi: %s" % e)
761 cls.vapi.disconnect()
763 if cls.debug_gdbserver:
766 "You're running VPP inside gdbserver but "
767 "VPP-API connection failed, did you forget "
768 "to 'continue' VPP from within gdb?",
774 last_line = cls.vapi.cli("show thread").split("\n")[-2]
775 cls.vpp_worker_count = int(last_line.split(" ")[0])
776 print("Detected VPP with %s workers." % cls.vpp_worker_count)
777 except vpp_papi.VPPRuntimeError as e:
778 cls.logger.debug("%s" % e)
781 except Exception as e:
782 cls.logger.debug("Exception connecting to VPP: %s" % e)
787 def _debug_quit(cls):
788 if cls.debug_gdbserver or cls.debug_gdb:
792 if cls.vpp.returncode is None:
794 print(double_line_delim)
795 print("VPP or GDB server is still running")
796 print(single_line_delim)
798 "When done debugging, press ENTER to kill the "
799 "process and finish running the testcase..."
801 except AttributeError:
807 Disconnect vpp-api, kill vpp and cleanup shared memory files
810 if hasattr(cls, "running_vpp"):
813 # first signal that we want to stop the pump thread, then wake it up
814 if hasattr(cls, "pump_thread_stop_flag"):
815 cls.pump_thread_stop_flag.set()
816 if hasattr(cls, "pump_thread_wakeup_pipe"):
817 os.write(cls.pump_thread_wakeup_pipe[1], b"ding dong wake up")
818 if hasattr(cls, "pump_thread"):
819 cls.logger.debug("Waiting for pump thread to stop")
820 cls.pump_thread.join()
821 if hasattr(cls, "vpp_stderr_reader_thread"):
822 cls.logger.debug("Waiting for stderr pump to stop")
823 cls.vpp_stderr_reader_thread.join()
825 if hasattr(cls, "vpp"):
826 if hasattr(cls, "vapi"):
827 cls.logger.debug(cls.vapi.vpp.get_stats())
828 cls.logger.debug("Disconnecting class vapi client on %s", cls.__name__)
829 cls.vapi.disconnect()
830 cls.logger.debug("Deleting class vapi attribute on %s", cls.__name__)
833 if not cls.debug_attach and cls.vpp.returncode is None:
834 cls.wait_for_coredump()
835 cls.logger.debug("Sending TERM to vpp")
837 cls.logger.debug("Waiting for vpp to die")
839 outs, errs = cls.vpp.communicate(timeout=5)
840 except subprocess.TimeoutExpired:
842 outs, errs = cls.vpp.communicate()
843 cls.logger.debug("Deleting class vpp attribute on %s", cls.__name__)
844 if not cls.debug_attach and not hasattr(cls, "running_vpp"):
845 cls.vpp.stdout.close()
846 cls.vpp.stderr.close()
847 # If vpp is a dynamic attribute set by the func use_running,
848 # deletion will result in an AttributeError that we can
852 except AttributeError:
855 if cls.vpp_startup_failed:
856 stdout_log = cls.logger.info
857 stderr_log = cls.logger.critical
859 stdout_log = cls.logger.info
860 stderr_log = cls.logger.info
862 if hasattr(cls, "vpp_stdout_deque"):
863 stdout_log(single_line_delim)
864 stdout_log("VPP output to stdout while running %s:", cls.__name__)
865 stdout_log(single_line_delim)
866 vpp_output = "".join(cls.vpp_stdout_deque)
867 with open(cls.tempdir + "/vpp_stdout.txt", "w") as f:
869 stdout_log("\n%s", vpp_output)
870 stdout_log(single_line_delim)
872 if hasattr(cls, "vpp_stderr_deque"):
873 stderr_log(single_line_delim)
874 stderr_log("VPP output to stderr while running %s:", cls.__name__)
875 stderr_log(single_line_delim)
876 vpp_output = "".join(cls.vpp_stderr_deque)
877 with open(cls.tempdir + "/vpp_stderr.txt", "w") as f:
879 stderr_log("\n%s", vpp_output)
880 stderr_log(single_line_delim)
883 def tearDownClass(cls):
884 """Perform final cleanup after running all tests in this test-case"""
885 cls.logger.debug("--- tearDownClass() for %s called ---" % cls.__name__)
886 if not hasattr(cls, "vpp"):
888 cls.reporter.send_keep_alive(cls, "tearDownClass")
890 cls.file_handler.close()
891 cls.reset_packet_infos()
892 if config.debug_framework:
893 debug_internal.on_tear_down_class(cls)
def show_commands_at_teardown(self):
    """Hook for extra teardown logging.

    This base version only logs that no test specific show commands
    were provided.
    """
    log = self.logger
    log.info("--- No test specific show commands provided. ---")
900 """Show various debug prints after each test"""
902 "--- tearDown() for %s.%s(%s) called ---"
903 % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
905 if not hasattr(self, "vpp"):
909 if not self.vpp_dead:
910 self.logger.debug(self.vapi.cli("show trace max 1000"))
911 self.logger.info(self.vapi.ppcli("show interface"))
912 self.logger.info(self.vapi.ppcli("show hardware"))
913 self.logger.info(self.statistics.set_errors_str())
914 self.logger.info(self.vapi.ppcli("show run"))
915 self.logger.info(self.vapi.ppcli("show log"))
916 self.logger.info(self.vapi.ppcli("show bihash"))
917 self.logger.info("Logging testcase specific show commands.")
918 self.show_commands_at_teardown()
919 if self.remove_configured_vpp_objects_on_tear_down:
920 self.registry.remove_vpp_config(self.logger)
921 # Save/Dump VPP api trace log
922 m = self._testMethodName
923 api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
924 tmp_api_trace = "/tmp/%s" % api_trace
925 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
926 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
927 self.logger.info("Moving %s to %s\n" % (tmp_api_trace, vpp_api_trace_log))
928 shutil.move(tmp_api_trace, vpp_api_trace_log)
929 except VppTransportSocketIOError:
931 "VppTransportSocketIOError: Vpp dead. Cannot log show commands."
935 self.registry.unregister_all(self.logger)
938 """Clear trace before running each test"""
939 super(VppTestCase, self).setUp()
940 if not hasattr(self, "vpp"):
942 self.reporter.send_keep_alive(self)
946 testcase=self.__class__.__name__,
947 method_name=self._testMethodName,
949 self.sleep(0.1, "during setUp")
950 self.vpp_stdout_deque.append(
951 "--- test setUp() for %s.%s(%s) starts here ---\n"
952 % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
954 self.vpp_stderr_deque.append(
955 "--- test setUp() for %s.%s(%s) starts here ---\n"
956 % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
958 self.vapi.cli("clear trace")
959 # store the test instance inside the test class - so that objects
960 # holding the class can access instance methods (like assertEqual)
961 type(self).test_instance = self
964 def pg_enable_capture(cls, interfaces=None):
966 Enable capture on packet-generator interfaces
968 :param interfaces: iterable interface indexes (if None,
969 use self.pg_interfaces)
972 if interfaces is None:
973 interfaces = cls.pg_interfaces
def register_pcap(cls, intf, worker):
    """Remember a pcap capture on the testclass

    :param intf: interface the capture belongs to
    :param worker: worker associated with the capture
    """
    # captures are kept as (interface, worker) pairs in cls._pcaps
    entry = (intf, worker)
    cls._pcaps.append(entry)
984 def get_vpp_time(cls):
985 # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
986 # returns float("2.190522")
987 timestr = cls.vapi.cli("show clock")
988 head, sep, tail = timestr.partition(",")
989 head, sep, tail = head.partition("Time now")
993 def sleep_on_vpp_time(cls, sec):
994 """Sleep according to time in VPP world"""
995 # On a busy system with many processes
996 # we might end up with VPP time being slower than real world
997 # So take that into account when waiting for VPP to do something
998 start_time = cls.get_vpp_time()
999 while cls.get_vpp_time() - start_time < sec:
1003 def pg_start(cls, trace=True):
1004 """Enable the PG, wait till it is done, then clean up"""
1005 for intf, worker in cls._old_pcaps:
1006 intf.handle_old_pcap_file(intf.get_in_path(worker), intf.in_history_counter)
1009 cls.vapi.cli("clear trace")
1010 cls.vapi.cli("trace add pg-input 1000")
1011 cls.vapi.cli("packet-generator enable")
1012 # PG, when starts, runs to completion -
1013 # so let's avoid a race condition,
1014 # and wait a little till it's done.
1015 # Then clean it up - and then be gone.
1016 deadline = time.time() + 300
1017 while cls.vapi.cli("show packet-generator").find("Yes") != -1:
1018 cls.sleep(0.01) # yield
1019 if time.time() > deadline:
1020 cls.logger.error("Timeout waiting for pg to stop")
1022 for intf, worker in cls._pcaps:
1023 cls.vapi.cli("packet-generator delete %s" % intf.get_cap_name(worker))
1024 cls._old_pcaps = cls._pcaps
1028 def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0, mode=None):
1030 Create packet-generator interfaces.
1032 :param interfaces: iterable indexes of the interfaces.
1033 :returns: List of created interfaces.
1037 for i in interfaces:
1038 intf = VppPGInterface(cls, i, gso, gso_size, mode)
1039 setattr(cls, intf.name, intf)
1041 cls.pg_interfaces = result
1045 def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
1046 if not hasattr(cls, "vpp"):
1047 cls.pg_interfaces = []
1048 return cls.pg_interfaces
1049 pgmode = VppEnum.vl_api_pg_interface_mode_t
1050 return cls.create_pg_interfaces_internal(
1051 interfaces, gso, gso_size, pgmode.PG_API_MODE_IP4
1055 def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
1056 if not hasattr(cls, "vpp"):
1057 cls.pg_interfaces = []
1058 return cls.pg_interfaces
1059 pgmode = VppEnum.vl_api_pg_interface_mode_t
1060 return cls.create_pg_interfaces_internal(
1061 interfaces, gso, gso_size, pgmode.PG_API_MODE_IP6
1065 def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
1066 if not hasattr(cls, "vpp"):
1067 cls.pg_interfaces = []
1068 return cls.pg_interfaces
1069 pgmode = VppEnum.vl_api_pg_interface_mode_t
1070 return cls.create_pg_interfaces_internal(
1071 interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
1075 def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
1076 if not hasattr(cls, "vpp"):
1077 cls.pg_interfaces = []
1078 return cls.pg_interfaces
1079 pgmode = VppEnum.vl_api_pg_interface_mode_t
1080 return cls.create_pg_interfaces_internal(
1081 interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
1085 def create_loopback_interfaces(cls, count):
1087 Create loopback interfaces.
1089 :param count: number of interfaces created.
1090 :returns: List of created interfaces.
1092 if not hasattr(cls, "vpp"):
1093 cls.lo_interfaces = []
1094 return cls.lo_interfaces
1095 result = [VppLoInterface(cls) for i in range(count)]
1097 setattr(cls, intf.name, intf)
1098 cls.lo_interfaces = result
1102 def create_bvi_interfaces(cls, count):
1104 Create BVI interfaces.
1106 :param count: number of interfaces created.
1107 :returns: List of created interfaces.
1109 if not hasattr(cls, "vpp"):
1110 cls.bvi_interfaces = []
1111 return cls.bvi_interfaces
1112 result = [VppBviInterface(cls) for i in range(count)]
1114 setattr(cls, intf.name, intf)
1115 cls.bvi_interfaces = result
1119 def extend_packet(packet, size, padding=" "):
1121 Extend packet to given size by padding with spaces or custom padding
1122 NOTE: Currently works only when Raw layer is present.
1124 :param packet: packet
1125 :param size: target size
1126 :param padding: padding used to extend the payload
1129 packet_len = len(packet) + 4
1130 extend = size - packet_len
1132 num = (extend // len(padding)) + 1
1133 packet[Raw].load += (padding * num)[:extend].encode("ascii")
def reset_packet_infos(cls):
    """Drop all stored packet info objects and per-destination packet
    counts by replacing both stores with fresh empty dicts"""
    cls._packet_infos, cls._packet_count_for_dst_if_idx = {}, {}
1142 def create_packet_info(cls, src_if, dst_if):
1144 Create packet info object containing the source and destination indexes
1145 and add it to the testcase's packet info list
1147 :param VppInterface src_if: source interface
1148 :param VppInterface dst_if: destination interface
1150 :returns: _PacketInfo object
1153 info = _PacketInfo()
1154 info.index = len(cls._packet_infos)
1155 info.src = src_if.sw_if_index
1156 info.dst = dst_if.sw_if_index
1157 if isinstance(dst_if, VppSubInterface):
1158 dst_idx = dst_if.parent.sw_if_index
1160 dst_idx = dst_if.sw_if_index
1161 if dst_idx in cls._packet_count_for_dst_if_idx:
1162 cls._packet_count_for_dst_if_idx[dst_idx] += 1
1164 cls._packet_count_for_dst_if_idx[dst_idx] = 1
1165 cls._packet_infos[info.index] = info
def info_to_payload(info):
    """
    Serialize a _PacketInfo object into packet payload

    :param info: _PacketInfo object

    :returns: packed data holding the index, src, dst, ip and proto
        fields of the packet info
    """
    # layout "iiiih" is 4 x ints + 1 short, currently 18 bytes
    fields = (info.index, info.src, info.dst, info.ip, info.proto)
    return pack("iiiih", *fields)
1182 def payload_to_info(payload, payload_field="load"):
1184 Convert packet payload to _PacketInfo object
1186 :param payload: packet payload
1187 :type payload: <class 'scapy.packet.Raw'>
1188 :param payload_field: packet fieldname of payload "load" for
1189 <class 'scapy.packet.Raw'>
1190 :type payload_field: str
1191 :returns: _PacketInfo object containing de-serialized data from payload
1195 # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1196 payload_b = getattr(payload, payload_field)[:18]
1198 info = _PacketInfo()
1199 info.index, info.src, info.dst, info.ip, info.proto = unpack("iiiih", payload_b)
1201 # some SRv6 TCs depend on get an exception if bad values are detected
1202 if info.index > 0x4000:
1203 raise ValueError("Index value is invalid")
1207 def get_next_packet_info(self, info):
1209 Iterate over the packet info list stored in the testcase
1210 Start iteration with first element if info is None
1211 Continue based on index in info if info is specified
1213 :param info: info or None
1214 :returns: next info in list or None if no more infos
1219 next_index = info.index + 1
1220 if next_index == len(self._packet_infos):
1223 return self._packet_infos[next_index]
1225 def get_next_packet_info_for_interface(self, src_index, info):
1227 Search the packet info list for the next packet info with same source
1230 :param src_index: source interface index to search for
1231 :param info: packet info - where to start the search
1232 :returns: packet info or None
1236 info = self.get_next_packet_info(info)
1239 if info.src == src_index:
1242 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
1244 Search the packet info list for the next packet info with same source
1245 and destination interface indexes
1247 :param src_index: source interface index to search for
1248 :param dst_index: destination interface index to search for
1249 :param info: packet info - where to start the search
1250 :returns: packet info or None
1254 info = self.get_next_packet_info_for_interface(src_index, info)
1257 if info.dst == dst_index:
1260 def assert_equal(self, real_value, expected_value, name_or_class=None):
1261 if name_or_class is None:
1262 self.assertEqual(real_value, expected_value)
1265 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
1267 getdoc(name_or_class).strip(),
1269 str(name_or_class(real_value)),
1271 str(name_or_class(expected_value)),
1274 msg = "Invalid %s: %s does not match expected value %s" % (
1280 self.assertEqual(real_value, expected_value, msg)
1282 def assert_in_range(self, real_value, expected_min, expected_max, name=None):
1286 msg = "Invalid %s: %s out of range <%s,%s>" % (
1292 self.assertTrue(expected_min <= real_value <= expected_max, msg)
def assert_packet_checksums_valid(self, packet, ignore_zero_udp_checksums=True):
    """
    Verify every checksum field found in the layers of a packet

    The packet is re-parsed from its raw bytes, the checksum fields are
    deleted from a second copy, that copy is rebuilt from raw bytes again
    and the rebuilt values are compared with the received ones.

    :param packet: packet to check
    :param ignore_zero_udp_checksums: when set, a checksum equal to 0 on a
        layer named "UDP" or "UDPerror" is not verified
    """
    udp_layers = ["UDP", "UDPerror"]
    checksum_fields = ["cksum", "chksum"]
    received = packet.__class__(scapy.compat.raw(packet))
    temp = received.__class__(scapy.compat.raw(received))

    # (layer index, field name) of each checksum removed from `temp`
    checksums = []
    counter = 0
    layer = temp.getlayer(counter)
    while layer:
        # inspect a payload-less copy of the layer
        layer = layer.copy()
        layer.remove_payload()
        for cf in checksum_fields:
            if not hasattr(layer, cf):
                continue
            skip_zero_udp = (
                ignore_zero_udp_checksums
                and 0 == getattr(layer, cf)
                and layer.name in udp_layers
            )
            if skip_zero_udp:
                continue
            delattr(temp.getlayer(counter), cf)
            checksums.append((counter, cf))
        counter = counter + 1
        layer = temp.getlayer(counter)

    if 0 == len(checksums):
        return

    # rebuild from raw bytes so that the deleted fields get fresh values
    temp = temp.__class__(scapy.compat.raw(temp))
    for layer, cf in reversed(checksums):
        calc_sum = getattr(temp[layer], cf)
        self.assert_equal(
            getattr(received[layer], cf),
            calc_sum,
            "packet checksum on layer #%d: %s" % (layer, temp[layer].name),
        )
        self.logger.debug(
            "Checksum field `%s` on `%s` layer has correct value `%s`"
            % (cf, temp[layer].name, calc_sum)
        )
def assert_checksum_valid(
    self,
    received_packet,
    layer,
    checksum_field_names=["chksum", "cksum"],
    ignore_zero_checksum=False,
):
    """
    Check checksum of received packet on given layer

    :param received_packet: packet to check
    :param layer: layer selector used to index the packet
    :param checksum_field_names: candidate names of the checksum field, the
        first one present on the layer is used (the default list is only
        read, never modified)
    :param ignore_zero_checksum: when set, a received checksum equal to 0
        is accepted without verification
    :raises Exception: if the layer has none of the candidate fields
    """
    # look for the field on a payload-less copy of the layer
    bare_layer = received_packet[layer].copy()
    bare_layer.remove_payload()
    field_name = next(
        (name for name in checksum_field_names if hasattr(bare_layer, name)),
        None,
    )
    if field_name is None:
        raise Exception(
            f"Layer `{layer}` has none of checksum fields: `{checksum_field_names}`."
        )

    received_packet_checksum = getattr(received_packet[layer], field_name)
    if ignore_zero_checksum and 0 == received_packet_checksum:
        return

    # delete the field and rebuild the packet from raw bytes twice, so the
    # rebuilt packet carries a freshly computed value for it
    rebuilt = received_packet.__class__(scapy.compat.raw(received_packet))
    delattr(rebuilt[layer], field_name)
    rebuilt = rebuilt.__class__(scapy.compat.raw(rebuilt))
    self.assert_equal(
        received_packet_checksum,
        getattr(rebuilt[layer], field_name),
        f"packet checksum (field: {field_name}) on layer: %s" % layer,
    )
def assert_ip_checksum_valid(self, received_packet, ignore_zero_checksum=False):
    """
    Check the checksum of the "IP" layer of a received packet

    :param received_packet: packet to check
    :param ignore_zero_checksum: passed through to assert_checksum_valid()
    """
    layer_name = "IP"
    self.assert_checksum_valid(
        received_packet, layer_name, ignore_zero_checksum=ignore_zero_checksum
    )
def assert_tcp_checksum_valid(self, received_packet, ignore_zero_checksum=False):
    """
    Check the checksum of the "TCP" layer of a received packet

    :param received_packet: packet to check
    :param ignore_zero_checksum: passed through to assert_checksum_valid()
    """
    layer_name = "TCP"
    self.assert_checksum_valid(
        received_packet, layer_name, ignore_zero_checksum=ignore_zero_checksum
    )
def assert_udp_checksum_valid(self, received_packet, ignore_zero_checksum=True):
    """
    Check the checksum of the "UDP" layer of a received packet

    :param received_packet: packet to check
    :param ignore_zero_checksum: passed through to assert_checksum_valid();
        note that, unlike for the IP and TCP variants, it defaults to True
    """
    layer_name = "UDP"
    self.assert_checksum_valid(
        received_packet, layer_name, ignore_zero_checksum=ignore_zero_checksum
    )
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """
    Check the checksums of the *error layers present in a received packet

    :param received_packet: packet to check
    """
    embedded_layers = (
        (IPerror, "IPerror", {}),
        (TCPerror, "TCPerror", {}),
        # a zero checksum is tolerated on the UDPerror layer only
        (UDPerror, "UDPerror", {"ignore_zero_checksum": True}),
        (ICMPerror, "ICMPerror", {}),
    )
    for layer_class, layer_name, options in embedded_layers:
        if received_packet.haslayer(layer_class):
            self.assert_checksum_valid(received_packet, layer_name, **options)
def assert_icmp_checksum_valid(self, received_packet):
    """
    Check the "ICMP" layer checksum and then the checksums of any
    embedded error layers of a received packet

    :param received_packet: packet to check
    """
    pkt = received_packet
    self.assert_checksum_valid(pkt, "ICMP")
    self.assert_embedded_icmp_checksum_valid(pkt)
def assert_icmpv6_checksum_valid(self, pkt):
    """
    Check the checksum of the ICMPv6 layers handled here (destination
    unreachable, echo request, echo reply) that are present in a packet

    :param pkt: packet to check
    """
    if pkt.haslayer(ICMPv6DestUnreach):
        self.assert_checksum_valid(pkt, "ICMPv6DestUnreach")
        # a destination unreachable message is also checked for embedded
        # error layers
        self.assert_embedded_icmp_checksum_valid(pkt)

    echo_layers = (
        (ICMPv6EchoRequest, "ICMPv6EchoRequest"),
        (ICMPv6EchoReply, "ICMPv6EchoReply"),
    )
    for echo_class, echo_name in echo_layers:
        if pkt.haslayer(echo_class):
            self.assert_checksum_valid(pkt, echo_name)
def get_counter(self, counter):
    """
    Read a counter value

    :param counter: counter name; a name starting with "/" is read through
        self.statistics, any other name is looked up in the output of the
        "sh errors" CLI command
    :returns: the statistics object for "/" names; otherwise the integer
        from the first matching CLI line, or 0 when no line matches
    """
    if counter.startswith("/"):
        return self.statistics.get_counter(counter)

    lines = self.vapi.cli("sh errors").split("\n")
    # the first and the last line of the output are not examined
    for line in lines[1:-1]:
        fields = line.split()
        if fields[1] == counter:
            return int(fields[0])
    return 0
def assert_counter_equal(self, counter, expected_value, thread=None, index=0):
    """
    Assert the value of a counter obtained through get_counter()

    :param counter: counter name
    :param expected_value: value the counter is expected to have
    :param thread: when given, only that thread's entry is used; otherwise
        the entries of all threads are summed
    :param index: position within each thread's entry
    """
    per_thread = self.get_counter(counter)
    if thread is None:
        value = sum(entry[index] for entry in per_thread)
    else:
        value = per_thread[thread][index]
    self.assert_equal(value, expected_value, "counter `%s'" % counter)
def assert_packet_counter_equal(self, counter, expected_value):
    """
    Assert the value returned by get_counter() for a packet counter

    :param counter: counter name
    :param expected_value: value the counter is expected to have
    """
    description = "packet counter `%s'" % counter
    counter_value = self.get_counter(counter)
    self.assert_equal(counter_value, expected_value, description)
def assert_error_counter_equal(self, counter, expected_value):
    """
    Assert the summed value of an error counter read from self.statistics

    :param counter: counter name, used as key into self.statistics
    :param expected_value: value the sum is expected to have
    """
    description = "error counter `%s'" % counter
    total = self.statistics[counter].sum()
    self.assert_equal(total, expected_value, description)
@classmethod
def sleep(cls, timeout, remark=None):
    """
    Sleep for the given time and log how long the sleep really took

    :param timeout: time to sleep; 0 only yields the CPU and logs nothing
    :param remark: free-form text included in the log messages
    """
    # /* Allow sleep(0) to maintain win32 semantics, and as decreed
    #  * by Guido, only the main thread can be interrupted.
    # */
    # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892  # noqa
    if timeout == 0:
        # yield quantum
        if not hasattr(os, "sched_yield"):
            time.sleep(0)
        else:
            os.sched_yield()
        return

    cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
    started = time.time()
    time.sleep(timeout)
    slept = time.time() - started
    # complain when the sleep took more than twice the requested time
    if slept > 2 * timeout:
        cls.logger.error(
            "unexpected self.sleep() result - slept for %es instead of ~%es!",
            slept,
            timeout,
        )

    cls.logger.debug(
        "Finished sleep (%s) - slept %es (wanted %es)",
        remark,
        slept,
        timeout,
    )
def virtual_sleep(self, timeout, remark=None):
    """
    Adjust the VPP clock through the CLI instead of really sleeping

    :param timeout: amount passed to the "set clock adjust" CLI command
    :param remark: free-form text included in the log message
    """
    self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
    command = "set clock adjust %s" % timeout
    self.vapi.cli(command)
def pg_send(self, intf, pkts, worker=None, trace=True):
    """
    Queue packets on an interface, enable capture and start the stream

    :param intf: interface the stream is added to
    :param pkts: packets to send
    :param worker: passed to intf.add_stream()
    :param trace: passed to self.pg_start()
    """
    intf.add_stream(pkts, worker=worker)
    # capture is enabled on every pg interface, not only on `intf`
    capture_interfaces = self.pg_interfaces
    self.pg_enable_capture(capture_interfaces)
    self.pg_start(trace=trace)
def snapshot_stats(self, stats_diff):
    """
    Return snapshot of interesting stats based on diff dictionary.

    :param stats_diff: mapping whose values are keyed by counter name
    :returns: dictionary mapping each of those counter names to the current
        self.statistics entry
    """
    stats_snapshot = {
        counter: self.statistics[counter]
        for sw_if_index in stats_diff
        for counter in stats_diff[sw_if_index]
    }
    self.logger.debug(f"Took statistics stats_snapshot: {stats_snapshot}")
    return stats_snapshot
def compare_stats_with_snapshot(self, stats_diff, stats_snapshot):
    """
    Assert appropriate difference between current stats and snapshot.

    :param stats_diff: mapping of sw_if_index (or the literal "err") to a
        mapping of counter name -> expected difference
    :param stats_snapshot: counter values taken before, keyed by name
    :raises Exception: when a counter cannot be indexed by sw_if_index and
        the expected difference is not 0
    """
    for sw_if_index, expected_diffs in stats_diff.items():
        for cntr, diff in expected_diffs.items():
            if sw_if_index == "err":
                # "err" entries are compared on the sum of the whole counter
                self.assert_equal(
                    self.statistics[cntr].sum(),
                    stats_snapshot[cntr].sum() + diff,
                    f"'{cntr}' counter value (previous value: "
                    f"{stats_snapshot[cntr].sum()}, "
                    f"expected diff: {diff})",
                )
                continue

            try:
                self.assert_equal(
                    self.statistics[cntr][:, sw_if_index].sum(),
                    stats_snapshot[cntr][:, sw_if_index].sum() + diff,
                    f"'{cntr}' counter value (previous value: "
                    f"{stats_snapshot[cntr][:, sw_if_index].sum()}, "
                    f"expected diff: {diff})",
                )
            except IndexError as e:
                # if diff is 0, then this most probably a case where
                # test declares multiple interfaces but traffic hasn't
                # passed through this one yet - which means the counter
                # value is 0 and can be ignored
                if 0 != diff:
                    raise Exception(
                        f"Couldn't sum counter: {cntr} on sw_if_index: {sw_if_index}"
                    ) from e
def send_and_assert_no_replies(
    self, intf, pkts, remark="", timeout=None, stats_diff=None, trace=True, msg=None
):
    """
    Send packets and assert that nothing is captured on any pg interface

    :param intf: interface to send the packets on
    :param pkts: packets to send
    :param remark: passed to assert_nothing_captured()
    :param timeout: wait used for the first interface (1 when not given);
        every following interface is checked with a timeout of 0.1
    :param stats_diff: optional expected statistics differences, verified
        after the capture checks
    :param trace: enable tracing for the sent packets and log the output
        of "show trace" afterwards
    :param msg: optional text logged before the trace
    """
    if stats_diff:
        stats_snapshot = self.snapshot_stats(stats_diff)

    # forward the caller's choice; previously tracing was always enabled
    # here and `trace` only controlled the log dump below
    self.pg_send(intf, pkts, trace=trace)

    try:
        if not timeout:
            timeout = 1
        for i in self.pg_interfaces:
            i.assert_nothing_captured(timeout=timeout, remark=remark)
            # the full wait is only needed once
            timeout = 0.1
    finally:
        # the trace is logged even when one of the checks above failed
        if trace:
            if msg:
                self.logger.debug(f"send_and_assert_no_replies: {msg}")
            self.logger.debug(self.vapi.cli("show trace"))

    if stats_diff:
        self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1540 def send_and_expect(
1552 stats_snapshot = self.snapshot_stats(stats_diff)
1555 n_rx = 1 if isinstance(pkts, Packet) else len(pkts)
1556 self.pg_send(intf, pkts, worker=worker, trace=trace)
1557 rx = output.get_capture(n_rx)
1560 self.logger.debug(f"send_and_expect: {msg}")
1561 self.logger.debug(self.vapi.cli("show trace"))
1564 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
def send_and_expect_load_balancing(
    self, input, pkts, outputs, worker=None, trace=True
):
    """
    Send packets and expect every output interface to capture something

    :param input: interface to send the packets on
    :param pkts: packets to send
    :param outputs: interfaces that must each capture at least one packet
    :param worker: passed to pg_send()
    :param trace: passed to pg_send(); when set, the output of
        "show trace" is logged as well
    :returns: list with the capture of each output, in the order of outputs
    """
    self.pg_send(input, pkts, worker=worker, trace=trace)
    captures = []
    for out_intf in outputs:
        captured = out_intf._get_capture(1)
        self.assertNotEqual(0, len(captured))
        captures.append(captured)
    if trace:
        self.logger.debug(self.vapi.cli("show trace"))
    return captures
def send_and_expect_some(self, intf, pkts, output, worker=None, trace=True):
    """
    Send packets and expect some, but not all, of them to be captured

    :param intf: interface to send the packets on
    :param pkts: packets to send
    :param output: interface expected to capture the packets
    :param worker: passed to pg_send()
    :param trace: passed to pg_send(); when set, the output of
        "show trace" is logged as well
    :returns: the capture taken from output
    """
    self.pg_send(intf, pkts, worker=worker, trace=trace)
    captured = output._get_capture(1)
    if trace:
        self.logger.debug(self.vapi.cli("show trace"))
    # at least one packet, but fewer than were sent
    self.assertTrue(len(captured) > 0)
    self.assertTrue(len(captured) < len(pkts))
    return captured
def send_and_expect_only(self, intf, pkts, output, timeout=None, stats_diff=None):
    """
    Send packets, expect all of them on one interface and nothing on the
    other pg interfaces

    :param intf: interface to send the packets on
    :param pkts: packets to send
    :param output: the only interface expected to capture the packets
    :param timeout: wait used for the first silent interface (1 when not
        given); the following ones are checked with a timeout of 0.1
    :param stats_diff: optional expected statistics differences, verified
        after the capture checks
    :returns: the capture taken from output
    """
    if stats_diff:
        stats_snapshot = self.snapshot_stats(stats_diff)

    self.pg_send(intf, pkts)
    captured = output.get_capture(len(pkts))

    outputs = [output]
    if not timeout:
        timeout = 1
    for other in self.pg_interfaces:
        if other in outputs:
            continue
        other.assert_nothing_captured(timeout=timeout)
        # the full wait is only needed once
        timeout = 0.1

    if stats_diff:
        self.compare_stats_with_snapshot(stats_diff, stats_snapshot)

    return captured
def get_testcase_doc_name(test):
    """
    Return the first line of the docstring of the test's class

    :param test: test case instance
    """
    class_doc = getdoc(test.__class__)
    first_line = class_doc.splitlines()[0]
    return first_line
def get_test_description(descriptions, test):
    """
    Return the text used to describe a test in the output

    :param descriptions: whether short descriptions should be used
    :param test: test whose description is wanted
    :returns: test.shortDescription() when descriptions are enabled and one
        is available, str(test) otherwise
    """
    short_description = test.shortDescription()
    use_short = descriptions and short_description
    return short_description if use_short else str(test)
class TestCaseInfo(object):
    """Plain record of per-test-case data kept by the result class"""

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        """
        :param logger: logger of the test case
        :param tempdir: temporary directory used by the test case
        :param vpp_pid: stored as given
        :param vpp_bin_path: stored as given
        """
        # set later, once a test of this case is found to have left a core
        self.core_crash_test = None
        self.vpp_bin_path = vpp_bin_path
        self.vpp_pid = vpp_pid
        self.tempdir = tempdir
        self.logger = logger
1631 class VppTestResult(unittest.TestResult):
1633 @property result_string
1634 String variable to store the test case result string.
1636 List variable containing 2-tuples of TestCase instances and strings
1637 holding formatted tracebacks. Each tuple represents a test which
1638 raised an unexpected exception.
1640 List variable containing 2-tuples of TestCase instances and strings
1641 holding formatted tracebacks. Each tuple represents a test where
1642 a failure was explicitly signalled using the TestCase.assert*()
1646 failed_test_cases_info = set()
1647 core_crash_test_cases_info = set()
1648 current_test_case_info = None
def __init__(self, stream=None, descriptions=None, verbosity=None, runner=None):
    """
    :param stream File descriptor to store where to report test results.
        Set to the standard error stream by default.
    :param descriptions Boolean variable to store information if to use
        test case descriptions.
    :param verbosity Integer variable to store required verbosity level.
    :param runner Runner that created this result object.
    """
    super(VppTestResult, self).__init__(stream, descriptions, verbosity)
    self.runner = runner
    self.stream = stream
    self.descriptions = descriptions
    self.verbosity = verbosity
    # state of the most recently reported test
    self.result_code = TestResultCode.TEST_RUN
    self.result_string = None
    # test classes whose header has already been printed
    self.printed = []
def addSuccess(self, test):
    """
    Record a test succeeded result

    :param test: test that passed

    """
    self.log_result("addSuccess", test)
    unittest.TestResult.addSuccess(self, test)
    outcome = TestResultCode.PASS
    self.result_string = colorize("OK", GREEN)
    self.result_code = outcome
    self.send_result_through_pipe(test, outcome)
def addExpectedFailure(self, test, err):
    """
    Record a test that failed as expected

    :param test: test that failed
    :param err: error information of the failure

    """
    self.log_result("addExpectedFailure", test, err)
    super().addExpectedFailure(test, err)
    outcome = TestResultCode.EXPECTED_FAIL
    self.result_string = colorize("FAIL", GREEN)
    self.result_code = outcome
    self.send_result_through_pipe(test, outcome)
def addUnexpectedSuccess(self, test):
    """
    Record a test that passed although it was expected to fail

    :param test: test that passed

    """
    self.log_result("addUnexpectedSuccess", test)
    super().addUnexpectedSuccess(test)
    outcome = TestResultCode.UNEXPECTED_PASS
    self.result_string = colorize("OK", RED)
    self.result_code = outcome
    self.send_result_through_pipe(test, outcome)
def addSkip(self, test, reason):
    """
    Record a test skipped.

    :param test: test that was skipped
    :param reason: why it was skipped; the exact text "not enough cpus"
        is reported with its own result code

    """
    self.log_result("addSkip", test, reason=reason)
    unittest.TestResult.addSkip(self, test, reason)
    self.result_string = colorize("SKIP", YELLOW)

    cpu_shortage = reason == "not enough cpus"
    self.result_code = (
        TestResultCode.SKIP_CPU_SHORTAGE if cpu_shortage else TestResultCode.SKIP
    )
    self.send_result_through_pipe(test, self.result_code)
def symlink_failed(self):
    """
    Link the temp dir of the current test case into config.failed_dir

    The link is named "<basename of temp dir>-FAILED". Nothing is done
    when there is no current test case info. Any problem is logged to the
    test case logger and never propagated.
    """
    info = self.current_test_case_info
    if not info:
        return

    try:
        link_path = os.path.join(
            config.failed_dir,
            "%s-FAILED" % os.path.basename(info.tempdir),
        )

        info.logger.debug("creating a link to the failed test")
        info.logger.debug("os.symlink(%s, %s)" % (info.tempdir, link_path))
        try:
            # create first and react to the failure: a prior exists() check
            # is racy and reports False for a dangling link
            os.symlink(info.tempdir, link_path)
        except FileExistsError:
            info.logger.debug("symlink already exists")

    except Exception as e:
        # best effort only - a missing link must not fail the test run
        info.logger.error(e)
def send_result_through_pipe(self, test, result):
    """
    Report a test result through the result pipe, if there is one

    :param test: test the result belongs to; its id() is sent
    :param result: result code to send
    """
    # the attribute may be missing altogether or be set to a false value
    pipe = getattr(self, "test_framework_result_pipe", None)
    if pipe:
        pipe.send((test.id(), result))
def log_result(self, fn, test, err=None, reason=None):
    """
    Log a result callback to the logger of the current test case

    Nothing is logged when there is no current test case info.

    :param fn: name of the callback being logged
    :param test: test (or unittest error holder) the callback is about
    :param err: optional exception info tuple, also logged as a formatted
        traceback
    :param reason: optional reason text
    """
    info = self.current_test_case_info
    if not info:
        return

    if isinstance(test, unittest.suite._ErrorHolder):
        test_name = test.description
    else:
        test_name = "%s.%s(%s)" % (
            test.__class__.__name__,
            test._testMethodName,
            test._testMethodDoc,
        )

    details = []
    if err:
        details.append(f", error is {err}")
    if reason:
        details.append(f", reason is {reason}")
    extra_msg = "".join(details)
    info.logger.debug(f"--- {fn}() {test_name} called{extra_msg}")

    if err:
        info.logger.debug(
            "formatted exception is:\n%s" % "".join(format_exception(*err))
        )
def add_error(self, test, err, unittest_fn, result_code):
    """
    Common handling of failed and errored tests

    :param test: test that failed
    :param err: exception info of the failure
    :param unittest_fn: unbound unittest.TestResult method that records
        the result
    :param result_code: TestResultCode.FAIL or TestResultCode.ERROR
    :raises Exception: for any other result code
    """
    self.result_code = result_code
    if result_code == TestResultCode.FAIL:
        callback_name, label = "addFailure", "FAIL"
    elif result_code == TestResultCode.ERROR:
        callback_name, label = "addError", "ERROR"
    else:
        raise Exception(f"Unexpected result code {result_code}")
    self.log_result(callback_name, test, err=err)
    error_type_str = colorize(label, RED)

    unittest_fn(self, test, err)
    info = self.current_test_case_info
    if not info:
        self.result_string = "%s [no temp dir]" % error_type_str
    else:
        self.result_string = "%s [ temp dir used by test case: %s ]" % (
            error_type_str,
            info.tempdir,
        )
        self.symlink_failed()
        self.failed_test_cases_info.add(info)
        if is_core_present(info.tempdir):
            # only the first test seen with a core is recorded by name
            if not info.core_crash_test:
                if isinstance(test, unittest.suite._ErrorHolder):
                    test_name = str(test)
                else:
                    test_name = "'{!s}' ({!s})".format(
                        get_testcase_doc_name(test), test.id()
                    )
                info.core_crash_test = test_name
            self.core_crash_test_cases_info.add(info)

    self.send_result_through_pipe(test, result_code)
def addFailure(self, test, err):
    """
    Record a test failed result

    :param test: test that failed
    :param err: error message

    """
    record = unittest.TestResult.addFailure
    self.add_error(test, err, record, TestResultCode.FAIL)
def addError(self, test, err):
    """
    Record a test error result

    :param test: test that raised
    :param err: error message

    """
    record = unittest.TestResult.addError
    self.add_error(test, err, record, TestResultCode.ERROR)
def getDescription(self, test):
    """
    Get test description

    :param test: test to describe
    :returns: test description

    """
    use_descriptions = self.descriptions
    return get_test_description(use_descriptions, test)
1829 def startTest(self, test):
1837 def print_header(test):
1838 if test.__class__ in self.printed:
1841 test_doc = getdoc(test)
1843 raise Exception("No doc string for test '%s'" % test.id())
1845 test_title = test_doc.splitlines()[0].rstrip()
1846 test_title = colorize(test_title, GREEN)
1847 if test.is_tagged_run_solo():
1848 test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)
1850 # This block may overwrite the colorized title above,
1851 # but we want this to stand out and be fixed
1852 if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
1853 test_title = colorize(f"FIXME with VPP workers: {test_title}", RED)
1855 if test.has_tag(TestCaseTag.FIXME_ASAN):
1856 test_title = colorize(f"FIXME with ASAN: {test_title}", RED)
1857 test.skip_fixme_asan()
1859 if is_distro_ubuntu2204 == True and test.has_tag(
1860 TestCaseTag.FIXME_UBUNTU2204
1862 test_title = colorize(f"FIXME on Ubuntu-22.04: {test_title}", RED)
1863 test.skip_fixme_ubuntu2204()
1865 if is_distro_debian11 == True and test.has_tag(TestCaseTag.FIXME_DEBIAN11):
1866 test_title = colorize(f"FIXME on Debian-11: {test_title}", RED)
1867 test.skip_fixme_debian11()
1869 if "debug" in config.vpp_tag and test.has_tag(TestCaseTag.FIXME_VPP_DEBUG):
1870 test_title = colorize(f"FIXME on VPP Debug: {test_title}", RED)
1871 test.skip_fixme_vpp_debug()
1873 if hasattr(test, "vpp_worker_count"):
1874 if test.vpp_worker_count == 0:
1875 test_title += " [main thread only]"
1876 elif test.vpp_worker_count == 1:
1877 test_title += " [1 worker thread]"
1879 test_title += f" [{test.vpp_worker_count} worker threads]"
1881 if test.__class__.skipped_due_to_cpu_lack:
1882 test_title = colorize(
1883 f"{test_title} [skipped - not enough cpus, "
1884 f"required={test.__class__.get_cpus_required()}, "
1885 f"available={max_vpp_cpus}]",
1889 print(double_line_delim)
1891 print(double_line_delim)
1892 self.printed.append(test.__class__)
1895 self.start_test = time.time()
1896 unittest.TestResult.startTest(self, test)
1897 if self.verbosity > 0:
1898 self.stream.writeln("Starting " + self.getDescription(test) + " ...")
1899 self.stream.writeln(single_line_delim)
1901 def stopTest(self, test):
1903 Called when the given test has been run
1908 unittest.TestResult.stopTest(self, test)
1910 result_code_to_suffix = {
1911 TestResultCode.PASS: "",
1912 TestResultCode.FAIL: "",
1913 TestResultCode.ERROR: "",
1914 TestResultCode.SKIP: "",
1915 TestResultCode.TEST_RUN: "",
1916 TestResultCode.SKIP_CPU_SHORTAGE: "",
1917 TestResultCode.EXPECTED_FAIL: " [EXPECTED FAIL]",
1918 TestResultCode.UNEXPECTED_PASS: " [UNEXPECTED PASS]",
1921 if self.verbosity > 0:
1922 self.stream.writeln(single_line_delim)
1923 self.stream.writeln(
1926 self.getDescription(test),
1928 result_code_to_suffix[self.result_code],
1931 self.stream.writeln(single_line_delim)
1933 self.stream.writeln(
1936 self.getDescription(test),
1937 time.time() - self.start_test,
1939 result_code_to_suffix[self.result_code],
1943 self.send_result_through_pipe(test, TestResultCode.TEST_RUN)
def printErrors(self):
    """
    Print errors from running the test case
    """
    if self.errors or self.failures:
        self.stream.writeln()
        for flavour, entries in (("ERROR", self.errors), ("FAIL", self.failures)):
            self.printErrorList(flavour, entries)

    # ^^ that is the last output from unittest before summary
    if self.runner.print_summary:
        return
    # no summary wanted: point both this result and its runner at devnull
    devnull = unittest.runner._WritelnDecorator(open(os.devnull, "w"))
    self.stream = devnull
    self.runner.stream = devnull
def printErrorList(self, flavour, errors):
    """
    Print error list to the output stream together with error type
    and test case description.

    :param flavour: error type
    :param errors: iterable errors

    """
    emit = self.stream.writeln
    for test, err in errors:
        heading = "%s: %s" % (flavour, self.getDescription(test))
        emit(double_line_delim)
        emit(heading)
        emit(single_line_delim)
        emit("%s" % err)
1976 class VppTestRunner(unittest.TextTestRunner):
1978 A basic test runner implementation which prints results to standard error.
@property
def resultclass(self):
    """Class maintaining the results of the tests"""
    result_type = VppTestResult
    return result_type
1988 keep_alive_pipe=None,
1998 # ignore stream setting here, use hard-coded stdout to be in sync
1999 # with prints from VppTestCase methods ...
2000 super(VppTestRunner, self).__init__(
2001 sys.stdout, descriptions, verbosity, failfast, buffer, resultclass, **kwargs
2003 KeepAliveReporter.pipe = keep_alive_pipe
2005 self.orig_stream = self.stream
2006 self.resultclass.test_framework_result_pipe = result_pipe
2008 self.print_summary = print_summary
2010 def _makeResult(self):
2011 return self.resultclass(self.stream, self.descriptions, self.verbosity, self)
def run(self, test):
    """
    Run the tests

    :param test: test or test suite to run
    :returns: the result object produced by the run

    """
    faulthandler.enable()  # emit stack trace to stderr if killed by signal

    result = super(VppTestRunner, self).run(test)
    if self.print_summary:
        return result
    # put back the stream saved as orig_stream, on the runner and the result
    saved_stream = self.orig_stream
    self.stream = saved_stream
    result.stream = saved_stream
    return result
2029 class Worker(Thread):
2030 def __init__(self, executable_args, logger, env=None, *args, **kwargs):
2031 super(Worker, self).__init__(*args, **kwargs)
2032 self.logger = logger
2033 self.args = executable_args
2034 if hasattr(self, "testcase") and self.testcase.debug_all:
2035 if self.testcase.debug_gdbserver:
2037 "/usr/bin/gdbserver",
2038 "localhost:{port}".format(port=self.testcase.gdbserver_port),
2040 elif self.testcase.debug_gdb and hasattr(self, "wait_for_gdb"):
2041 self.args.append(self.wait_for_gdb)
2042 self.app_bin = executable_args[0]
2043 self.app_name = os.path.basename(self.app_bin)
2044 if hasattr(self, "role"):
2045 self.app_name += " {role}".format(role=self.role)
2048 env = {} if env is None else env
2049 self.env = copy.deepcopy(env)
2051 def wait_for_enter(self):
2052 if not hasattr(self, "testcase"):
2054 if self.testcase.debug_all and self.testcase.debug_gdbserver:
2056 print(double_line_delim)
2058 "Spawned GDB Server for '{app}' with PID: {pid}".format(
2059 app=self.app_name, pid=self.process.pid
2062 elif self.testcase.debug_all and self.testcase.debug_gdb:
2064 print(double_line_delim)
2066 "Spawned '{app}' with PID: {pid}".format(
2067 app=self.app_name, pid=self.process.pid
2072 print(single_line_delim)
2073 print("You can debug '{app}' using:".format(app=self.app_name))
2074 if self.testcase.debug_gdbserver:
2078 + " -ex 'target remote localhost:{port}'".format(
2079 port=self.testcase.gdbserver_port
2083 "Now is the time to attach gdb by running the above "
2084 "command, set up breakpoints etc., then resume from "
2085 "within gdb by issuing the 'continue' command"
2087 self.testcase.gdbserver_port += 1
2088 elif self.testcase.debug_gdb:
2092 + " -ex 'attach {pid}'".format(pid=self.process.pid)
2095 "Now is the time to attach gdb by running the above "
2096 "command and set up breakpoints etc., then resume from"
2097 " within gdb by issuing the 'continue' command"
2099 print(single_line_delim)
2100 input("Press ENTER to continue running the testcase...")
2103 executable = self.args[0]
2104 if not os.path.exists(executable) or not os.access(
2105 executable, os.F_OK | os.X_OK
2107 # Exit code that means some system file did not exist,
2108 # could not be opened, or had some other kind of error.
2109 self.result = os.EX_OSFILE
2110 raise EnvironmentError(
2111 "executable '%s' is not found or executable." % executable
2114 "Running executable '{app}': '{cmd}'".format(
2115 app=self.app_name, cmd=" ".join(self.args)
2118 env = os.environ.copy()
2119 env.update(self.env)
2120 env["CK_LOG_FILE_NAME"] = "-"
2121 self.process = subprocess.Popen(
2122 ["stdbuf", "-o0", "-e0"] + self.args,
2125 preexec_fn=os.setpgrp,
2126 stdout=subprocess.PIPE,
2127 stderr=subprocess.PIPE,
2129 self.wait_for_enter()
2130 out, err = self.process.communicate()
2131 self.logger.debug("Finished running `{app}'".format(app=self.app_name))
2132 self.logger.info("Return code is `%s'" % self.process.returncode)
2133 self.logger.info(single_line_delim)
2135 "Executable `{app}' wrote to stdout:".format(app=self.app_name)
2137 self.logger.info(single_line_delim)
2138 self.logger.info(out.decode("utf-8"))
2139 self.logger.info(single_line_delim)
2141 "Executable `{app}' wrote to stderr:".format(app=self.app_name)
2143 self.logger.info(single_line_delim)
2144 self.logger.info(err.decode("utf-8"))
2145 self.logger.info(single_line_delim)
2146 self.result = self.process.returncode
2149 if __name__ == "__main__":