3 from __future__ import print_function
18 from collections import deque
19 from threading import Thread, Event
20 from inspect import getdoc, isclass
21 from traceback import format_exception
22 from logging import FileHandler, DEBUG, Formatter
24 from abc import ABC, abstractmethod
25 from struct import pack, unpack
28 from scapy.packet import Raw, Packet
29 from config import config, available_cpus, num_cpus, max_vpp_cpus
30 import hook as hookmodule
31 from vpp_pg_interface import VppPGInterface
32 from vpp_sub_interface import VppSubInterface
33 from vpp_lo_interface import VppLoInterface
34 from vpp_bvi_interface import VppBviInterface
35 from vpp_papi_provider import VppPapiProvider
36 from vpp_papi import VppEnum
38 from vpp_papi.vpp_stats import VPPStats
39 from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
49 from vpp_object import VppObjectRegistry
50 from util import ppp, is_core_present
51 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
52 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
53 from scapy.layers.inet6 import ICMPv6EchoReply
54 from vpp_running import use_running
55 from test_result_code import TestResultCode
# Logger for this module, named after the module itself.
logger = logging.getLogger(__name__)

# Set up an empty logger for the testcase that can be overridden as necessary
null_logger = logging.getLogger("VppTestCase")
# Only a NullHandler is attached here, so this logger has no output handler
# of its own.
null_logger.addHandler(logging.NullHandler())
65 if config.debug_framework:
69 Test framework module.
71 The module provides a set of tools for constructing and running tests and
72 representing the results.
76 class VppDiedError(Exception):
77 """exception for reporting that the subprocess has died."""
81 for k, v in signal.__dict__.items()
82 if k.startswith("SIG") and not k.startswith("SIG_")
85 def __init__(self, rv=None, testcase=None, method_name=None):
87 self.signal_name = None
88 self.testcase = testcase
89 self.method_name = method_name
92 self.signal_name = VppDiedError.signals_by_value[-rv]
93 except (KeyError, TypeError):
96 if testcase is None and method_name is None:
99 in_msg = " while running %s.%s" % (testcase, method_name)
102 msg = "VPP subprocess died unexpectedly%s with return code: %d%s." % (
105 " [%s]" % (self.signal_name if self.signal_name is not None else ""),
108 msg = "VPP subprocess died unexpectedly%s." % in_msg
110 super(VppDiedError, self).__init__(msg)
113 class _PacketInfo(object):
114 """Private class to create packet info object.
116 Help process information about the next packet.
117 Set variables to default values.
120 #: Store the index of the packet.
122 #: Store the index of the source packet generator interface of the packet.
124 #: Store the index of the destination packet generator interface
127 #: Store expected ip version
129 #: Store expected upper protocol
131 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare two packet infos by index, source, destination and data.

    Only the ``index``, ``src``, ``dst`` and ``data`` attributes take part
    in the comparison.

    :param other: object to compare with
    :returns: True or False for objects exposing the four attributes;
        NotImplemented for anything else, so that ``info == None``
        evaluates to False instead of raising AttributeError
    """
    try:
        theirs = (other.index, other.src, other.dst, other.data)
    except AttributeError:
        # Not a packet-info-like object: let Python fall back to its
        # default comparison rather than failing on the attribute access.
        return NotImplemented
    mine = (self.index, self.src, self.dst, self.data)
    return all(a == b for a, b in zip(mine, theirs))
142 def pump_output(testclass):
143 """pump output from vpp stdout/stderr to proper queues"""
144 if not hasattr(testclass, "vpp"):
148 while not testclass.pump_thread_stop_flag.is_set():
149 readable = select.select(
151 testclass.vpp.stdout.fileno(),
152 testclass.vpp.stderr.fileno(),
153 testclass.pump_thread_wakeup_pipe[0],
158 if testclass.vpp.stdout.fileno() in readable:
159 read = os.read(testclass.vpp.stdout.fileno(), 102400)
161 split = read.decode("ascii", errors="backslashreplace").splitlines(True)
162 if len(stdout_fragment) > 0:
163 split[0] = "%s%s" % (stdout_fragment, split[0])
164 if len(split) > 0 and split[-1].endswith("\n"):
168 stdout_fragment = split[-1]
169 testclass.vpp_stdout_deque.extend(split[:limit])
170 if not config.cache_vpp_output:
171 for line in split[:limit]:
172 testclass.logger.info("VPP STDOUT: %s" % line.rstrip("\n"))
173 if testclass.vpp.stderr.fileno() in readable:
174 read = os.read(testclass.vpp.stderr.fileno(), 102400)
176 split = read.decode("ascii", errors="backslashreplace").splitlines(True)
177 if len(stderr_fragment) > 0:
178 split[0] = "%s%s" % (stderr_fragment, split[0])
179 if len(split) > 0 and split[-1].endswith("\n"):
183 stderr_fragment = split[-1]
185 testclass.vpp_stderr_deque.extend(split[:limit])
186 if not config.cache_vpp_output:
187 for line in split[:limit]:
188 testclass.logger.error("VPP STDERR: %s" % line.rstrip("\n"))
189 # ignoring the dummy pipe here intentionally - the
190 # flag will take care of properly terminating the loop
def _is_platform_aarch64():
    """Tell whether the interpreter runs on an aarch64 machine.

    :returns: True if platform.machine() reports "aarch64", False otherwise
    """
    machine = platform.machine()
    return machine == "aarch64"


# Evaluated once, when the module is imported.
is_platform_aarch64 = _is_platform_aarch64()
200 def _is_distro_ubuntu2204():
201 with open("/etc/os-release") as f:
202 for line in f.readlines():
208 is_distro_ubuntu2204 = _is_distro_ubuntu2204()
def _is_distro_debian11(os_release_path="/etc/os-release"):
    """Tell whether the host distribution looks like Debian 11 ("bullseye").

    :param os_release_path: os-release style file to scan
    :returns: True if any line of the file contains "bullseye"; False
        otherwise, including when the file cannot be opened or read
    """
    try:
        with open(os_release_path) as f:
            # Iterate the file lazily instead of loading it with readlines().
            return any("bullseye" in line for line in f)
    except OSError:
        # No readable os-release file: treat the host as "not Debian 11".
        # Without this guard the module-level call below would make the
        # whole module fail to import on such hosts.
        return False


# Evaluated once, when the module is imported.
is_distro_debian11 = _is_distro_debian11()
222 class KeepAliveReporter(object):
224 Singleton object which reports test start to parent process
230 self.__dict__ = self._shared_state
238 def pipe(self, pipe):
239 if self._pipe is not None:
240 raise Exception("Internal error - pipe should only be set once.")
243 def send_keep_alive(self, test, desc=None):
245 Write current test tmpdir & desc to keep-alive pipe to signal liveness
247 if not hasattr(test, "vpp") or self.pipe is None:
248 # if not running forked..
252 desc = "%s (%s)" % (desc, unittest.util.strclass(test))
256 self.pipe.send((desc, config.vpp, test.tempdir, test.vpp.pid))
259 class TestCaseTag(Enum):
260 # marks the suites that must run at the end
261 # using only a single test runner
263 # marks the suites broken on VPP multi-worker
264 FIXME_VPP_WORKERS = 2
265 # marks the suites broken when ASan is enabled
267 # marks suites broken on Ubuntu-22.04
269 # marks suites broken on Debian-11
271 # marks suites broken on debug vpp image
275 def create_tag_decorator(e):
278 cls.test_tags.append(e)
279 except AttributeError:
# Ready-made decorators, one per TestCaseTag member. Each one is produced by
# create_tag_decorator() for the corresponding tag.
tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
tag_fixme_ubuntu2204 = create_tag_decorator(TestCaseTag.FIXME_UBUNTU2204)
tag_fixme_debian11 = create_tag_decorator(TestCaseTag.FIXME_DEBIAN11)
tag_fixme_vpp_debug = create_tag_decorator(TestCaseTag.FIXME_VPP_DEBUG)
305 class CPUInterface(ABC):
307 skipped_due_to_cpu_lack = False
311 def get_cpus_required(cls):
315 def assign_cpus(cls, cpus):
320 class VppTestCase(CPUInterface, unittest.TestCase):
321 """This subclass is a base class for VPP test cases that are implemented as
322 classes. It provides methods to create and run test case.
325 extra_vpp_statseg_config = ""
326 extra_vpp_config = []
327 extra_vpp_plugin_config = []
329 vapi_response_timeout = 5
330 remove_configured_vpp_objects_on_tear_down = True
def packet_infos(self):
    """Packet infos registered so far.

    :returns: the internal ``_packet_infos`` container; it is created as a
        dict by reset_packet_infos() and filled by create_packet_info(),
        which stores each info under its index
    """
    return self._packet_infos
def get_packet_count_for_if_idx(cls, dst_if_index):
    """Return how many packet infos target the given destination interface.

    :param dst_if_index: destination interface index to look up
    :returns: the recorded count, or 0 if nothing was recorded for the index
    """
    return cls._packet_count_for_dst_if_idx.get(dst_if_index, 0)
def has_tag(cls, tag):
    """Tell whether the test case carries the given tag.

    :param tag: tag to look for in ``cls.test_tags``
    :returns: True if the tag is present; False if it is absent or the
        class has no ``test_tags`` attribute at all
    """
    return tag in getattr(cls, "test_tags", ())
def is_tagged_run_solo(cls):
    """Tell whether the test case class is tagged to run solo.

    :returns: result of ``cls.has_tag(TestCaseTag.RUN_SOLO)``; the tag marks
        timing-sensitive suites
    """
    return cls.has_tag(TestCaseTag.RUN_SOLO)
def skip_fixme_asan(cls):
    """Mark the class as skipped if it is @tag_fixme_asan and ASan is enabled.

    ASan counts as enabled when the VPP_EXTRA_CMAKE_ARGS environment variable
    contains "DVPP_ENABLE_SANITIZE_ADDR=ON".
    """
    if not cls.has_tag(TestCaseTag.FIXME_ASAN):
        return
    cmake_args = os.environ.get("VPP_EXTRA_CMAKE_ARGS", "")
    if "DVPP_ENABLE_SANITIZE_ADDR=ON" not in cmake_args:
        return
    # unittest.skip() flags the class object it is given, so the returned
    # value does not need to be kept.
    unittest.skip("Skipping @tag_fixme_asan tests")(cls)
def skip_fixme_ubuntu2204(cls):
    """Mark the class as skipped if it carries @tag_fixme_ubuntu2204.

    The distribution is not examined here; only the tag is checked.
    """
    if not cls.has_tag(TestCaseTag.FIXME_UBUNTU2204):
        return
    # unittest.skip() flags the class object it is given.
    unittest.skip("Skipping @tag_fixme_ubuntu2204 tests")(cls)
def skip_fixme_debian11(cls):
    """Mark the class as skipped if it carries @tag_fixme_debian11.

    The distribution is not examined here; only the tag is checked.
    """
    if not cls.has_tag(TestCaseTag.FIXME_DEBIAN11):
        return
    # unittest.skip() flags the class object it is given.
    unittest.skip("Skipping @tag_fixme_debian11 tests")(cls)
def skip_fixme_vpp_debug(cls):
    """Unconditionally mark the class as skipped (@tag_fixme_vpp_debug)."""
    mark_skipped = unittest.skip("Skipping @tag_fixme_vpp_debug tests")
    mark_skipped(cls)
385 """Return the instance of this testcase"""
386 return cls.test_instance
389 def set_debug_flags(cls, d):
390 cls.gdbserver_port = 7777
391 cls.debug_core = False
392 cls.debug_gdb = False
393 cls.debug_gdbserver = False
394 cls.debug_all = False
395 cls.debug_attach = False
400 cls.debug_core = True
401 elif dl == "gdb" or dl == "gdb-all":
403 elif dl == "gdbserver" or dl == "gdbserver-all":
404 cls.debug_gdbserver = True
406 cls.debug_attach = True
408 raise Exception("Unrecognized DEBUG option: '%s'" % d)
409 if dl == "gdb-all" or dl == "gdbserver-all":
def get_vpp_worker_count(cls):
    """Return the number of VPP workers to use for this test case.

    The value is computed on first use and stored in ``cls.vpp_worker_count``:
    0 for classes tagged FIXME_VPP_WORKERS, ``config.vpp_worker_count``
    otherwise. An already present attribute is returned unchanged.
    """
    if hasattr(cls, "vpp_worker_count"):
        return cls.vpp_worker_count
    broken_with_workers = cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS)
    cls.vpp_worker_count = 0 if broken_with_workers else config.vpp_worker_count
    return cls.vpp_worker_count
def get_cpus_required(cls):
    """Return how many CPUs this test case needs.

    :returns: the VPP worker count (get_vpp_worker_count()) plus one
    """
    # NOTE(review): the extra CPU is presumably for the VPP main thread - confirm
    return 1 + cls.get_vpp_worker_count()
426 def setUpConstants(cls):
427 """Set-up the test case class based on environment variables"""
428 cls.step = config.step
429 cls.plugin_path = ":".join(config.vpp_plugin_dir)
430 cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir)
431 cls.extern_plugin_path = ":".join(config.extern_plugin_dir)
433 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
434 debug_cli = "cli-listen localhost:5002"
435 size = re.search(r"\d+[gG]", config.coredump_size)
437 coredump_size = f"coredump-size {config.coredump_size}".lower()
439 coredump_size = "coredump-size unlimited"
440 default_variant = config.variant
441 if default_variant is not None:
442 default_variant = "default { variant %s 100 }" % default_variant
446 api_fuzzing = config.api_fuzz
447 if api_fuzzing is None:
468 cls.get_api_segment_prefix(),
475 if cls.extern_plugin_path not in (None, ""):
476 cls.extra_vpp_plugin_config.append("add-path %s" % cls.extern_plugin_path)
477 if cls.get_vpp_worker_count():
478 cls.vpp_cmdline.extend(
479 ["corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])]
481 cls.vpp_cmdline.extend(
492 cls.get_stats_sock_path(),
493 cls.extra_vpp_statseg_config,
498 cls.get_api_sock_path(),
519 "lisp_unittest_plugin.so",
524 "unittest_plugin.so",
529 + cls.extra_vpp_plugin_config
535 if cls.extra_vpp_config is not None:
536 cls.vpp_cmdline.extend(cls.extra_vpp_config)
538 if not cls.debug_attach:
539 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
540 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
543 def wait_for_enter(cls):
544 if cls.debug_gdbserver:
545 print(double_line_delim)
546 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
548 print(double_line_delim)
549 print("Spawned VPP with PID: %d" % cls.vpp.pid)
551 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
553 print(single_line_delim)
554 print("You can debug VPP using:")
555 if cls.debug_gdbserver:
557 f"sudo gdb {config.vpp} "
558 f"-ex 'target remote localhost:{cls.gdbserver_port}'"
561 "Now is the time to attach gdb by running the above "
562 "command, set up breakpoints etc., then resume VPP from "
563 "within gdb by issuing the 'continue' command"
565 cls.gdbserver_port += 1
567 print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'")
569 "Now is the time to attach gdb by running the above "
570 "command and set up breakpoints etc., then resume VPP from"
571 " within gdb by issuing the 'continue' command"
573 print(single_line_delim)
574 input("Press ENTER to continue running the testcase...")
583 is_distro_ubuntu2204 == True and cls.has_tag(TestCaseTag.FIXME_UBUNTU2204)
584 ) or (is_distro_debian11 == True and cls.has_tag(TestCaseTag.FIXME_DEBIAN11)):
586 cls.logger.debug(f"Assigned cpus: {cls.cpus}")
587 cmdline = cls.vpp_cmdline
589 if cls.debug_gdbserver:
590 gdbserver = "/usr/bin/gdbserver"
591 if not os.path.isfile(gdbserver) or not os.access(gdbserver, os.X_OK):
593 "gdbserver binary '%s' does not exist or is "
594 "not executable" % gdbserver
599 "localhost:{port}".format(port=cls.gdbserver_port),
601 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
604 cls.vpp = subprocess.Popen(
605 cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE
607 except subprocess.CalledProcessError as e:
609 "Subprocess returned with non-0 return code: (%s)", e.returncode
614 "Subprocess returned with OS error: (%s) %s", e.errno, e.strerror
617 except Exception as e:
618 cls.logger.exception("Subprocess returned unexpected from %s:", cmdline)
624 def wait_for_coredump(cls):
625 corefile = cls.tempdir + "/core"
626 if os.path.isfile(corefile):
627 cls.logger.error("Waiting for coredump to complete: %s", corefile)
628 curr_size = os.path.getsize(corefile)
629 deadline = time.time() + 60
631 while time.time() < deadline:
634 curr_size = os.path.getsize(corefile)
635 if size == curr_size:
640 "Timed out waiting for coredump to complete: %s", corefile
643 cls.logger.error("Coredump complete: %s, size %d", corefile, curr_size)
def get_stats_sock_path(cls):
    """Return the stats socket path inside the test case's temporary dir."""
    return f"{cls.tempdir}/stats.sock"
def get_api_sock_path(cls):
    """Return the API socket path inside the test case's temporary dir."""
    return f"{cls.tempdir}/api.sock"
def get_api_segment_prefix(cls):
    """Return the last path component of the temporary dir.

    Only used for VAPI.
    """
    _, leaf = os.path.split(cls.tempdir)
    return leaf
658 def get_tempdir(cls):
660 tmpdir = f"{config.tmp_dir}/unittest-attach-gdb"
662 tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}"
663 if config.wipe_tmp_dir:
664 shutil.rmtree(tmpdir, ignore_errors=True)
669 def create_file_handler(cls):
670 if config.log_dir is None:
671 cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
674 logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}"
675 if config.wipe_tmp_dir:
676 shutil.rmtree(logdir, ignore_errors=True)
678 cls.file_handler = FileHandler(f"{logdir}/log.txt")
683 Perform class setup before running the testcase
684 Remove shared memory files, start vpp and connect the vpp-api
686 super(VppTestCase, cls).setUpClass()
687 cls.logger = get_logger(cls.__name__)
688 random.seed(config.rnd_seed)
689 if hasattr(cls, "parallel_handler"):
690 cls.logger.addHandler(cls.parallel_handler)
691 cls.logger.propagate = False
692 cls.set_debug_flags(config.debug)
693 cls.tempdir = cls.get_tempdir()
694 cls.create_file_handler()
695 cls.file_handler.setFormatter(
696 Formatter(fmt="%(asctime)s,%(msecs)03d %(message)s", datefmt="%H:%M:%S")
698 cls.file_handler.setLevel(DEBUG)
699 cls.logger.addHandler(cls.file_handler)
700 cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
701 os.chdir(cls.tempdir)
703 "Temporary dir is %s, api socket is %s",
705 cls.get_api_sock_path(),
707 cls.logger.debug("Random seed is %s", config.rnd_seed)
709 cls.reset_packet_infos()
714 cls.registry = VppObjectRegistry()
715 cls.vpp_startup_failed = False
716 cls.reporter = KeepAliveReporter()
717 # need to catch exceptions here because if we raise, then the cleanup
718 # doesn't get called and we might end with a zombie vpp
724 if not hasattr(cls, "vpp"):
726 cls.reporter.send_keep_alive(cls, "setUpClass")
727 VppTestResult.current_test_case_info = TestCaseInfo(
728 cls.logger, cls.tempdir, cls.vpp.pid, config.vpp
730 cls.vpp_stdout_deque = deque()
731 cls.vpp_stderr_deque = deque()
732 # Pump thread in a non-debug-attached & not running-vpp
733 if not cls.debug_attach and not hasattr(cls, "running_vpp"):
734 cls.pump_thread_stop_flag = Event()
735 cls.pump_thread_wakeup_pipe = os.pipe()
736 cls.pump_thread = Thread(target=pump_output, args=(cls,))
737 cls.pump_thread.daemon = True
738 cls.pump_thread.start()
739 if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
740 cls.vapi_response_timeout = 0
741 cls.vapi = VppPapiProvider(cls.__name__, cls, cls.vapi_response_timeout)
743 hook = hookmodule.StepHook(cls)
745 hook = hookmodule.PollHook(cls)
746 cls.vapi.register_hook(hook)
747 cls.statistics = VPPStats(socketname=cls.get_stats_sock_path())
751 cls.vpp_startup_failed = True
753 "VPP died shortly after startup, check the"
754 " output to standard error for possible cause"
759 except (vpp_papi.VPPIOError, Exception) as e:
760 cls.logger.debug("Exception connecting to vapi: %s" % e)
761 cls.vapi.disconnect()
763 if cls.debug_gdbserver:
766 "You're running VPP inside gdbserver but "
767 "VPP-API connection failed, did you forget "
768 "to 'continue' VPP from within gdb?",
774 last_line = cls.vapi.cli("show thread").split("\n")[-2]
775 cls.vpp_worker_count = int(last_line.split(" ")[0])
776 print("Detected VPP with %s workers." % cls.vpp_worker_count)
777 except vpp_papi.VPPRuntimeError as e:
778 cls.logger.debug("%s" % e)
781 except Exception as e:
782 cls.logger.debug("Exception connecting to VPP: %s" % e)
787 def _debug_quit(cls):
788 if cls.debug_gdbserver or cls.debug_gdb:
792 if cls.vpp.returncode is None:
794 print(double_line_delim)
795 print("VPP or GDB server is still running")
796 print(single_line_delim)
798 "When done debugging, press ENTER to kill the "
799 "process and finish running the testcase..."
801 except AttributeError:
807 Disconnect vpp-api, kill vpp and cleanup shared memory files
810 if hasattr(cls, "running_vpp"):
813 # first signal that we want to stop the pump thread, then wake it up
814 if hasattr(cls, "pump_thread_stop_flag"):
815 cls.pump_thread_stop_flag.set()
816 if hasattr(cls, "pump_thread_wakeup_pipe"):
817 os.write(cls.pump_thread_wakeup_pipe[1], b"ding dong wake up")
818 if hasattr(cls, "pump_thread"):
819 cls.logger.debug("Waiting for pump thread to stop")
820 cls.pump_thread.join()
821 if hasattr(cls, "vpp_stderr_reader_thread"):
822 cls.logger.debug("Waiting for stderr pump to stop")
823 cls.vpp_stderr_reader_thread.join()
825 if hasattr(cls, "vpp"):
826 if hasattr(cls, "vapi"):
827 cls.logger.debug(cls.vapi.vpp.get_stats())
828 cls.logger.debug("Disconnecting class vapi client on %s", cls.__name__)
829 cls.vapi.disconnect()
830 cls.logger.debug("Deleting class vapi attribute on %s", cls.__name__)
833 if not cls.debug_attach and cls.vpp.returncode is None:
834 cls.wait_for_coredump()
835 cls.logger.debug("Sending TERM to vpp")
837 cls.logger.debug("Waiting for vpp to die")
839 outs, errs = cls.vpp.communicate(timeout=5)
840 except subprocess.TimeoutExpired:
842 outs, errs = cls.vpp.communicate()
843 cls.logger.debug("Deleting class vpp attribute on %s", cls.__name__)
844 if not cls.debug_attach and not hasattr(cls, "running_vpp"):
845 cls.vpp.stdout.close()
846 cls.vpp.stderr.close()
847 # If vpp is a dynamic attribute set by the func use_running,
848 # deletion will result in an AttributeError that we can
852 except AttributeError:
855 if cls.vpp_startup_failed:
856 stdout_log = cls.logger.info
857 stderr_log = cls.logger.critical
859 stdout_log = cls.logger.info
860 stderr_log = cls.logger.info
862 if hasattr(cls, "vpp_stdout_deque"):
863 stdout_log(single_line_delim)
864 stdout_log("VPP output to stdout while running %s:", cls.__name__)
865 stdout_log(single_line_delim)
866 vpp_output = "".join(cls.vpp_stdout_deque)
867 with open(cls.tempdir + "/vpp_stdout.txt", "w") as f:
869 stdout_log("\n%s", vpp_output)
870 stdout_log(single_line_delim)
872 if hasattr(cls, "vpp_stderr_deque"):
873 stderr_log(single_line_delim)
874 stderr_log("VPP output to stderr while running %s:", cls.__name__)
875 stderr_log(single_line_delim)
876 vpp_output = "".join(cls.vpp_stderr_deque)
877 with open(cls.tempdir + "/vpp_stderr.txt", "w") as f:
879 stderr_log("\n%s", vpp_output)
880 stderr_log(single_line_delim)
883 def tearDownClass(cls):
884 """Perform final cleanup after running all tests in this test-case"""
885 cls.logger.debug("--- tearDownClass() for %s called ---" % cls.__name__)
886 if not hasattr(cls, "vpp"):
888 cls.reporter.send_keep_alive(cls, "tearDownClass")
890 cls.file_handler.close()
891 cls.reset_packet_infos()
892 if config.debug_framework:
893 debug_internal.on_tear_down_class(cls)
def show_commands_at_teardown(self):
    """Allow subclass specific teardown logging additions.

    This base implementation only logs that no test specific show
    commands were provided.
    """
    self.logger.info("--- No test specific show commands provided. ---")
900 """Show various debug prints after each test"""
902 "--- tearDown() for %s.%s(%s) called ---"
903 % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
905 if not hasattr(self, "vpp"):
909 if not self.vpp_dead:
910 self.logger.debug(self.vapi.cli("show trace max 1000"))
911 self.logger.info(self.vapi.ppcli("show interface"))
912 self.logger.info(self.vapi.ppcli("show hardware"))
913 self.logger.info(self.statistics.set_errors_str())
914 self.logger.info(self.vapi.ppcli("show run"))
915 self.logger.info(self.vapi.ppcli("show log"))
916 self.logger.info(self.vapi.ppcli("show bihash"))
917 self.logger.info("Logging testcase specific show commands.")
918 self.show_commands_at_teardown()
919 if self.remove_configured_vpp_objects_on_tear_down:
920 self.registry.remove_vpp_config(self.logger)
921 # Save/Dump VPP api trace log
922 m = self._testMethodName
923 api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
924 tmp_api_trace = "/tmp/%s" % api_trace
925 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
926 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
927 self.logger.info("Moving %s to %s\n" % (tmp_api_trace, vpp_api_trace_log))
928 shutil.move(tmp_api_trace, vpp_api_trace_log)
929 except VppTransportSocketIOError:
931 "VppTransportSocketIOError: Vpp dead. Cannot log show commands."
935 self.registry.unregister_all(self.logger)
938 """Clear trace before running each test"""
939 super(VppTestCase, self).setUp()
940 if not hasattr(self, "vpp"):
942 self.reporter.send_keep_alive(self)
946 testcase=self.__class__.__name__,
947 method_name=self._testMethodName,
949 self.sleep(0.1, "during setUp")
950 self.vpp_stdout_deque.append(
951 "--- test setUp() for %s.%s(%s) starts here ---\n"
952 % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
954 self.vpp_stderr_deque.append(
955 "--- test setUp() for %s.%s(%s) starts here ---\n"
956 % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
958 self.vapi.cli("clear trace")
959 # store the test instance inside the test class - so that objects
960 # holding the class can access instance methods (like assertEqual)
961 type(self).test_instance = self
964 def pg_enable_capture(cls, interfaces=None):
966 Enable capture on packet-generator interfaces
968 :param interfaces: iterable interface indexes (if None,
969 use self.pg_interfaces)
972 if interfaces is None:
973 interfaces = cls.pg_interfaces
def register_pcap(cls, intf, worker):
    """Register a pcap in the testclass

    :param intf: interface the capture belongs to
    :param worker: worker the capture belongs to
    """
    # add the (interface, worker) pair to the list of captures
    cls._pcaps.append((intf, worker))
def get_vpp_time(cls):
    """Return the current VPP time as reported by the "show clock" CLI.

    :returns: the time as a float, e.g. 2.190522 for the reply
        "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
    """
    reply = cls.vapi.cli("show clock")
    # keep what precedes the first comma, then what follows "Time now"
    before_comma = reply.partition(",")[0]
    seconds_text = before_comma.partition("Time now")[2]
    return float(seconds_text)
993 def sleep_on_vpp_time(cls, sec):
994 """Sleep according to time in VPP world"""
995 # On a busy system with many processes
996 # we might end up with VPP time being slower than real world
997 # So take that into account when waiting for VPP to do something
998 start_time = cls.get_vpp_time()
999 while cls.get_vpp_time() - start_time < sec:
1003 def pg_start(cls, trace=True):
1004 """Enable the PG, wait till it is done, then clean up"""
1005 for (intf, worker) in cls._old_pcaps:
1006 intf.handle_old_pcap_file(intf.get_in_path(worker), intf.in_history_counter)
1009 cls.vapi.cli("clear trace")
1010 cls.vapi.cli("trace add pg-input 1000")
1011 cls.vapi.cli("packet-generator enable")
1012 # PG, when starts, runs to completion -
1013 # so let's avoid a race condition,
1014 # and wait a little till it's done.
1015 # Then clean it up - and then be gone.
1016 deadline = time.time() + 300
1017 while cls.vapi.cli("show packet-generator").find("Yes") != -1:
1018 cls.sleep(0.01) # yield
1019 if time.time() > deadline:
1020 cls.logger.error("Timeout waiting for pg to stop")
1022 for intf, worker in cls._pcaps:
1023 cls.vapi.cli("packet-generator delete %s" % intf.get_cap_name(worker))
1024 cls._old_pcaps = cls._pcaps
1028 def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0, mode=None):
1030 Create packet-generator interfaces.
1032 :param interfaces: iterable indexes of the interfaces.
1033 :returns: List of created interfaces.
1037 for i in interfaces:
1038 intf = VppPGInterface(cls, i, gso, gso_size, mode)
1039 setattr(cls, intf.name, intf)
1041 cls.pg_interfaces = result
def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
    """
    Create packet-generator interfaces in IP4 mode.

    :param interfaces: iterable indexes of the interfaces.
    :param gso: passed through to create_pg_interfaces_internal()
    :param gso_size: passed through to create_pg_interfaces_internal()
    :returns: List of created interfaces (empty when the class has no vpp).
    """
    if hasattr(cls, "vpp"):
        mode = VppEnum.vl_api_pg_interface_mode_t.PG_API_MODE_IP4
        return cls.create_pg_interfaces_internal(interfaces, gso, gso_size, mode)
    # no vpp attribute on the class: nothing to create
    cls.pg_interfaces = []
    return cls.pg_interfaces
def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
    """
    Create packet-generator interfaces in IP6 mode.

    :param interfaces: iterable indexes of the interfaces.
    :param gso: passed through to create_pg_interfaces_internal()
    :param gso_size: passed through to create_pg_interfaces_internal()
    :returns: List of created interfaces (empty when the class has no vpp).
    """
    if hasattr(cls, "vpp"):
        mode = VppEnum.vl_api_pg_interface_mode_t.PG_API_MODE_IP6
        return cls.create_pg_interfaces_internal(interfaces, gso, gso_size, mode)
    # no vpp attribute on the class: nothing to create
    cls.pg_interfaces = []
    return cls.pg_interfaces
def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
    """
    Create packet-generator interfaces in ethernet mode.

    :param interfaces: iterable indexes of the interfaces.
    :param gso: passed through to create_pg_interfaces_internal()
    :param gso_size: passed through to create_pg_interfaces_internal()
    :returns: List of created interfaces (empty when the class has no vpp).
    """
    if hasattr(cls, "vpp"):
        mode = VppEnum.vl_api_pg_interface_mode_t.PG_API_MODE_ETHERNET
        return cls.create_pg_interfaces_internal(interfaces, gso, gso_size, mode)
    # no vpp attribute on the class: nothing to create
    cls.pg_interfaces = []
    return cls.pg_interfaces
def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
    """
    Create packet-generator interfaces in ethernet mode.

    :param interfaces: iterable indexes of the interfaces.
    :param gso: passed through to create_pg_interfaces_internal()
    :param gso_size: passed through to create_pg_interfaces_internal()
    :returns: List of created interfaces (empty when the class has no vpp).
    """
    if hasattr(cls, "vpp"):
        mode = VppEnum.vl_api_pg_interface_mode_t.PG_API_MODE_ETHERNET
        return cls.create_pg_interfaces_internal(interfaces, gso, gso_size, mode)
    # no vpp attribute on the class: nothing to create
    cls.pg_interfaces = []
    return cls.pg_interfaces
def create_loopback_interfaces(cls, count):
    """
    Create loopback interfaces.

    Each interface is also stored on the class under its own name, and the
    whole list is kept in ``cls.lo_interfaces``.

    :param count: number of interfaces created.
    :returns: List of created interfaces (empty when the class has no vpp).
    """
    if not hasattr(cls, "vpp"):
        cls.lo_interfaces = []
        return cls.lo_interfaces
    loopbacks = []
    for _ in range(count):
        loopbacks.append(VppLoInterface(cls))
    # publish the interfaces only after all of them have been constructed
    for lo in loopbacks:
        setattr(cls, lo.name, lo)
    cls.lo_interfaces = loopbacks
    return loopbacks
def create_bvi_interfaces(cls, count):
    """
    Create BVI interfaces.

    Each interface is also stored on the class under its own name, and the
    whole list is kept in ``cls.bvi_interfaces``.

    :param count: number of interfaces created.
    :returns: List of created interfaces (empty when the class has no vpp).
    """
    if not hasattr(cls, "vpp"):
        cls.bvi_interfaces = []
        return cls.bvi_interfaces
    bvis = []
    for _ in range(count):
        bvis.append(VppBviInterface(cls))
    # publish the interfaces only after all of them have been constructed
    for bvi in bvis:
        setattr(cls, bvi.name, bvi)
    cls.bvi_interfaces = bvis
    return bvis
def extend_packet(packet, size, padding=" "):
    """
    Extend packet to given size by padding with spaces or custom padding
    NOTE: Currently works only when Raw layer is present.

    :param packet: packet
    :param size: target size
    :param padding: padding used to extend the payload
    """
    # NOTE(review): the extra 4 bytes presumably stand for a trailing
    # frame checksum - confirm
    missing = size - (len(packet) + 4)
    if missing <= 0:
        return
    # repeat the padding often enough, then cut it to the exact length
    repeats = (missing // len(padding)) + 1
    filler = (padding * repeats)[:missing]
    packet[Raw].load += filler.encode("ascii")
def reset_packet_infos(cls):
    """Forget all packet infos and per-destination packet counts."""
    cls._packet_infos, cls._packet_count_for_dst_if_idx = {}, {}
def create_packet_info(cls, src_if, dst_if):
    """
    Create packet info object containing the source and destination indexes
    and add it to the testcase's packet info list

    :param VppInterface src_if: source interface
    :param VppInterface dst_if: destination interface

    :returns: _PacketInfo object
    """
    info = _PacketInfo()
    info.index = len(cls._packet_infos)
    info.src = src_if.sw_if_index
    info.dst = dst_if.sw_if_index
    # packets sent to a sub-interface are counted against its parent
    counted_if = dst_if.parent if isinstance(dst_if, VppSubInterface) else dst_if
    dst_idx = counted_if.sw_if_index
    counts = cls._packet_count_for_dst_if_idx
    counts[dst_idx] = counts.get(dst_idx, 0) + 1
    cls._packet_infos[info.index] = info
    return info
def info_to_payload(info):
    """
    Convert _PacketInfo object to packet payload

    :param info: _PacketInfo object

    :returns: bytes containing serialized data from packet info
    """
    # payload layout, currently 18 bytes (4 x ints + 1 short)
    fields = (info.index, info.src, info.dst, info.ip, info.proto)
    return pack("iiiih", *fields)
def payload_to_info(payload, payload_field="load"):
    """
    Convert packet payload to _PacketInfo object

    :param payload: packet payload
    :type payload: <class 'scapy.packet.Raw'>
    :param payload_field: packet fieldname of payload "load" for
            <class 'scapy.packet.Raw'>
    :type payload_field: str
    :returns: _PacketInfo object containing de-serialized data from payload
    :raises ValueError: if the decoded index is larger than 0x4000
    """
    # only the leading 18 bytes (4 x ints + 1 short) carry the info
    raw_fields = getattr(payload, payload_field)[:18]

    info = _PacketInfo()
    decoded = unpack("iiiih", raw_fields)
    info.index, info.src, info.dst, info.ip, info.proto = decoded

    # some SRv6 TCs depend on getting an exception when bad values are detected
    if info.index > 0x4000:
        raise ValueError("Index value is invalid")

    return info
def get_next_packet_info(self, info):
    """
    Iterate over the packet info list stored in the testcase
    Start iteration with first element if info is None
    Continue based on index in info if info is specified

    :param info: info or None
    :returns: next info in list or None if no more infos
    """
    position = 0 if info is None else info.index + 1
    if position == len(self._packet_infos):
        # walked past the last stored info
        return None
    return self._packet_infos[position]
def get_next_packet_info_for_interface(self, src_index, info):
    """
    Search the packet info list for the next packet info with same source
    interface index

    :param src_index: source interface index to search for
    :param info: packet info - where to start the search
    :returns: packet info or None
    """
    candidate = self.get_next_packet_info(info)
    # keep advancing until the source matches or the list is exhausted
    while candidate is not None and not (candidate.src == src_index):
        candidate = self.get_next_packet_info(candidate)
    return candidate
def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
    """
    Search the packet info list for the next packet info with same source
    and destination interface indexes

    :param src_index: source interface index to search for
    :param dst_index: destination interface index to search for
    :param info: packet info - where to start the search
    :returns: packet info or None
    """
    candidate = self.get_next_packet_info_for_interface(src_index, info)
    # source already matches; keep advancing until the destination does too
    while candidate is not None and not (candidate.dst == dst_index):
        candidate = self.get_next_packet_info_for_interface(src_index, candidate)
    return candidate
1260 def assert_equal(self, real_value, expected_value, name_or_class=None):
1261 if name_or_class is None:
1262 self.assertEqual(real_value, expected_value)
1265 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
1267 getdoc(name_or_class).strip(),
1269 str(name_or_class(real_value)),
1271 str(name_or_class(expected_value)),
1274 msg = "Invalid %s: %s does not match expected value %s" % (
1280 self.assertEqual(real_value, expected_value, msg)
1282 def assert_in_range(self, real_value, expected_min, expected_max, name=None):
1286 msg = "Invalid %s: %s out of range <%s,%s>" % (
1292 self.assertTrue(expected_min <= real_value <= expected_max, msg)
def assert_packet_checksums_valid(self, packet, ignore_zero_udp_checksums=True):
    """
    Verify every checksum field found on the layers of ``packet``.

    The packet is rebuilt from its raw bytes; on a scratch copy each
    ``cksum``/``chksum`` field found is deleted and the copy is rebuilt
    again (presumably so that scapy recomputes the deleted fields - confirm
    against scapy documentation). The values of the rebuilt copy are then
    compared with the received ones through assert_equal().

    :param packet: packet whose checksums are verified
    :param ignore_zero_udp_checksums: when true, a checksum equal to 0 on a
        layer named "UDP" or "UDPerror" is not verified
    """
    received = packet.__class__(scapy.compat.raw(packet))
    zero_exempt_layers = ["UDP", "UDPerror"]
    field_candidates = ["cksum", "chksum"]
    to_verify = []
    scratch = received.__class__(scapy.compat.raw(received))

    position = 0
    current = scratch.getlayer(position)
    while current:
        # inspect the layer on its own, without whatever it carries
        bare = current.copy()
        bare.remove_payload()
        for cf in field_candidates:
            if not hasattr(bare, cf):
                continue
            zero_exempt = (
                ignore_zero_udp_checksums
                and 0 == getattr(bare, cf)
                and bare.name in zero_exempt_layers
            )
            if zero_exempt:
                continue
            delattr(scratch.getlayer(position), cf)
            to_verify.append((position, cf))
        position += 1
        current = scratch.getlayer(position)

    if not to_verify:
        return

    scratch = scratch.__class__(scapy.compat.raw(scratch))
    for position, cf in reversed(to_verify):
        calc_sum = getattr(scratch[position], cf)
        self.assert_equal(
            getattr(received[position], cf),
            calc_sum,
            "packet checksum on layer #%d: %s" % (position, scratch[position].name),
        )
        self.logger.debug(
            "Checksum field `%s` on `%s` layer has correct value `%s`"
            % (cf, scratch[position].name, calc_sum)
        )
def assert_checksum_valid(
    self,
    received_packet,
    layer,
    checksum_field_names=("chksum", "cksum"),
    ignore_zero_checksum=False,
):
    """
    Check checksum of received packet on given layer.

    The first name from ``checksum_field_names`` present on the layer is
    taken as the checksum field. The packet is rebuilt from raw bytes with
    that field deleted and the rebuilt value is compared with the received
    one through assert_equal().

    :param received_packet: packet to verify
    :param layer: key used to index the packet (e.g. a layer name)
    :param checksum_field_names: candidate field names, tried in order
        (immutable default - a shared list default could be modified by
        accident)
    :param ignore_zero_checksum: when true, a received checksum equal to 0
        is accepted without verification
    :raises Exception: if the layer has none of the candidate fields
    """
    layer_copy = received_packet[layer].copy()
    layer_copy.remove_payload()
    field_name = next(
        (f for f in checksum_field_names if hasattr(layer_copy, f)), None
    )
    if field_name is None:
        # list() keeps the message identical to the one produced with the
        # former list default
        raise Exception(
            f"Layer `{layer}` has none of checksum fields: "
            f"`{list(checksum_field_names)}`."
        )
    received_packet_checksum = getattr(received_packet[layer], field_name)
    if ignore_zero_checksum and 0 == received_packet_checksum:
        return
    recalculated = received_packet.__class__(scapy.compat.raw(received_packet))
    delattr(recalculated[layer], field_name)
    recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
    self.assert_equal(
        received_packet_checksum,
        getattr(recalculated[layer], field_name),
        # single f-string: the former f-string + "%" mix failed for tuple
        # layer keys and for "%" characters in the field name
        f"packet checksum (field: {field_name}) on layer: {layer}",
    )
def assert_ip_checksum_valid(self, received_packet, ignore_zero_checksum=False):
    """
    Verify the checksum of the "IP" layer of ``received_packet``.

    :param received_packet: packet to verify
    :param ignore_zero_checksum: passed through to assert_checksum_valid()
    """
    options = {"ignore_zero_checksum": ignore_zero_checksum}
    self.assert_checksum_valid(received_packet, "IP", **options)
def assert_tcp_checksum_valid(self, received_packet, ignore_zero_checksum=False):
    """
    Verify the checksum of the "TCP" layer of ``received_packet``.

    :param received_packet: packet to verify
    :param ignore_zero_checksum: passed through to assert_checksum_valid()
    """
    options = {"ignore_zero_checksum": ignore_zero_checksum}
    self.assert_checksum_valid(received_packet, "TCP", **options)
def assert_udp_checksum_valid(self, received_packet, ignore_zero_checksum=True):
    """
    Verify the checksum of the "UDP" layer of ``received_packet``.

    :param received_packet: packet to verify
    :param ignore_zero_checksum: passed through to assert_checksum_valid();
        note that it defaults to True here
    """
    options = {"ignore_zero_checksum": ignore_zero_checksum}
    self.assert_checksum_valid(received_packet, "UDP", **options)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """
    Verify checksums of the ``*error`` layers present in ``received_packet``.

    IPerror, TCPerror, UDPerror and ICMPerror are checked in that order,
    each only if the packet has it; a zero checksum is accepted for
    UDPerror only.

    :param received_packet: packet to verify
    """
    embedded_layers = (
        (IPerror, "IPerror", {}),
        (TCPerror, "TCPerror", {}),
        (UDPerror, "UDPerror", {"ignore_zero_checksum": True}),
        (ICMPerror, "ICMPerror", {}),
    )
    for layer_cls, layer_name, options in embedded_layers:
        if received_packet.haslayer(layer_cls):
            self.assert_checksum_valid(received_packet, layer_name, **options)
def assert_icmp_checksum_valid(self, received_packet):
    """
    Verify the "ICMP" layer checksum, then the checksums of embedded layers.

    :param received_packet: packet to verify
    """
    packet = received_packet
    self.assert_checksum_valid(packet, "ICMP")
    self.assert_embedded_icmp_checksum_valid(packet)
def assert_icmpv6_checksum_valid(self, pkt):
    """
    Verify checksums of the ICMPv6 layers present in ``pkt``.

    A destination-unreachable layer is verified together with the layers
    embedded in it; echo request and echo reply layers are verified on
    their own.

    :param pkt: packet to verify
    """
    if pkt.haslayer(ICMPv6DestUnreach):
        self.assert_checksum_valid(pkt, "ICMPv6DestUnreach")
        self.assert_embedded_icmp_checksum_valid(pkt)

    echo_layers = (
        (ICMPv6EchoRequest, "ICMPv6EchoRequest"),
        (ICMPv6EchoReply, "ICMPv6EchoReply"),
    )
    for layer_cls, layer_name in echo_layers:
        if pkt.haslayer(layer_cls):
            self.assert_checksum_valid(pkt, layer_name)
def get_counter(self, counter):
    """
    Return the value of a counter.

    :param counter: counter name; a name starting with "/" is looked up
        through ``self.statistics.get_counter()``, any other name is
        searched for in the output of the "sh errors" CLI command
    :returns: whatever the statistics lookup returns for "/" names;
        otherwise the integer from the first column of the first line
        whose second column equals ``counter``, or 0 if there is none
    """
    if counter.startswith("/"):
        return self.statistics.get_counter(counter)

    lines = self.vapi.cli("sh errors").split("\n")
    # the first and the last line are never examined (same as before)
    for line in lines[1:-1]:
        fields = line.split()
        # blank or single-token lines cannot match; skipping them avoids
        # the IndexError the unguarded fields[1] access used to raise
        if len(fields) > 1 and fields[1] == counter:
            return int(fields[0])
    return 0
def assert_counter_equal(self, counter, expected_value, thread=None, index=0):
    """
    Assert the value of a counter obtained through get_counter().

    :param counter: counter name
    :param expected_value: value the counter is expected to have
    :param thread: when given, only ``value[thread][index]`` is compared;
        otherwise ``x[index]`` is summed over all entries of the value
    :param index: index within each entry
    """
    values = self.get_counter(counter)
    actual = (
        values[thread][index]
        if thread is not None
        else sum(entry[index] for entry in values)
    )
    self.assert_equal(actual, expected_value, "counter `%s'" % counter)
def assert_packet_counter_equal(self, counter, expected_value):
    """
    Assert that get_counter(counter) equals ``expected_value``.

    :param counter: counter name
    :param expected_value: value the counter is expected to have
    """
    description = "packet counter `%s'" % counter
    self.assert_equal(self.get_counter(counter), expected_value, description)
def assert_error_counter_equal(self, counter, expected_value):
    """
    Assert that ``self.statistics[counter].sum()`` equals ``expected_value``.

    :param counter: key of the counter in ``self.statistics``
    :param expected_value: expected sum
    """
    total = self.statistics[counter].sum()
    description = "error counter `%s'" % counter
    self.assert_equal(total, expected_value, description)
def sleep(cls, timeout, remark=None):
    """
    Sleep for ``timeout`` seconds and log how long the sleep really took.

    NOTE(review): the ``cls`` parameter suggests this is a classmethod; the
    decorator is not visible in this part of the file - confirm.

    :param timeout: seconds to sleep; 0 only yields the CPU
    :param remark: free text included in the log messages
    """
    # /* Allow sleep(0) to maintain win32 semantics, and as decreed
    #  * by Guido, only the main thread can be interrupted.
    #  */
    # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892  # noqa
    if timeout == 0:
        # yield quantum
        if hasattr(os, "sched_yield"):
            os.sched_yield()
        else:
            time.sleep(0)
        return

    cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
    # monotonic clock: a wall-clock adjustment during the sleep must not
    # show up as a bogus duration (and a spurious error below)
    before = time.monotonic()
    time.sleep(timeout)
    slept = time.monotonic() - before
    if slept > 2 * timeout:
        cls.logger.error(
            "unexpected self.sleep() result - slept for %es instead of ~%es!",
            slept,
            timeout,
        )

    cls.logger.debug(
        "Finished sleep (%s) - slept %es (wanted %es)",
        remark,
        slept,
        timeout,
    )
def virtual_sleep(self, timeout, remark=None):
    """
    Issue the "set clock adjust" CLI command instead of really sleeping.

    :param timeout: value appended to the "set clock adjust" command
    :param remark: free text included in the log message
    """
    self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
    command = "set clock adjust %s" % timeout
    self.vapi.cli(command)
def pg_send(self, intf, pkts, worker=None, trace=True):
    """
    Queue packets on an interface and start the packet generator.

    :param intf: interface the stream is added to
    :param pkts: packets to send
    :param worker: passed to ``intf.add_stream()``
    :param trace: passed to ``self.pg_start()``
    """
    intf.add_stream(pkts, worker=worker)
    # capture is enabled on every pg interface, not only on the sender
    capture_on = self.pg_interfaces
    self.pg_enable_capture(capture_on)
    self.pg_start(trace=trace)
def snapshot_stats(self, stats_diff):
    """
    Return snapshot of interesting stats based on diff dictionary.

    :param stats_diff: mapping whose values are mappings keyed by counter
        name; only the counter names are used here
    :returns: dict mapping each counter name to ``self.statistics[name]``
    """
    stats_snapshot = {
        name: self.statistics[name]
        for group in stats_diff
        for name in stats_diff[group]
    }
    self.logger.debug(f"Took statistics stats_snapshot: {stats_snapshot}")
    return stats_snapshot
def compare_stats_with_snapshot(self, stats_diff, stats_snapshot):
    """
    Assert appropriate difference between current stats and snapshot.

    :param stats_diff: mapping of ``sw_if_index`` (or the string "err") to
        a mapping of counter name to expected increase
    :param stats_snapshot: counter values taken before, keyed by counter
        name
    :raises Exception: when a per-interface counter cannot be indexed by
        ``sw_if_index`` although a non-zero difference is expected
    """
    for sw_if_index in stats_diff:
        whole_counter = sw_if_index == "err"
        for cntr, diff in stats_diff[sw_if_index].items():
            if whole_counter:
                # "err" counters are compared as a total over everything
                current = self.statistics[cntr].sum()
                previous = stats_snapshot[cntr].sum()
                self.assert_equal(
                    current,
                    previous + diff,
                    f"'{cntr}' counter value (previous value: "
                    f"{previous}, "
                    f"expected diff: {diff})",
                )
                continue

            try:
                current = self.statistics[cntr][:, sw_if_index].sum()
                previous = stats_snapshot[cntr][:, sw_if_index].sum()
                self.assert_equal(
                    current,
                    previous + diff,
                    f"'{cntr}' counter value (previous value: "
                    f"{previous}, "
                    f"expected diff: {diff})",
                )
            except IndexError as e:
                # if diff is 0, then this most probably a case where
                # test declares multiple interfaces but traffic hasn't
                # passed through this one yet - which means the counter
                # value is 0 and can be ignored
                if diff == 0:
                    continue
                raise Exception(
                    f"Couldn't sum counter: {cntr} on sw_if_index: {sw_if_index}"
                ) from e
def send_and_assert_no_replies(
    self, intf, pkts, remark="", timeout=None, stats_diff=None, trace=True, msg=None
):
    """
    Send packets and assert that no pg interface captures anything.

    :param intf: interface to send the packets from
    :param pkts: packets to send
    :param remark: passed to ``assert_nothing_captured()``
    :param timeout: wait applied to the first interface checked (1 when
        not given); every following interface is checked with 0.1
    :param stats_diff: optional expected counter differences, verified
        with snapshot_stats() / compare_stats_with_snapshot()
    :param trace: enable tracing for the send and log "show trace"
        afterwards; now forwarded to pg_send() so that ``trace=False``
        really disables it, as in the other send_and_* helpers
    :param msg: optional text logged before the trace
    """
    stats_snapshot = self.snapshot_stats(stats_diff) if stats_diff else None

    self.pg_send(intf, pkts, trace=trace)

    try:
        if not timeout:
            timeout = 1
        for i in self.pg_interfaces:
            i.assert_nothing_captured(timeout=timeout, remark=remark)
            # the full wait was already spent on the first interface
            timeout = 0.1
    finally:
        # log the trace even if one of the assertions above failed
        if trace:
            if msg:
                self.logger.debug(f"send_and_assert_no_replies: {msg}")
            self.logger.debug(self.vapi.cli("show trace"))

    if stats_diff:
        self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1541 def send_and_expect(
1553 stats_snapshot = self.snapshot_stats(stats_diff)
1556 n_rx = 1 if isinstance(pkts, Packet) else len(pkts)
1557 self.pg_send(intf, pkts, worker=worker, trace=trace)
1558 rx = output.get_capture(n_rx)
1561 self.logger.debug(f"send_and_expect: {msg}")
1562 self.logger.debug(self.vapi.cli("show trace"))
1565 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
def send_and_expect_load_balancing(
    self, input, pkts, outputs, worker=None, trace=True
):
    """
    Send packets and require every output interface to capture something.

    :param input: interface to send the packets from
    :param pkts: packets to send
    :param outputs: interfaces that must each capture at least one packet
    :param worker: passed to pg_send()
    :param trace: passed to pg_send(); when true "show trace" is logged
    :returns: list with the capture of each output, in ``outputs`` order
    """
    self.pg_send(input, pkts, worker=worker, trace=trace)
    captures = []
    for out_if in outputs:
        captured = out_if._get_capture(1)
        self.assertNotEqual(0, len(captured))
        captures.append(captured)
    if trace:
        self.logger.debug(self.vapi.cli("show trace"))
    return captures
def send_and_expect_some(self, intf, pkts, output, worker=None, trace=True):
    """
    Send packets and expect only a part of them to be captured on output.

    :param intf: interface to send the packets from
    :param pkts: packets to send
    :param output: interface whose capture is examined
    :param worker: passed to pg_send()
    :param trace: passed to pg_send(); when true "show trace" is logged
    :returns: the capture; asserted to hold more than 0 and fewer than
        ``len(pkts)`` packets
    """
    self.pg_send(intf, pkts, worker=worker, trace=trace)
    captured = output._get_capture(1)
    if trace:
        self.logger.debug(self.vapi.cli("show trace"))
    received_count = len(captured)
    self.assertTrue(received_count > 0)
    self.assertTrue(received_count < len(pkts))
    return captured
def send_and_expect_only(self, intf, pkts, output, timeout=None, stats_diff=None):
    """
    Send packets, expect all of them on ``output`` and nothing elsewhere.

    :param intf: interface to send the packets from
    :param pkts: packets to send
    :param output: the only interface expected to capture packets
    :param timeout: wait applied to the first other interface checked (1
        when not given); every following one is checked with 0.1
    :param stats_diff: optional expected counter differences, verified
        with snapshot_stats() / compare_stats_with_snapshot()
    :returns: capture of ``len(pkts)`` packets taken from ``output``
    """
    stats_snapshot = self.snapshot_stats(stats_diff) if stats_diff else None

    self.pg_send(intf, pkts)
    rx = output.get_capture(len(pkts))

    outputs = [output]
    wait = timeout if timeout else 1
    for i in self.pg_interfaces:
        if i in outputs:
            continue
        i.assert_nothing_captured(timeout=wait)
        wait = 0.1

    if stats_diff:
        self.compare_stats_with_snapshot(stats_diff, stats_snapshot)

    return rx
def get_testcase_doc_name(test):
    """
    Return the first line of the docstring of the class of ``test``.

    :param test: test case instance
    :returns: first docstring line; the class name when the class has no
        docstring or an empty one (this used to raise AttributeError or
        IndexError, which is unhelpful in the middle of result reporting)
    """
    doc = getdoc(test.__class__)
    doc_lines = doc.splitlines() if doc else []
    return doc_lines[0] if doc_lines else test.__class__.__name__
def get_test_description(descriptions, test):
    """
    Return the text used to present ``test`` in the output.

    :param descriptions: whether short descriptions are to be used
    :param test: test case instance
    :returns: ``test.shortDescription()`` when ``descriptions`` is true
        and the short description is non-empty, ``str(test)`` otherwise
    """
    summary = test.shortDescription()
    return summary if (descriptions and summary) else str(test)
class TestCaseInfo:
    """Plain record describing a single test case run."""

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        """
        :param logger: logger of the test case
        :param tempdir: temporary directory used by the test case
        :param vpp_pid: stored as ``vpp_pid``
        :param vpp_bin_path: stored as ``vpp_bin_path``
        """
        # nothing is known about a core crash at creation time
        self.core_crash_test = None
        self.vpp_bin_path = vpp_bin_path
        self.vpp_pid = vpp_pid
        self.tempdir = tempdir
        self.logger = logger
1632 class VppTestResult(unittest.TestResult):
1634 @property result_string
1635 String variable to store the test case result string.
1637 List variable containing 2-tuples of TestCase instances and strings
1638 holding formatted tracebacks. Each tuple represents a test which
1639 raised an unexpected exception.
1641 List variable containing 2-tuples of TestCase instances and strings
1642 holding formatted tracebacks. Each tuple represents a test where
1643 a failure was explicitly signalled using the TestCase.assert*()
1647 failed_test_cases_info = set()
1648 core_crash_test_cases_info = set()
1649 current_test_case_info = None
def __init__(self, stream=None, descriptions=None, verbosity=None, runner=None):
    """
    :param stream File descriptor to store where to report test results.
        Set to the standard error stream by default.
    :param descriptions Boolean variable to store information if to use
        test case descriptions.
    :param verbosity Integer variable to store required verbosity level.
    :param runner Runner that created this result; kept as ``runner``.
    """
    super(VppTestResult, self).__init__(stream, descriptions, verbosity)
    self.runner = runner
    self.stream = stream
    self.descriptions = descriptions
    self.verbosity = verbosity
    # no test has reported an outcome yet
    self.result_string = None
    self.result_code = TestResultCode.TEST_RUN
    # test classes whose header was already printed
    self.printed = []
def addSuccess(self, test):
    """
    Record a test succeeded result

    :param test: test that passed
    """
    self.log_result("addSuccess", test)
    unittest.TestResult.addSuccess(self, test)
    outcome = TestResultCode.PASS
    self.result_string = colorize("OK", GREEN)
    self.result_code = outcome
    self.send_result_through_pipe(test, outcome)
def addExpectedFailure(self, test, err):
    """
    Record a failure of a test that was expected to fail

    :param test: test that failed as expected
    :param err: error information of the failure
    """
    self.log_result("addExpectedFailure", test, err)
    unittest.TestResult.addExpectedFailure(self, test, err)
    outcome = TestResultCode.EXPECTED_FAIL
    self.result_string = colorize("FAIL", GREEN)
    self.result_code = outcome
    self.send_result_through_pipe(test, outcome)
def addUnexpectedSuccess(self, test):
    """
    Record a pass of a test that was expected to fail

    :param test: test that passed unexpectedly
    """
    self.log_result("addUnexpectedSuccess", test)
    unittest.TestResult.addUnexpectedSuccess(self, test)
    outcome = TestResultCode.UNEXPECTED_PASS
    self.result_string = colorize("OK", RED)
    self.result_code = outcome
    self.send_result_through_pipe(test, outcome)
def addSkip(self, test, reason):
    """
    Record a test skipped.

    :param test: test that was skipped
    :param reason: skip reason; exactly "not enough cpus" selects the
        SKIP_CPU_SHORTAGE result code, anything else selects SKIP
    """
    self.log_result("addSkip", test, reason=reason)
    unittest.TestResult.addSkip(self, test, reason)
    self.result_string = colorize("SKIP", YELLOW)

    cpu_shortage = reason == "not enough cpus"
    self.result_code = (
        TestResultCode.SKIP_CPU_SHORTAGE if cpu_shortage else TestResultCode.SKIP
    )
    self.send_result_through_pipe(test, self.result_code)
def symlink_failed(self):
    """
    Link the temp dir of the current test case into ``config.failed_dir``.

    The link is named "<basename of temp dir>-FAILED". Nothing happens
    without a current test case; any exception is logged as an error on
    the test case logger instead of being raised.
    """
    info = self.current_test_case_info
    if not info:
        return

    try:
        failed_dir = config.failed_dir
        link_name = "%s-FAILED" % os.path.basename(info.tempdir)
        link_path = os.path.join(failed_dir, link_name)

        info.logger.debug("creating a link to the failed test")
        info.logger.debug("os.symlink(%s, %s)" % (info.tempdir, link_path))
        if not os.path.exists(link_path):
            os.symlink(info.tempdir, link_path)
        else:
            info.logger.debug("symlink already exists")
    except Exception as e:
        info.logger.error(e)
def send_result_through_pipe(self, test, result):
    """
    Send ``(test.id(), result)`` through the result pipe, if there is one.

    :param test: test the result belongs to
    :param result: result code to report
    """
    pipe = getattr(self, "test_framework_result_pipe", None)
    if pipe:
        pipe.send((test.id(), result))
def log_result(self, fn, test, err=None, reason=None):
    """
    Log a result callback on the logger of the current test case.

    :param fn: name of the callback being logged (e.g. "addSuccess")
    :param test: test case, or a unittest ``_ErrorHolder``
    :param err: optional ``(type, value, traceback)`` triple; when given,
        the formatted exception is logged as well
    :param reason: optional reason text (used for skips)
    """
    info = self.current_test_case_info
    if not info:
        return

    if isinstance(test, unittest.suite._ErrorHolder):
        test_name = test.description
    else:
        test_name = "%s.%s(%s)" % (
            test.__class__.__name__,
            test._testMethodName,
            test._testMethodDoc,
        )

    details = []
    if err:
        details.append(f", error is {err}")
    if reason:
        details.append(f", reason is {reason}")
    extra_msg = "".join(details)
    info.logger.debug(f"--- {fn}() {test_name} called{extra_msg}")

    if err:
        formatted = "".join(format_exception(*err))
        info.logger.debug("formatted exception is:\n%s" % formatted)
def add_error(self, test, err, unittest_fn, result_code):
    """
    Common handling of a failed or errored test.

    :param test: test case, or a unittest ``_ErrorHolder``
    :param err: ``(type, value, traceback)`` triple of the problem
    :param unittest_fn: unbound unittest.TestResult method that records
        the problem; called as ``unittest_fn(self, test, err)``
    :param result_code: TestResultCode.FAIL or TestResultCode.ERROR
    :raises Exception: for any other result code
    """
    self.result_code = result_code
    if result_code == TestResultCode.FAIL:
        callback_name, label = "addFailure", "FAIL"
    elif result_code == TestResultCode.ERROR:
        callback_name, label = "addError", "ERROR"
    else:
        raise Exception(f"Unexpected result code {result_code}")
    self.log_result(callback_name, test, err=err)
    error_type_str = colorize(label, RED)

    unittest_fn(self, test, err)

    info = self.current_test_case_info
    if not info:
        self.result_string = "%s [no temp dir]" % error_type_str
    else:
        self.result_string = "%s [ temp dir used by test case: %s ]" % (
            error_type_str,
            info.tempdir,
        )
        self.symlink_failed()
        self.failed_test_cases_info.add(info)
        if is_core_present(info.tempdir):
            # only the first test seen with a core present gets named
            if not info.core_crash_test:
                if isinstance(test, unittest.suite._ErrorHolder):
                    crashed = str(test)
                else:
                    crashed = "'{!s}' ({!s})".format(
                        get_testcase_doc_name(test), test.id()
                    )
                info.core_crash_test = crashed
            self.core_crash_test_cases_info.add(info)

    self.send_result_through_pipe(test, result_code)
def addFailure(self, test, err):
    """
    Record a test failed result

    :param test: test that failed
    :param err: error message
    """
    record = unittest.TestResult.addFailure
    self.add_error(test, err, record, TestResultCode.FAIL)
def addError(self, test, err):
    """
    Record a test error result

    :param test: test that raised an error
    :param err: error message
    """
    record = unittest.TestResult.addError
    self.add_error(test, err, record, TestResultCode.ERROR)
def getDescription(self, test):
    """
    Get test description

    :param test: test case instance
    :returns: test description
    """
    use_descriptions = self.descriptions
    return get_test_description(use_descriptions, test)
1830 def startTest(self, test):
1838 def print_header(test):
1839 if test.__class__ in self.printed:
1842 test_doc = getdoc(test)
1844 raise Exception("No doc string for test '%s'" % test.id())
1846 test_title = test_doc.splitlines()[0].rstrip()
1847 test_title = colorize(test_title, GREEN)
1848 if test.is_tagged_run_solo():
1849 test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)
1851 # This block may overwrite the colorized title above,
1852 # but we want this to stand out and be fixed
1853 if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
1854 test_title = colorize(f"FIXME with VPP workers: {test_title}", RED)
1856 if test.has_tag(TestCaseTag.FIXME_ASAN):
1857 test_title = colorize(f"FIXME with ASAN: {test_title}", RED)
1858 test.skip_fixme_asan()
1860 if is_distro_ubuntu2204 == True and test.has_tag(
1861 TestCaseTag.FIXME_UBUNTU2204
1863 test_title = colorize(f"FIXME on Ubuntu-22.04: {test_title}", RED)
1864 test.skip_fixme_ubuntu2204()
1866 if is_distro_debian11 == True and test.has_tag(TestCaseTag.FIXME_DEBIAN11):
1867 test_title = colorize(f"FIXME on Debian-11: {test_title}", RED)
1868 test.skip_fixme_debian11()
1870 if "debug" in config.vpp_tag and test.has_tag(TestCaseTag.FIXME_VPP_DEBUG):
1871 test_title = colorize(f"FIXME on VPP Debug: {test_title}", RED)
1872 test.skip_fixme_vpp_debug()
1874 if hasattr(test, "vpp_worker_count"):
1875 if test.vpp_worker_count == 0:
1876 test_title += " [main thread only]"
1877 elif test.vpp_worker_count == 1:
1878 test_title += " [1 worker thread]"
1880 test_title += f" [{test.vpp_worker_count} worker threads]"
1882 if test.__class__.skipped_due_to_cpu_lack:
1883 test_title = colorize(
1884 f"{test_title} [skipped - not enough cpus, "
1885 f"required={test.__class__.get_cpus_required()}, "
1886 f"available={max_vpp_cpus}]",
1890 print(double_line_delim)
1892 print(double_line_delim)
1893 self.printed.append(test.__class__)
1896 self.start_test = time.time()
1897 unittest.TestResult.startTest(self, test)
1898 if self.verbosity > 0:
1899 self.stream.writeln("Starting " + self.getDescription(test) + " ...")
1900 self.stream.writeln(single_line_delim)
1902 def stopTest(self, test):
1904 Called when the given test has been run
1909 unittest.TestResult.stopTest(self, test)
1911 result_code_to_suffix = {
1912 TestResultCode.PASS: "",
1913 TestResultCode.FAIL: "",
1914 TestResultCode.ERROR: "",
1915 TestResultCode.SKIP: "",
1916 TestResultCode.TEST_RUN: "",
1917 TestResultCode.SKIP_CPU_SHORTAGE: "",
1918 TestResultCode.EXPECTED_FAIL: " [EXPECTED FAIL]",
1919 TestResultCode.UNEXPECTED_PASS: " [UNEXPECTED PASS]",
1922 if self.verbosity > 0:
1923 self.stream.writeln(single_line_delim)
1924 self.stream.writeln(
1927 self.getDescription(test),
1929 result_code_to_suffix[self.result_code],
1932 self.stream.writeln(single_line_delim)
1934 self.stream.writeln(
1937 self.getDescription(test),
1938 time.time() - self.start_test,
1940 result_code_to_suffix[self.result_code],
1944 self.send_result_through_pipe(test, TestResultCode.TEST_RUN)
def printErrors(self):
    """
    Print errors from running the test case
    """
    if self.errors or self.failures:
        self.stream.writeln()
        self.printErrorList("ERROR", self.errors)
        self.printErrorList("FAIL", self.failures)

    # ^^ that is the last output from unittest before summary
    if self.runner.print_summary:
        return
    # summary not wanted: point both this result and the runner at a
    # stream that discards everything written to it
    sink = unittest.runner._WritelnDecorator(open(os.devnull, "w"))
    self.stream = sink
    self.runner.stream = sink
def printErrorList(self, flavour, errors):
    """
    Print error list to the output stream together with error type
    and test case description.

    :param flavour: error type
    :param errors: iterable errors
    """
    emit = self.stream.writeln
    for test, err in errors:
        emit(double_line_delim)
        emit("%s: %s" % (flavour, self.getDescription(test)))
        emit(single_line_delim)
        emit("%s" % err)
1977 class VppTestRunner(unittest.TextTestRunner):
1979 A basic test runner implementation which prints results to standard error.
def resultclass(self):
    """
    Class maintaining the results of the tests

    NOTE(review): other code in this class uses ``self.resultclass`` as an
    attribute, so this is presumably wrapped in ``@property``; the
    decorator is not visible in this part of the file - confirm.
    """
    result_cls = VppTestResult
    return result_cls
1989 keep_alive_pipe=None,
1999 # ignore stream setting here, use hard-coded stdout to be in sync
2000 # with prints from VppTestCase methods ...
2001 super(VppTestRunner, self).__init__(
2002 sys.stdout, descriptions, verbosity, failfast, buffer, resultclass, **kwargs
2004 KeepAliveReporter.pipe = keep_alive_pipe
2006 self.orig_stream = self.stream
2007 self.resultclass.test_framework_result_pipe = result_pipe
2009 self.print_summary = print_summary
def _makeResult(self):
    """
    Create the result object for a run.

    :returns: ``self.resultclass`` instantiated with the stream,
        descriptions, verbosity and this runner
    """
    factory = self.resultclass
    return factory(self.stream, self.descriptions, self.verbosity, self)
def run(self, test):
    """
    Run the tests

    :param test: test or suite handed to the base runner
    :returns: result object produced by the base runner
    """
    faulthandler.enable()  # emit stack trace to stderr if killed by signal

    outcome = super(VppTestRunner, self).run(test)
    if self.print_summary:
        return outcome
    # without a summary, put back the stream saved as orig_stream on both
    # the runner and the result
    self.stream = self.orig_stream
    outcome.stream = self.orig_stream
    return outcome
2030 class Worker(Thread):
2031 def __init__(self, executable_args, logger, env=None, *args, **kwargs):
2032 super(Worker, self).__init__(*args, **kwargs)
2033 self.logger = logger
2034 self.args = executable_args
2035 if hasattr(self, "testcase") and self.testcase.debug_all:
2036 if self.testcase.debug_gdbserver:
2038 "/usr/bin/gdbserver",
2039 "localhost:{port}".format(port=self.testcase.gdbserver_port),
2041 elif self.testcase.debug_gdb and hasattr(self, "wait_for_gdb"):
2042 self.args.append(self.wait_for_gdb)
2043 self.app_bin = executable_args[0]
2044 self.app_name = os.path.basename(self.app_bin)
2045 if hasattr(self, "role"):
2046 self.app_name += " {role}".format(role=self.role)
2049 env = {} if env is None else env
2050 self.env = copy.deepcopy(env)
2052 def wait_for_enter(self):
2053 if not hasattr(self, "testcase"):
2055 if self.testcase.debug_all and self.testcase.debug_gdbserver:
2057 print(double_line_delim)
2059 "Spawned GDB Server for '{app}' with PID: {pid}".format(
2060 app=self.app_name, pid=self.process.pid
2063 elif self.testcase.debug_all and self.testcase.debug_gdb:
2065 print(double_line_delim)
2067 "Spawned '{app}' with PID: {pid}".format(
2068 app=self.app_name, pid=self.process.pid
2073 print(single_line_delim)
2074 print("You can debug '{app}' using:".format(app=self.app_name))
2075 if self.testcase.debug_gdbserver:
2079 + " -ex 'target remote localhost:{port}'".format(
2080 port=self.testcase.gdbserver_port
2084 "Now is the time to attach gdb by running the above "
2085 "command, set up breakpoints etc., then resume from "
2086 "within gdb by issuing the 'continue' command"
2088 self.testcase.gdbserver_port += 1
2089 elif self.testcase.debug_gdb:
2093 + " -ex 'attach {pid}'".format(pid=self.process.pid)
2096 "Now is the time to attach gdb by running the above "
2097 "command and set up breakpoints etc., then resume from"
2098 " within gdb by issuing the 'continue' command"
2100 print(single_line_delim)
2101 input("Press ENTER to continue running the testcase...")
2104 executable = self.args[0]
2105 if not os.path.exists(executable) or not os.access(
2106 executable, os.F_OK | os.X_OK
2108 # Exit code that means some system file did not exist,
2109 # could not be opened, or had some other kind of error.
2110 self.result = os.EX_OSFILE
2111 raise EnvironmentError(
2112 "executable '%s' is not found or executable." % executable
2115 "Running executable '{app}': '{cmd}'".format(
2116 app=self.app_name, cmd=" ".join(self.args)
2119 env = os.environ.copy()
2120 env.update(self.env)
2121 env["CK_LOG_FILE_NAME"] = "-"
2122 self.process = subprocess.Popen(
2123 ["stdbuf", "-o0", "-e0"] + self.args,
2126 preexec_fn=os.setpgrp,
2127 stdout=subprocess.PIPE,
2128 stderr=subprocess.PIPE,
2130 self.wait_for_enter()
2131 out, err = self.process.communicate()
2132 self.logger.debug("Finished running `{app}'".format(app=self.app_name))
2133 self.logger.info("Return code is `%s'" % self.process.returncode)
2134 self.logger.info(single_line_delim)
2136 "Executable `{app}' wrote to stdout:".format(app=self.app_name)
2138 self.logger.info(single_line_delim)
2139 self.logger.info(out.decode("utf-8"))
2140 self.logger.info(single_line_delim)
2142 "Executable `{app}' wrote to stderr:".format(app=self.app_name)
2144 self.logger.info(single_line_delim)
2145 self.logger.info(err.decode("utf-8"))
2146 self.logger.info(single_line_delim)
2147 self.result = self.process.returncode
2150 if __name__ == "__main__":