3 from __future__ import print_function
18 from collections import deque
19 from threading import Thread, Event
20 from inspect import getdoc, isclass
21 from traceback import format_exception
22 from logging import FileHandler, DEBUG, Formatter
24 from abc import ABC, abstractmethod
25 from struct import pack, unpack
28 from scapy.packet import Raw, Packet
29 from config import config, available_cpus, num_cpus, max_vpp_cpus
30 import hook as hookmodule
31 from vpp_pg_interface import VppPGInterface
32 from vpp_sub_interface import VppSubInterface
33 from vpp_lo_interface import VppLoInterface
34 from vpp_bvi_interface import VppBviInterface
35 from vpp_papi_provider import VppPapiProvider
36 from vpp_papi import VppEnum
38 from vpp_papi.vpp_stats import VPPStats
39 from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
49 from vpp_object import VppObjectRegistry
50 from util import ppp, is_core_present
51 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
52 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
53 from scapy.layers.inet6 import ICMPv6EchoReply
54 from vpp_running import use_running
55 from test_result_code import TestResultCode
58 logger = logging.getLogger(__name__)
60 # Set up an empty logger for the testcase that can be overridden as necessary
61 null_logger = logging.getLogger("VppTestCase")
62 null_logger.addHandler(logging.NullHandler())
65 if config.debug_framework:
69 Test framework module.
71 The module provides a set of tools for constructing and running tests and
72 representing the results.
76 class VppDiedError(Exception):
77 """exception for reporting that the subprocess has died."""
81 for k, v in signal.__dict__.items()
82 if k.startswith("SIG") and not k.startswith("SIG_")
85 def __init__(self, rv=None, testcase=None, method_name=None):
87 self.signal_name = None
88 self.testcase = testcase
89 self.method_name = method_name
92 self.signal_name = VppDiedError.signals_by_value[-rv]
93 except (KeyError, TypeError):
96 if testcase is None and method_name is None:
99 in_msg = " while running %s.%s" % (testcase, method_name)
102 msg = "VPP subprocess died unexpectedly%s with return code: %d%s." % (
105 " [%s]" % (self.signal_name if self.signal_name is not None else ""),
108 msg = "VPP subprocess died unexpectedly%s." % in_msg
110 super(VppDiedError, self).__init__(msg)
113 class _PacketInfo(object):
114 """Private class to create packet info object.
116 Help process information about the next packet.
117 Set variables to default values.
120 #: Store the index of the packet.
122 #: Store the index of the source packet generator interface of the packet.
124 #: Store the index of the destination packet generator interface
127 #: Store expected ip version
129 #: Store expected upper protocol
131 #: Store the copy of the former packet.
134 def __eq__(self, other):
135 index = self.index == other.index
136 src = self.src == other.src
137 dst = self.dst == other.dst
138 data = self.data == other.data
139 return index and src and dst and data
142 def pump_output(testclass):
143 """pump output from vpp stdout/stderr to proper queues"""
144 if not hasattr(testclass, "vpp"):
148 while not testclass.pump_thread_stop_flag.is_set():
149 readable = select.select(
151 testclass.vpp.stdout.fileno(),
152 testclass.vpp.stderr.fileno(),
153 testclass.pump_thread_wakeup_pipe[0],
158 if testclass.vpp.stdout.fileno() in readable:
159 read = os.read(testclass.vpp.stdout.fileno(), 102400)
161 split = read.decode("ascii", errors="backslashreplace").splitlines(True)
162 if len(stdout_fragment) > 0:
163 split[0] = "%s%s" % (stdout_fragment, split[0])
164 if len(split) > 0 and split[-1].endswith("\n"):
168 stdout_fragment = split[-1]
169 testclass.vpp_stdout_deque.extend(split[:limit])
170 if not config.cache_vpp_output:
171 for line in split[:limit]:
172 testclass.logger.info("VPP STDOUT: %s" % line.rstrip("\n"))
173 if testclass.vpp.stderr.fileno() in readable:
174 read = os.read(testclass.vpp.stderr.fileno(), 102400)
176 split = read.decode("ascii", errors="backslashreplace").splitlines(True)
177 if len(stderr_fragment) > 0:
178 split[0] = "%s%s" % (stderr_fragment, split[0])
179 if len(split) > 0 and split[-1].endswith("\n"):
183 stderr_fragment = split[-1]
185 testclass.vpp_stderr_deque.extend(split[:limit])
186 if not config.cache_vpp_output:
187 for line in split[:limit]:
188 testclass.logger.error("VPP STDERR: %s" % line.rstrip("\n"))
189 # ignoring the dummy pipe here intentionally - the
190 # flag will take care of properly terminating the loop
def _is_platform_aarch64():
    """Return True when the interpreter runs on an aarch64 machine."""
    machine_name = platform.machine()
    return machine_name == "aarch64"


# evaluated once at import time; the platform cannot change mid-run
is_platform_aarch64 = _is_platform_aarch64()
200 def _is_distro_ubuntu2204():
201 with open("/etc/os-release") as f:
202 for line in f.readlines():
208 is_distro_ubuntu2204 = _is_distro_ubuntu2204()
211 def _is_distro_debian11():
212 with open("/etc/os-release") as f:
213 for line in f.readlines():
214 if "bullseye" in line:
219 is_distro_debian11 = _is_distro_debian11()
222 class KeepAliveReporter(object):
224 Singleton object which reports test start to parent process
230 self.__dict__ = self._shared_state
238 def pipe(self, pipe):
239 if self._pipe is not None:
240 raise Exception("Internal error - pipe should only be set once.")
243 def send_keep_alive(self, test, desc=None):
245 Write current test tmpdir & desc to keep-alive pipe to signal liveness
247 if not hasattr(test, "vpp") or self.pipe is None:
248 # if not running forked..
252 desc = "%s (%s)" % (desc, unittest.util.strclass(test))
256 self.pipe.send((desc, config.vpp, test.tempdir, test.vpp.pid))
259 class TestCaseTag(Enum):
260 # marks the suites that must run at the end
261 # using only a single test runner
263 # marks the suites broken on VPP multi-worker
264 FIXME_VPP_WORKERS = 2
265 # marks the suites broken when ASan is enabled
267 # marks suites broken on Ubuntu-22.04
269 # marks suites broken on Debian-11
271 # marks suites broken on debug vpp image
275 def create_tag_decorator(e):
278 cls.test_tags.append(e)
279 except AttributeError:
# Ready-made class decorators, one per TestCaseTag value.
# Usage:
#   @tag_run_solo
#   class MyTestCase(VppTestCase): ...
tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
tag_fixme_ubuntu2204 = create_tag_decorator(TestCaseTag.FIXME_UBUNTU2204)
tag_fixme_debian11 = create_tag_decorator(TestCaseTag.FIXME_DEBIAN11)
tag_fixme_vpp_debug = create_tag_decorator(TestCaseTag.FIXME_VPP_DEBUG)
305 class CPUInterface(ABC):
307 skipped_due_to_cpu_lack = False
311 def get_cpus_required(cls):
315 def assign_cpus(cls, cpus):
320 class VppTestCase(CPUInterface, unittest.TestCase):
321 """This subclass is a base class for VPP test cases that are implemented as
322 classes. It provides methods to create and run test case.
325 extra_vpp_statseg_config = ""
326 extra_vpp_config = []
327 extra_vpp_plugin_config = []
329 vapi_response_timeout = 5
330 remove_configured_vpp_objects_on_tear_down = True
def packet_infos(self):
    """Mapping of recorded packet infos (index -> _PacketInfo)."""
    return self._packet_infos
def get_packet_count_for_if_idx(cls, dst_if_index):
    """Get the number of packet info for specified destination if index"""
    # counters are accumulated per destination interface by
    # create_packet_info()
    if dst_if_index in cls._packet_count_for_dst_if_idx:
        return cls._packet_count_for_dst_if_idx[dst_if_index]
346 def has_tag(cls, tag):
347 """if the test case has a given tag - return true"""
349 return tag in cls.test_tags
350 except AttributeError:
def is_tagged_run_solo(cls):
    """Return True when the test case class must run alone (RUN_SOLO tag)."""
    return cls.has_tag(TestCaseTag.RUN_SOLO)
def skip_fixme_asan(cls):
    """if @tag_fixme_asan & ASan is enabled - mark for skip"""
    if cls.has_tag(TestCaseTag.FIXME_ASAN):
        # ASan builds advertise themselves via the extra CMake args
        cmake_args = os.environ.get("VPP_EXTRA_CMAKE_ARGS", "")
        if "DVPP_ENABLE_SANITIZE_ADDR=ON" in cmake_args:
            cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls)
def skip_fixme_ubuntu2204(cls):
    """if distro is ubuntu 22.04 and @tag_fixme_ubuntu2204 mark for skip"""
    wants_skip = cls.has_tag(TestCaseTag.FIXME_UBUNTU2204)
    if wants_skip:
        cls = unittest.skip("Skipping @tag_fixme_ubuntu2204 tests")(cls)
def skip_fixme_debian11(cls):
    """if distro is Debian-11 and @tag_fixme_debian11 mark for skip"""
    wants_skip = cls.has_tag(TestCaseTag.FIXME_DEBIAN11)
    if wants_skip:
        cls = unittest.skip("Skipping @tag_fixme_debian11 tests")(cls)
def skip_fixme_vpp_debug(cls):
    """Mark the whole suite skipped for @tag_fixme_vpp_debug runs."""
    cls = unittest.skip("Skipping @tag_fixme_vpp_debug tests")(cls)
385 """Return the instance of this testcase"""
386 return cls.test_instance
389 def set_debug_flags(cls, d):
390 cls.gdbserver_port = 7777
391 cls.debug_core = False
392 cls.debug_gdb = False
393 cls.debug_gdbserver = False
394 cls.debug_all = False
395 cls.debug_attach = False
400 cls.debug_core = True
401 elif dl == "gdb" or dl == "gdb-all":
403 elif dl == "gdbserver" or dl == "gdbserver-all":
404 cls.debug_gdbserver = True
406 cls.debug_attach = True
408 raise Exception("Unrecognized DEBUG option: '%s'" % d)
409 if dl == "gdb-all" or dl == "gdbserver-all":
413 def get_vpp_worker_count(cls):
414 if not hasattr(cls, "vpp_worker_count"):
415 if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
416 cls.vpp_worker_count = 0
418 cls.vpp_worker_count = config.vpp_worker_count
419 return cls.vpp_worker_count
def get_cpus_required(cls):
    """One CPU for the VPP main thread plus one per configured worker."""
    return 1 + cls.get_vpp_worker_count()
426 def setUpConstants(cls):
427 """Set-up the test case class based on environment variables"""
428 cls.step = config.step
429 cls.plugin_path = ":".join(config.vpp_plugin_dir)
430 cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir)
431 cls.extern_plugin_path = ":".join(config.extern_plugin_dir)
433 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
434 debug_cli = "cli-listen localhost:5002"
435 size = re.search(r"\d+[gG]", config.coredump_size)
437 coredump_size = f"coredump-size {config.coredump_size}".lower()
439 coredump_size = "coredump-size unlimited"
440 default_variant = config.variant
441 if default_variant is not None:
442 default_variant = "default { variant %s 100 }" % default_variant
446 api_fuzzing = config.api_fuzz
447 if api_fuzzing is None:
468 cls.get_api_segment_prefix(),
475 if cls.extern_plugin_path not in (None, ""):
476 cls.extra_vpp_plugin_config.append("add-path %s" % cls.extern_plugin_path)
477 if cls.get_vpp_worker_count():
478 cls.vpp_cmdline.extend(
479 ["corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])]
481 cls.vpp_cmdline.extend(
492 cls.get_stats_sock_path(),
493 cls.extra_vpp_statseg_config,
498 cls.get_api_sock_path(),
519 "lisp_unittest_plugin.so",
524 "unittest_plugin.so",
529 + cls.extra_vpp_plugin_config
535 if cls.extra_vpp_config is not None:
536 cls.vpp_cmdline.extend(cls.extra_vpp_config)
538 if not cls.debug_attach:
539 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
540 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
543 def wait_for_enter(cls):
544 if cls.debug_gdbserver:
545 print(double_line_delim)
546 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
548 print(double_line_delim)
549 print("Spawned VPP with PID: %d" % cls.vpp.pid)
551 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
553 print(single_line_delim)
554 print("You can debug VPP using:")
555 if cls.debug_gdbserver:
557 f"sudo gdb {config.vpp} "
558 f"-ex 'target remote localhost:{cls.gdbserver_port}'"
561 "Now is the time to attach gdb by running the above "
562 "command, set up breakpoints etc., then resume VPP from "
563 "within gdb by issuing the 'continue' command"
565 cls.gdbserver_port += 1
567 print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'")
569 "Now is the time to attach gdb by running the above "
570 "command and set up breakpoints etc., then resume VPP from"
571 " within gdb by issuing the 'continue' command"
573 print(single_line_delim)
574 input("Press ENTER to continue running the testcase...")
583 is_distro_ubuntu2204 == True and cls.has_tag(TestCaseTag.FIXME_UBUNTU2204)
584 ) or (is_distro_debian11 == True and cls.has_tag(TestCaseTag.FIXME_DEBIAN11)):
586 cls.logger.debug(f"Assigned cpus: {cls.cpus}")
587 cmdline = cls.vpp_cmdline
589 if cls.debug_gdbserver:
590 gdbserver = "/usr/bin/gdbserver"
591 if not os.path.isfile(gdbserver) or not os.access(gdbserver, os.X_OK):
593 "gdbserver binary '%s' does not exist or is "
594 "not executable" % gdbserver
599 "localhost:{port}".format(port=cls.gdbserver_port),
601 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
604 cls.vpp = subprocess.Popen(
605 cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE
607 except subprocess.CalledProcessError as e:
609 "Subprocess returned with non-0 return code: (%s)", e.returncode
614 "Subprocess returned with OS error: (%s) %s", e.errno, e.strerror
617 except Exception as e:
618 cls.logger.exception("Subprocess returned unexpected from %s:", cmdline)
624 def wait_for_coredump(cls):
625 corefile = cls.tempdir + "/core"
626 if os.path.isfile(corefile):
627 cls.logger.error("Waiting for coredump to complete: %s", corefile)
628 curr_size = os.path.getsize(corefile)
629 deadline = time.time() + 60
631 while time.time() < deadline:
634 curr_size = os.path.getsize(corefile)
635 if size == curr_size:
640 "Timed out waiting for coredump to complete: %s", corefile
643 cls.logger.error("Coredump complete: %s, size %d", corefile, curr_size)
def get_stats_sock_path(cls):
    """Path of the VPP stats socket inside this suite's tempdir."""
    return "%s/stats.sock" % cls.tempdir
def get_api_sock_path(cls):
    """Path of the VPP API socket inside this suite's tempdir."""
    return "%s/api.sock" % cls.tempdir
def get_api_segment_prefix(cls):
    """API segment prefix derived from the tempdir's basename."""
    return os.path.basename(cls.tempdir)  # Only used for VAPI
658 def get_tempdir(cls):
660 tmpdir = f"{config.tmp_dir}/unittest-attach-gdb"
662 tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}"
663 if config.wipe_tmp_dir:
664 shutil.rmtree(tmpdir, ignore_errors=True)
669 def create_file_handler(cls):
670 if config.log_dir is None:
671 cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
674 logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}"
675 if config.wipe_tmp_dir:
676 shutil.rmtree(logdir, ignore_errors=True)
678 cls.file_handler = FileHandler(f"{logdir}/log.txt")
683 Perform class setup before running the testcase
684 Remove shared memory files, start vpp and connect the vpp-api
686 super(VppTestCase, cls).setUpClass()
687 cls.logger = get_logger(cls.__name__)
688 random.seed(config.rnd_seed)
689 if hasattr(cls, "parallel_handler"):
690 cls.logger.addHandler(cls.parallel_handler)
691 cls.logger.propagate = False
692 cls.set_debug_flags(config.debug)
693 cls.tempdir = cls.get_tempdir()
694 cls.create_file_handler()
695 cls.file_handler.setFormatter(
696 Formatter(fmt="%(asctime)s,%(msecs)03d %(message)s", datefmt="%H:%M:%S")
698 cls.file_handler.setLevel(DEBUG)
699 cls.logger.addHandler(cls.file_handler)
700 cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
701 os.chdir(cls.tempdir)
703 "Temporary dir is %s, api socket is %s",
705 cls.get_api_sock_path(),
707 cls.logger.debug("Random seed is %s", config.rnd_seed)
709 cls.reset_packet_infos()
714 cls.registry = VppObjectRegistry()
715 cls.vpp_startup_failed = False
716 cls.reporter = KeepAliveReporter()
717 # need to catch exceptions here because if we raise, then the cleanup
718 # doesn't get called and we might end with a zombie vpp
724 if not hasattr(cls, "vpp"):
726 cls.reporter.send_keep_alive(cls, "setUpClass")
727 VppTestResult.current_test_case_info = TestCaseInfo(
728 cls.logger, cls.tempdir, cls.vpp.pid, config.vpp
730 cls.vpp_stdout_deque = deque()
731 cls.vpp_stderr_deque = deque()
732 # Pump thread in a non-debug-attached & not running-vpp
733 if not cls.debug_attach and not hasattr(cls, "running_vpp"):
734 cls.pump_thread_stop_flag = Event()
735 cls.pump_thread_wakeup_pipe = os.pipe()
736 cls.pump_thread = Thread(target=pump_output, args=(cls,))
737 cls.pump_thread.daemon = True
738 cls.pump_thread.start()
739 if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
740 cls.vapi_response_timeout = 0
741 cls.vapi = VppPapiProvider(cls.__name__, cls, cls.vapi_response_timeout)
743 hook = hookmodule.StepHook(cls)
745 hook = hookmodule.PollHook(cls)
746 cls.vapi.register_hook(hook)
747 cls.statistics = VPPStats(socketname=cls.get_stats_sock_path())
751 cls.vpp_startup_failed = True
753 "VPP died shortly after startup, check the"
754 " output to standard error for possible cause"
759 except (vpp_papi.VPPIOError, Exception) as e:
760 cls.logger.debug("Exception connecting to vapi: %s" % e)
761 cls.vapi.disconnect()
763 if cls.debug_gdbserver:
766 "You're running VPP inside gdbserver but "
767 "VPP-API connection failed, did you forget "
768 "to 'continue' VPP from within gdb?",
774 last_line = cls.vapi.cli("show thread").split("\n")[-2]
775 cls.vpp_worker_count = int(last_line.split(" ")[0])
776 print("Detected VPP with %s workers." % cls.vpp_worker_count)
777 except vpp_papi.VPPRuntimeError as e:
778 cls.logger.debug("%s" % e)
781 except Exception as e:
782 cls.logger.debug("Exception connecting to VPP: %s" % e)
787 def _debug_quit(cls):
788 if cls.debug_gdbserver or cls.debug_gdb:
792 if cls.vpp.returncode is None:
794 print(double_line_delim)
795 print("VPP or GDB server is still running")
796 print(single_line_delim)
798 "When done debugging, press ENTER to kill the "
799 "process and finish running the testcase..."
801 except AttributeError:
807 Disconnect vpp-api, kill vpp and cleanup shared memory files
810 if hasattr(cls, "running_vpp"):
813 # first signal that we want to stop the pump thread, then wake it up
814 if hasattr(cls, "pump_thread_stop_flag"):
815 cls.pump_thread_stop_flag.set()
816 if hasattr(cls, "pump_thread_wakeup_pipe"):
817 os.write(cls.pump_thread_wakeup_pipe[1], b"ding dong wake up")
818 if hasattr(cls, "pump_thread"):
819 cls.logger.debug("Waiting for pump thread to stop")
820 cls.pump_thread.join()
821 if hasattr(cls, "vpp_stderr_reader_thread"):
822 cls.logger.debug("Waiting for stderr pump to stop")
823 cls.vpp_stderr_reader_thread.join()
825 if hasattr(cls, "vpp"):
826 if hasattr(cls, "vapi"):
827 cls.logger.debug(cls.vapi.vpp.get_stats())
828 cls.logger.debug("Disconnecting class vapi client on %s", cls.__name__)
829 cls.vapi.disconnect()
830 cls.logger.debug("Deleting class vapi attribute on %s", cls.__name__)
833 if not cls.debug_attach and cls.vpp.returncode is None:
834 cls.wait_for_coredump()
835 cls.logger.debug("Sending TERM to vpp")
837 cls.logger.debug("Waiting for vpp to die")
839 outs, errs = cls.vpp.communicate(timeout=5)
840 except subprocess.TimeoutExpired:
842 outs, errs = cls.vpp.communicate()
843 cls.logger.debug("Deleting class vpp attribute on %s", cls.__name__)
844 if not cls.debug_attach and not hasattr(cls, "running_vpp"):
845 cls.vpp.stdout.close()
846 cls.vpp.stderr.close()
847 # If vpp is a dynamic attribute set by the func use_running,
848 # deletion will result in an AttributeError that we can
852 except AttributeError:
855 if cls.vpp_startup_failed:
856 stdout_log = cls.logger.info
857 stderr_log = cls.logger.critical
859 stdout_log = cls.logger.info
860 stderr_log = cls.logger.info
862 if hasattr(cls, "vpp_stdout_deque"):
863 stdout_log(single_line_delim)
864 stdout_log("VPP output to stdout while running %s:", cls.__name__)
865 stdout_log(single_line_delim)
866 vpp_output = "".join(cls.vpp_stdout_deque)
867 with open(cls.tempdir + "/vpp_stdout.txt", "w") as f:
869 stdout_log("\n%s", vpp_output)
870 stdout_log(single_line_delim)
872 if hasattr(cls, "vpp_stderr_deque"):
873 stderr_log(single_line_delim)
874 stderr_log("VPP output to stderr while running %s:", cls.__name__)
875 stderr_log(single_line_delim)
876 vpp_output = "".join(cls.vpp_stderr_deque)
877 with open(cls.tempdir + "/vpp_stderr.txt", "w") as f:
879 stderr_log("\n%s", vpp_output)
880 stderr_log(single_line_delim)
883 def tearDownClass(cls):
884 """Perform final cleanup after running all tests in this test-case"""
885 cls.logger.debug("--- tearDownClass() for %s called ---" % cls.__name__)
886 if not hasattr(cls, "vpp"):
888 cls.reporter.send_keep_alive(cls, "tearDownClass")
890 cls.file_handler.close()
891 cls.reset_packet_infos()
892 if config.debug_framework:
893 debug_internal.on_tear_down_class(cls)
def show_commands_at_teardown(self):
    """Allow subclass specific teardown logging additions.

    Called from tearDown(); subclasses override this hook to log extra
    show-command output. The base implementation only notes its absence.
    """
    self.logger.info("--- No test specific show commands provided. ---")
900 """Show various debug prints after each test"""
902 "--- tearDown() for %s.%s(%s) called ---"
903 % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
905 if not hasattr(self, "vpp"):
909 if not self.vpp_dead:
910 self.logger.debug(self.vapi.cli("show trace max 1000"))
911 self.logger.info(self.vapi.ppcli("show interface"))
912 self.logger.info(self.vapi.ppcli("show hardware"))
913 self.logger.info(self.statistics.set_errors_str())
914 self.logger.info(self.vapi.ppcli("show run"))
915 self.logger.info(self.vapi.ppcli("show log"))
916 self.logger.info(self.vapi.ppcli("show bihash"))
917 self.logger.info("Logging testcase specific show commands.")
918 self.show_commands_at_teardown()
919 if self.remove_configured_vpp_objects_on_tear_down:
920 self.registry.remove_vpp_config(self.logger)
921 # Save/Dump VPP api trace log
922 m = self._testMethodName
923 api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
924 tmp_api_trace = "/tmp/%s" % api_trace
925 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
926 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
927 self.logger.info("Moving %s to %s\n" % (tmp_api_trace, vpp_api_trace_log))
928 shutil.move(tmp_api_trace, vpp_api_trace_log)
929 except VppTransportSocketIOError:
931 "VppTransportSocketIOError: Vpp dead. Cannot log show commands."
935 self.registry.unregister_all(self.logger)
938 """Clear trace before running each test"""
939 super(VppTestCase, self).setUp()
940 if not hasattr(self, "vpp"):
942 self.reporter.send_keep_alive(self)
946 testcase=self.__class__.__name__,
947 method_name=self._testMethodName,
949 self.sleep(0.1, "during setUp")
950 self.vpp_stdout_deque.append(
951 "--- test setUp() for %s.%s(%s) starts here ---\n"
952 % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
954 self.vpp_stderr_deque.append(
955 "--- test setUp() for %s.%s(%s) starts here ---\n"
956 % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
958 self.vapi.cli("clear trace")
959 # store the test instance inside the test class - so that objects
960 # holding the class can access instance methods (like assertEqual)
961 type(self).test_instance = self
964 def pg_enable_capture(cls, interfaces=None):
966 Enable capture on packet-generator interfaces
968 :param interfaces: iterable interface indexes (if None,
969 use self.pg_interfaces)
972 if interfaces is None:
973 interfaces = cls.pg_interfaces
def register_pcap(cls, intf, worker):
    """Register a pcap in the testclass"""
    # record the (interface, worker) pair so pg_start() can later delete
    # the packet generator and rotate the old capture files
    cls._pcaps.append((intf, worker))
984 def get_vpp_time(cls):
985 # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
986 # returns float("2.190522")
987 timestr = cls.vapi.cli("show clock")
988 head, sep, tail = timestr.partition(",")
989 head, sep, tail = head.partition("Time now")
993 def sleep_on_vpp_time(cls, sec):
994 """Sleep according to time in VPP world"""
995 # On a busy system with many processes
996 # we might end up with VPP time being slower than real world
997 # So take that into account when waiting for VPP to do something
998 start_time = cls.get_vpp_time()
999 while cls.get_vpp_time() - start_time < sec:
1003 def pg_start(cls, trace=True):
1004 """Enable the PG, wait till it is done, then clean up"""
1005 for intf, worker in cls._old_pcaps:
1006 intf.handle_old_pcap_file(intf.get_in_path(worker), intf.in_history_counter)
1009 cls.vapi.cli("clear trace")
1010 cls.vapi.cli("trace add pg-input 1000")
1011 cls.vapi.cli("packet-generator enable")
1012 # PG, when starts, runs to completion -
1013 # so let's avoid a race condition,
1014 # and wait a little till it's done.
1015 # Then clean it up - and then be gone.
1016 deadline = time.time() + 300
1017 while cls.vapi.cli("show packet-generator").find("Yes") != -1:
1018 cls.sleep(0.01) # yield
1019 if time.time() > deadline:
1020 cls.logger.error("Timeout waiting for pg to stop")
1022 for intf, worker in cls._pcaps:
1023 cls.vapi.cli("packet-generator delete %s" % intf.get_cap_name(worker))
1024 cls._old_pcaps = cls._pcaps
1028 def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0, mode=None):
1030 Create packet-generator interfaces.
1032 :param interfaces: iterable indexes of the interfaces.
1033 :returns: List of created interfaces.
1037 for i in interfaces:
1038 intf = VppPGInterface(cls, i, gso, gso_size, mode)
1039 setattr(cls, intf.name, intf)
1041 cls.pg_interfaces = result
def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
    """Create pg interfaces in IP4 mode; returns [] when no VPP is attached."""
    if not hasattr(cls, "vpp"):
        cls.pg_interfaces = []
        return cls.pg_interfaces
    pgmode = VppEnum.vl_api_pg_interface_mode_t
    return cls.create_pg_interfaces_internal(
        interfaces, gso, gso_size, pgmode.PG_API_MODE_IP4
def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
    """Create pg interfaces in IP6 mode; returns [] when no VPP is attached."""
    if not hasattr(cls, "vpp"):
        cls.pg_interfaces = []
        return cls.pg_interfaces
    pgmode = VppEnum.vl_api_pg_interface_mode_t
    return cls.create_pg_interfaces_internal(
        interfaces, gso, gso_size, pgmode.PG_API_MODE_IP6
def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
    """Create pg interfaces in Ethernet mode; returns [] when no VPP is attached."""
    if not hasattr(cls, "vpp"):
        cls.pg_interfaces = []
        return cls.pg_interfaces
    pgmode = VppEnum.vl_api_pg_interface_mode_t
    return cls.create_pg_interfaces_internal(
        interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
    """Explicitly-named Ethernet-mode variant; same as create_pg_interfaces."""
    if not hasattr(cls, "vpp"):
        cls.pg_interfaces = []
        return cls.pg_interfaces
    pgmode = VppEnum.vl_api_pg_interface_mode_t
    return cls.create_pg_interfaces_internal(
        interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
1085 def create_loopback_interfaces(cls, count):
1087 Create loopback interfaces.
1089 :param count: number of interfaces created.
1090 :returns: List of created interfaces.
1092 if not hasattr(cls, "vpp"):
1093 cls.lo_interfaces = []
1094 return cls.lo_interfaces
1095 result = [VppLoInterface(cls) for i in range(count)]
1097 setattr(cls, intf.name, intf)
1098 cls.lo_interfaces = result
1102 def create_bvi_interfaces(cls, count):
1104 Create BVI interfaces.
1106 :param count: number of interfaces created.
1107 :returns: List of created interfaces.
1109 if not hasattr(cls, "vpp"):
1110 cls.bvi_interfaces = []
1111 return cls.bvi_interfaces
1112 result = [VppBviInterface(cls) for i in range(count)]
1114 setattr(cls, intf.name, intf)
1115 cls.bvi_interfaces = result
1119 def extend_packet(packet, size, padding=" "):
1121 Extend packet to given size by padding with spaces or custom padding
1122 NOTE: Currently works only when Raw layer is present.
1124 :param packet: packet
1125 :param size: target size
1126 :param padding: padding used to extend the payload
1129 packet_len = len(packet) + 4
1130 extend = size - packet_len
1132 num = (extend // len(padding)) + 1
1133 packet[Raw].load += (padding * num)[:extend].encode("ascii")
def reset_packet_infos(cls):
    """Drop every recorded packet info and clear per-interface packet counts."""
    # the two stores are independent; clear both
    cls._packet_count_for_dst_if_idx = {}
    cls._packet_infos = {}
1142 def create_packet_info(cls, src_if, dst_if):
1144 Create packet info object containing the source and destination indexes
1145 and add it to the testcase's packet info list
1147 :param VppInterface src_if: source interface
1148 :param VppInterface dst_if: destination interface
1150 :returns: _PacketInfo object
1153 info = _PacketInfo()
1154 info.index = len(cls._packet_infos)
1155 info.src = src_if.sw_if_index
1156 info.dst = dst_if.sw_if_index
1157 if isinstance(dst_if, VppSubInterface):
1158 dst_idx = dst_if.parent.sw_if_index
1160 dst_idx = dst_if.sw_if_index
1161 if dst_idx in cls._packet_count_for_dst_if_idx:
1162 cls._packet_count_for_dst_if_idx[dst_idx] += 1
1164 cls._packet_count_for_dst_if_idx[dst_idx] = 1
1165 cls._packet_infos[info.index] = info
1169 def info_to_payload(info):
1171 Convert _PacketInfo object to packet payload
1173 :param info: _PacketInfo object
1175 :returns: string containing serialized data from packet info
1178 # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1179 return pack("iiiih", info.index, info.src, info.dst, info.ip, info.proto)
1182 def payload_to_info(payload, payload_field="load"):
1184 Convert packet payload to _PacketInfo object
1186 :param payload: packet payload
1187 :type payload: <class 'scapy.packet.Raw'>
1188 :param payload_field: packet fieldname of payload "load" for
1189 <class 'scapy.packet.Raw'>
1190 :type payload_field: str
1191 :returns: _PacketInfo object containing de-serialized data from payload
1195 # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1196 payload_b = getattr(payload, payload_field)[:18]
1198 info = _PacketInfo()
1199 info.index, info.src, info.dst, info.ip, info.proto = unpack("iiiih", payload_b)
1201 # some SRv6 TCs depend on get an exception if bad values are detected
1202 if info.index > 0x4000:
1203 raise ValueError("Index value is invalid")
1207 def get_next_packet_info(self, info):
1209 Iterate over the packet info list stored in the testcase
1210 Start iteration with first element if info is None
1211 Continue based on index in info if info is specified
1213 :param info: info or None
1214 :returns: next info in list or None if no more infos
1219 next_index = info.index + 1
1220 if next_index == len(self._packet_infos):
1223 return self._packet_infos[next_index]
1225 def get_next_packet_info_for_interface(self, src_index, info):
1227 Search the packet info list for the next packet info with same source
1230 :param src_index: source interface index to search for
1231 :param info: packet info - where to start the search
1232 :returns: packet info or None
1236 info = self.get_next_packet_info(info)
1239 if info.src == src_index:
1242 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
1244 Search the packet info list for the next packet info with same source
1245 and destination interface indexes
1247 :param src_index: source interface index to search for
1248 :param dst_index: destination interface index to search for
1249 :param info: packet info - where to start the search
1250 :returns: packet info or None
1254 info = self.get_next_packet_info_for_interface(src_index, info)
1257 if info.dst == dst_index:
1260 def assert_equal(self, real_value, expected_value, name_or_class=None):
1261 if name_or_class is None:
1262 self.assertEqual(real_value, expected_value)
1265 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
1267 getdoc(name_or_class).strip(),
1269 str(name_or_class(real_value)),
1271 str(name_or_class(expected_value)),
1274 msg = "Invalid %s: %s does not match expected value %s" % (
1280 self.assertEqual(real_value, expected_value, msg)
1282 def assert_in_range(self, real_value, expected_min, expected_max, name=None):
1286 msg = "Invalid %s: %s out of range <%s,%s>" % (
1292 self.assertTrue(expected_min <= real_value <= expected_max, msg)
1294 def assert_packet_checksums_valid(self, packet, ignore_zero_udp_checksums=True):
1295 received = packet.__class__(scapy.compat.raw(packet))
1296 udp_layers = ["UDP", "UDPerror"]
1297 checksum_fields = ["cksum", "chksum"]
1300 temp = received.__class__(scapy.compat.raw(received))
1302 layer = temp.getlayer(counter)
1304 layer = layer.copy()
1305 layer.remove_payload()
1306 for cf in checksum_fields:
1307 if hasattr(layer, cf):
1309 ignore_zero_udp_checksums
1310 and 0 == getattr(layer, cf)
1311 and layer.name in udp_layers
1314 delattr(temp.getlayer(counter), cf)
1315 checksums.append((counter, cf))
1318 counter = counter + 1
1319 if 0 == len(checksums):
1321 temp = temp.__class__(scapy.compat.raw(temp))
1322 for layer, cf in reversed(checksums):
1323 calc_sum = getattr(temp[layer], cf)
1325 getattr(received[layer], cf),
1327 "packet checksum on layer #%d: %s" % (layer, temp[layer].name),
1330 "Checksum field `%s` on `%s` layer has correct value `%s`"
1331 % (cf, temp[layer].name, calc_sum)
1334 def assert_checksum_valid(
1338 checksum_field_names=["chksum", "cksum"],
1339 ignore_zero_checksum=False,
1341 """Check checksum of received packet on given layer"""
1342 layer_copy = received_packet[layer].copy()
1343 layer_copy.remove_payload()
1345 for f in checksum_field_names:
1346 if hasattr(layer_copy, f):
1349 if field_name is None:
1351 f"Layer `{layer}` has none of checksum fields: `{checksum_field_names}`."
1353 received_packet_checksum = getattr(received_packet[layer], field_name)
1354 if ignore_zero_checksum and 0 == received_packet_checksum:
1356 recalculated = received_packet.__class__(scapy.compat.raw(received_packet))
1357 delattr(recalculated[layer], field_name)
1358 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1360 received_packet_checksum,
1361 getattr(recalculated[layer], field_name),
1362 f"packet checksum (field: {field_name}) on layer: %s" % layer,
1365 def assert_ip_checksum_valid(self, received_packet, ignore_zero_checksum=False):
1366 self.assert_checksum_valid(
1367 received_packet, "IP", ignore_zero_checksum=ignore_zero_checksum
1370 def assert_tcp_checksum_valid(self, received_packet, ignore_zero_checksum=False):
1371 self.assert_checksum_valid(
1372 received_packet, "TCP", ignore_zero_checksum=ignore_zero_checksum
1375 def assert_udp_checksum_valid(self, received_packet, ignore_zero_checksum=True):
1376 self.assert_checksum_valid(
1377 received_packet, "UDP", ignore_zero_checksum=ignore_zero_checksum
1380 def assert_embedded_icmp_checksum_valid(self, received_packet):
1381 if received_packet.haslayer(IPerror):
1382 self.assert_checksum_valid(received_packet, "IPerror")
1383 if received_packet.haslayer(TCPerror):
1384 self.assert_checksum_valid(received_packet, "TCPerror")
1385 if received_packet.haslayer(UDPerror):
1386 self.assert_checksum_valid(
1387 received_packet, "UDPerror", ignore_zero_checksum=True
1389 if received_packet.haslayer(ICMPerror):
1390 self.assert_checksum_valid(received_packet, "ICMPerror")
1392 def assert_icmp_checksum_valid(self, received_packet):
1393 self.assert_checksum_valid(received_packet, "ICMP")
1394 self.assert_embedded_icmp_checksum_valid(received_packet)
1396 def assert_icmpv6_checksum_valid(self, pkt):
1397 if pkt.haslayer(ICMPv6DestUnreach):
1398 self.assert_checksum_valid(pkt, "ICMPv6DestUnreach")
1399 self.assert_embedded_icmp_checksum_valid(pkt)
1400 if pkt.haslayer(ICMPv6EchoRequest):
1401 self.assert_checksum_valid(pkt, "ICMPv6EchoRequest")
1402 if pkt.haslayer(ICMPv6EchoReply):
1403 self.assert_checksum_valid(pkt, "ICMPv6EchoReply")
1405 def get_counter(self, counter):
1406 if counter.startswith("/"):
1407 counter_value = self.statistics.get_counter(counter)
1409 counters = self.vapi.cli("sh errors").split("\n")
1411 for i in range(1, len(counters) - 1):
1412 results = counters[i].split()
1413 if results[1] == counter:
1414 counter_value = int(results[0])
1416 return counter_value
1418 def assert_counter_equal(self, counter, expected_value, thread=None, index=0):
1419 c = self.get_counter(counter)
1420 if thread is not None:
1421 c = c[thread][index]
1423 c = sum(x[index] for x in c)
1425 "validate counter `%s[%s]', expected: %s, real value: %s"
1426 % (counter, index, expected_value, c)
1428 self.assert_equal(c, expected_value, "counter `%s[%s]'" % (counter, index))
1430 def assert_packet_counter_equal(self, counter, expected_value):
1431 counter_value = self.get_counter(counter)
1433 counter_value, expected_value, "packet counter `%s'" % counter
1436 def assert_error_counter_equal(self, counter, expected_value):
1437 counter_value = self.statistics[counter].sum()
1438 self.assert_equal(counter_value, expected_value, "error counter `%s'" % counter)
1441 def sleep(cls, timeout, remark=None):
1442 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1443 # * by Guido, only the main thread can be interrupted.
1445 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
1448 if hasattr(os, "sched_yield"):
1454 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1455 before = time.time()
1458 if after - before > 2 * timeout:
1460 "unexpected self.sleep() result - slept for %es instead of ~%es!",
1466 "Finished sleep (%s) - slept %es (wanted %es)",
1472 def virtual_sleep(self, timeout, remark=None):
1473 self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
1474 self.vapi.cli("set clock adjust %s" % timeout)
1476 def pg_send(self, intf, pkts, worker=None, trace=True):
1477 intf.add_stream(pkts, worker=worker)
1478 self.pg_enable_capture(self.pg_interfaces)
1479 self.pg_start(trace=trace)
1481 def snapshot_stats(self, stats_diff):
1482 """Return snapshot of interesting stats based on diff dictionary."""
1484 for sw_if_index in stats_diff:
1485 for counter in stats_diff[sw_if_index]:
1486 stats_snapshot[counter] = self.statistics[counter]
1487 self.logger.debug(f"Took statistics stats_snapshot: {stats_snapshot}")
1488 return stats_snapshot
1490 def compare_stats_with_snapshot(self, stats_diff, stats_snapshot):
1491 """Assert appropriate difference between current stats and snapshot."""
1492 for sw_if_index in stats_diff:
1493 for cntr, diff in stats_diff[sw_if_index].items():
1494 if sw_if_index == "err":
1496 self.statistics[cntr].sum(),
1497 stats_snapshot[cntr].sum() + diff,
1498 f"'{cntr}' counter value (previous value: "
1499 f"{stats_snapshot[cntr].sum()}, "
1500 f"expected diff: {diff})",
1505 self.statistics[cntr][:, sw_if_index].sum(),
1506 stats_snapshot[cntr][:, sw_if_index].sum() + diff,
1507 f"'{cntr}' counter value (previous value: "
1508 f"{stats_snapshot[cntr][:, sw_if_index].sum()}, "
1509 f"expected diff: {diff})",
1511 except IndexError as e:
1512 # if diff is 0, then this most probably a case where
1513 # test declares multiple interfaces but traffic hasn't
1514 # passed through this one yet - which means the counter
1515 # value is 0 and can be ignored
1518 f"Couldn't sum counter: {cntr} on sw_if_index: {sw_if_index}"
1521 def send_and_assert_no_replies(
1522 self, intf, pkts, remark="", timeout=None, stats_diff=None, trace=True, msg=None
1525 stats_snapshot = self.snapshot_stats(stats_diff)
1527 self.pg_send(intf, pkts)
1532 for i in self.pg_interfaces:
1533 i.assert_nothing_captured(timeout=timeout, remark=remark)
1538 self.logger.debug(f"send_and_assert_no_replies: {msg}")
1539 self.logger.debug(self.vapi.cli("show trace"))
1542 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1544 def send_and_expect(
1556 stats_snapshot = self.snapshot_stats(stats_diff)
1559 n_rx = 1 if isinstance(pkts, Packet) else len(pkts)
1560 self.pg_send(intf, pkts, worker=worker, trace=trace)
1561 rx = output.get_capture(n_rx)
1564 self.logger.debug(f"send_and_expect: {msg}")
1565 self.logger.debug(self.vapi.cli("show trace"))
1568 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1572 def send_and_expect_load_balancing(
1573 self, input, pkts, outputs, worker=None, trace=True
1575 self.pg_send(input, pkts, worker=worker, trace=trace)
1578 rx = oo._get_capture(1)
1579 self.assertNotEqual(0, len(rx))
1582 self.logger.debug(self.vapi.cli("show trace"))
1585 def send_and_expect_some(self, intf, pkts, output, worker=None, trace=True):
1586 self.pg_send(intf, pkts, worker=worker, trace=trace)
1587 rx = output._get_capture(1)
1589 self.logger.debug(self.vapi.cli("show trace"))
1590 self.assertTrue(len(rx) > 0)
1591 self.assertTrue(len(rx) < len(pkts))
1594 def send_and_expect_only(self, intf, pkts, output, timeout=None, stats_diff=None):
1596 stats_snapshot = self.snapshot_stats(stats_diff)
1598 self.pg_send(intf, pkts)
1599 rx = output.get_capture(len(pkts))
1603 for i in self.pg_interfaces:
1604 if i not in outputs:
1605 i.assert_nothing_captured(timeout=timeout)
1609 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
def get_testcase_doc_name(test):
    """Return the first line of the test case class docstring."""
    doc = getdoc(test.__class__)
    return doc.split("\n", 1)[0]
def get_test_description(descriptions, test):
    """Return the short description when enabled and present, else str(test)."""
    short_description = test.shortDescription()
    return (
        short_description if (descriptions and short_description) else str(test)
    )
class TestCaseInfo(object):
    """Holds per-test-case bookkeeping used by the result reporter.

    Attributes: logger, tempdir, vpp_pid, vpp_bin_path describe the test
    run; core_crash_test is filled in later if a core file is found.
    """

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        self.logger = logger
        self.tempdir = tempdir
        self.vpp_pid = vpp_pid
        self.vpp_bin_path = vpp_bin_path
        # name of the test that produced a core file, set by the reporter
        self.core_crash_test = None
class VppTestResult(unittest.TestResult):
    """
    @property result_string
     String variable to store the test case result string.
    @property errors
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test which
     raised an unexpected exception.
    @property failures
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test where
     a failure was explicitly signalled using the TestCase.assert*()
     methods.
    """

    # class-level registries shared by all result instances
    failed_test_cases_info = set()
    core_crash_test_cases_info = set()
    current_test_case_info = None
1654 def __init__(self, stream=None, descriptions=None, verbosity=None, runner=None):
1656 :param stream File descriptor to store where to report test results.
1657 Set to the standard error stream by default.
1658 :param descriptions Boolean variable to store information if to use
1659 test case descriptions.
1660 :param verbosity Integer variable to store required verbosity level.
1662 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1663 self.stream = stream
1664 self.descriptions = descriptions
1665 self.verbosity = verbosity
1666 self.result_code = TestResultCode.TEST_RUN
1667 self.result_string = None
1668 self.runner = runner
1671 def addSuccess(self, test):
1673 Record a test succeeded result
1678 self.log_result("addSuccess", test)
1679 unittest.TestResult.addSuccess(self, test)
1680 self.result_string = colorize("OK", GREEN)
1681 self.result_code = TestResultCode.PASS
1682 self.send_result_through_pipe(test, self.result_code)
1684 def addExpectedFailure(self, test, err):
1685 self.log_result("addExpectedFailure", test, err)
1686 super().addExpectedFailure(test, err)
1687 self.result_string = colorize("FAIL", GREEN)
1688 self.result_code = TestResultCode.EXPECTED_FAIL
1689 self.send_result_through_pipe(test, self.result_code)
1691 def addUnexpectedSuccess(self, test):
1692 self.log_result("addUnexpectedSuccess", test)
1693 super().addUnexpectedSuccess(test)
1694 self.result_string = colorize("OK", RED)
1695 self.result_code = TestResultCode.UNEXPECTED_PASS
1696 self.send_result_through_pipe(test, self.result_code)
1698 def addSkip(self, test, reason):
1700 Record a test skipped.
1706 self.log_result("addSkip", test, reason=reason)
1707 unittest.TestResult.addSkip(self, test, reason)
1708 self.result_string = colorize("SKIP", YELLOW)
1710 if reason == "not enough cpus":
1711 self.result_code = TestResultCode.SKIP_CPU_SHORTAGE
1713 self.result_code = TestResultCode.SKIP
1714 self.send_result_through_pipe(test, self.result_code)
1716 def symlink_failed(self):
1717 if self.current_test_case_info:
1719 failed_dir = config.failed_dir
1720 link_path = os.path.join(
1722 "%s-FAILED" % os.path.basename(self.current_test_case_info.tempdir),
1725 self.current_test_case_info.logger.debug(
1726 "creating a link to the failed test"
1728 self.current_test_case_info.logger.debug(
1729 "os.symlink(%s, %s)"
1730 % (self.current_test_case_info.tempdir, link_path)
1732 if os.path.exists(link_path):
1733 self.current_test_case_info.logger.debug("symlink already exists")
1735 os.symlink(self.current_test_case_info.tempdir, link_path)
1737 except Exception as e:
1738 self.current_test_case_info.logger.error(e)
1740 def send_result_through_pipe(self, test, result):
1741 if hasattr(self, "test_framework_result_pipe"):
1742 pipe = self.test_framework_result_pipe
1744 pipe.send((test.id(), result))
1746 def log_result(self, fn, test, err=None, reason=None):
1747 if self.current_test_case_info:
1748 if isinstance(test, unittest.suite._ErrorHolder):
1749 test_name = test.description
1751 test_name = "%s.%s(%s)" % (
1752 test.__class__.__name__,
1753 test._testMethodName,
1754 test._testMethodDoc,
1758 extra_msg += f", error is {err}"
1760 extra_msg += f", reason is {reason}"
1761 self.current_test_case_info.logger.debug(
1762 f"--- {fn}() {test_name} called{extra_msg}"
1765 self.current_test_case_info.logger.debug(
1766 "formatted exception is:\n%s" % "".join(format_exception(*err))
1769 def add_error(self, test, err, unittest_fn, result_code):
1770 self.result_code = result_code
1771 if result_code == TestResultCode.FAIL:
1772 self.log_result("addFailure", test, err=err)
1773 error_type_str = colorize("FAIL", RED)
1774 elif result_code == TestResultCode.ERROR:
1775 self.log_result("addError", test, err=err)
1776 error_type_str = colorize("ERROR", RED)
1778 raise Exception(f"Unexpected result code {result_code}")
1780 unittest_fn(self, test, err)
1781 if self.current_test_case_info:
1782 self.result_string = "%s [ temp dir used by test case: %s ]" % (
1784 self.current_test_case_info.tempdir,
1786 self.symlink_failed()
1787 self.failed_test_cases_info.add(self.current_test_case_info)
1788 if is_core_present(self.current_test_case_info.tempdir):
1789 if not self.current_test_case_info.core_crash_test:
1790 if isinstance(test, unittest.suite._ErrorHolder):
1791 test_name = str(test)
1793 test_name = "'{!s}' ({!s})".format(
1794 get_testcase_doc_name(test), test.id()
1796 self.current_test_case_info.core_crash_test = test_name
1797 self.core_crash_test_cases_info.add(self.current_test_case_info)
1799 self.result_string = "%s [no temp dir]" % error_type_str
1801 self.send_result_through_pipe(test, result_code)
1803 def addFailure(self, test, err):
1805 Record a test failed result
1808 :param err: error message
1811 self.add_error(test, err, unittest.TestResult.addFailure, TestResultCode.FAIL)
1813 def addError(self, test, err):
1815 Record a test error result
1818 :param err: error message
1821 self.add_error(test, err, unittest.TestResult.addError, TestResultCode.ERROR)
1823 def getDescription(self, test):
1825 Get test description
1828 :returns: test description
1831 return get_test_description(self.descriptions, test)
1833 def startTest(self, test):
1841 def print_header(test):
1842 if test.__class__ in self.printed:
1845 test_doc = getdoc(test)
1847 raise Exception("No doc string for test '%s'" % test.id())
1849 test_title = test_doc.splitlines()[0].rstrip()
1850 test_title = colorize(test_title, GREEN)
1851 if test.is_tagged_run_solo():
1852 test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)
1854 # This block may overwrite the colorized title above,
1855 # but we want this to stand out and be fixed
1856 if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
1857 test_title = colorize(f"FIXME with VPP workers: {test_title}", RED)
1859 if test.has_tag(TestCaseTag.FIXME_ASAN):
1860 test_title = colorize(f"FIXME with ASAN: {test_title}", RED)
1861 test.skip_fixme_asan()
1863 if is_distro_ubuntu2204 == True and test.has_tag(
1864 TestCaseTag.FIXME_UBUNTU2204
1866 test_title = colorize(f"FIXME on Ubuntu-22.04: {test_title}", RED)
1867 test.skip_fixme_ubuntu2204()
1869 if is_distro_debian11 == True and test.has_tag(TestCaseTag.FIXME_DEBIAN11):
1870 test_title = colorize(f"FIXME on Debian-11: {test_title}", RED)
1871 test.skip_fixme_debian11()
1873 if "debug" in config.vpp_tag and test.has_tag(TestCaseTag.FIXME_VPP_DEBUG):
1874 test_title = colorize(f"FIXME on VPP Debug: {test_title}", RED)
1875 test.skip_fixme_vpp_debug()
1877 if hasattr(test, "vpp_worker_count"):
1878 if test.vpp_worker_count == 0:
1879 test_title += " [main thread only]"
1880 elif test.vpp_worker_count == 1:
1881 test_title += " [1 worker thread]"
1883 test_title += f" [{test.vpp_worker_count} worker threads]"
1885 if test.__class__.skipped_due_to_cpu_lack:
1886 test_title = colorize(
1887 f"{test_title} [skipped - not enough cpus, "
1888 f"required={test.__class__.get_cpus_required()}, "
1889 f"available={max_vpp_cpus}]",
1893 print(double_line_delim)
1895 print(double_line_delim)
1896 self.printed.append(test.__class__)
1899 self.start_test = time.time()
1900 unittest.TestResult.startTest(self, test)
1901 if self.verbosity > 0:
1902 self.stream.writeln("Starting " + self.getDescription(test) + " ...")
1903 self.stream.writeln(single_line_delim)
1905 def stopTest(self, test):
1907 Called when the given test has been run
1912 unittest.TestResult.stopTest(self, test)
1914 result_code_to_suffix = {
1915 TestResultCode.PASS: "",
1916 TestResultCode.FAIL: "",
1917 TestResultCode.ERROR: "",
1918 TestResultCode.SKIP: "",
1919 TestResultCode.TEST_RUN: "",
1920 TestResultCode.SKIP_CPU_SHORTAGE: "",
1921 TestResultCode.EXPECTED_FAIL: " [EXPECTED FAIL]",
1922 TestResultCode.UNEXPECTED_PASS: " [UNEXPECTED PASS]",
1925 if self.verbosity > 0:
1926 self.stream.writeln(single_line_delim)
1927 self.stream.writeln(
1930 self.getDescription(test),
1932 result_code_to_suffix[self.result_code],
1935 self.stream.writeln(single_line_delim)
1937 self.stream.writeln(
1940 self.getDescription(test),
1941 time.time() - self.start_test,
1943 result_code_to_suffix[self.result_code],
1947 self.send_result_through_pipe(test, TestResultCode.TEST_RUN)
1949 def printErrors(self):
1951 Print errors from running the test case
1953 if len(self.errors) > 0 or len(self.failures) > 0:
1954 self.stream.writeln()
1955 self.printErrorList("ERROR", self.errors)
1956 self.printErrorList("FAIL", self.failures)
1958 # ^^ that is the last output from unittest before summary
1959 if not self.runner.print_summary:
1960 devnull = unittest.runner._WritelnDecorator(open(os.devnull, "w"))
1961 self.stream = devnull
1962 self.runner.stream = devnull
1964 def printErrorList(self, flavour, errors):
1966 Print error list to the output stream together with error type
1967 and test case description.
1969 :param flavour: error type
1970 :param errors: iterable errors
1973 for test, err in errors:
1974 self.stream.writeln(double_line_delim)
1975 self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
1976 self.stream.writeln(single_line_delim)
1977 self.stream.writeln("%s" % err)
class VppTestRunner(unittest.TextTestRunner):
    """
    A basic test runner implementation which prints results to standard error.
    """

    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    def __init__(
        self,
        keep_alive_pipe=None,
        descriptions=True,
        verbosity=1,
        result_pipe=None,
        failfast=False,
        buffer=False,
        resultclass=None,
        print_summary=True,
        **kwargs,
    ):
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        super(VppTestRunner, self).__init__(
            sys.stdout, descriptions, verbosity, failfast, buffer, resultclass, **kwargs
        )
        KeepAliveReporter.pipe = keep_alive_pipe

        self.orig_stream = self.stream
        self.resultclass.test_framework_result_pipe = result_pipe

        self.print_summary = print_summary

    def _makeResult(self):
        # pass ourselves in so the result can consult print_summary
        return self.resultclass(self.stream, self.descriptions, self.verbosity, self)

    def run(self, test):
        """
        Run the tests

        :param test:

        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        result = super(VppTestRunner, self).run(test)
        if not self.print_summary:
            # restore streams silenced by VppTestResult.printErrors()
            self.stream = self.orig_stream
            result.stream = self.orig_stream
        return result
class Worker(Thread):
    """Run an external executable in a thread, capturing its output.

    The return code is stored in self.result; stdout/stderr are logged.
    """

    def __init__(self, executable_args, logger, env=None, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self.logger = logger
        self.args = executable_args
        # wrap the command for debugging when the owning testcase asks for it
        if hasattr(self, "testcase") and self.testcase.debug_all:
            if self.testcase.debug_gdbserver:
                self.args = [
                    "/usr/bin/gdbserver",
                    "localhost:{port}".format(port=self.testcase.gdbserver_port),
                ] + self.args
            elif self.testcase.debug_gdb and hasattr(self, "wait_for_gdb"):
                self.args.append(self.wait_for_gdb)
        self.app_bin = executable_args[0]
        self.app_name = os.path.basename(self.app_bin)
        if hasattr(self, "role"):
            self.app_name += " {role}".format(role=self.role)
        self.process = None
        self.result = None
        env = {} if env is None else env
        # deep copy so later mutation by the caller cannot leak in
        self.env = copy.deepcopy(env)

    def wait_for_enter(self):
        """Pause for the user to attach a debugger, when debugging is on."""
        if not hasattr(self, "testcase"):
            return
        if self.testcase.debug_all and self.testcase.debug_gdbserver:
            print()
            print(double_line_delim)
            print(
                "Spawned GDB Server for '{app}' with PID: {pid}".format(
                    app=self.app_name, pid=self.process.pid
                )
            )
        elif self.testcase.debug_all and self.testcase.debug_gdb:
            print()
            print(double_line_delim)
            print(
                "Spawned '{app}' with PID: {pid}".format(
                    app=self.app_name, pid=self.process.pid
                )
            )
        else:
            return
        print(single_line_delim)
        print("You can debug '{app}' using:".format(app=self.app_name))
        if self.testcase.debug_gdbserver:
            print(
                "sudo gdb "
                + self.app_bin
                + " -ex 'target remote localhost:{port}'".format(
                    port=self.testcase.gdbserver_port
                )
            )
            print(
                "Now is the time to attach gdb by running the above "
                "command, set up breakpoints etc., then resume from "
                "within gdb by issuing the 'continue' command"
            )
            self.testcase.gdbserver_port += 1
        elif self.testcase.debug_gdb:
            print(
                "sudo gdb "
                + self.app_bin
                + " -ex 'attach {pid}'".format(pid=self.process.pid)
            )
            print(
                "Now is the time to attach gdb by running the above "
                "command and set up breakpoints etc., then resume from"
                " within gdb by issuing the 'continue' command"
            )
        print(single_line_delim)
        input("Press ENTER to continue running the testcase...")

    def run(self):
        executable = self.args[0]
        if not os.path.exists(executable) or not os.access(
            executable, os.F_OK | os.X_OK
        ):
            # Exit code that means some system file did not exist,
            # could not be opened, or had some other kind of error.
            self.result = os.EX_OSFILE
            raise EnvironmentError(
                "executable '%s' is not found or executable." % executable
            )
        self.logger.debug(
            "Running executable '{app}': '{cmd}'".format(
                app=self.app_name, cmd=" ".join(self.args)
            )
        )
        env = os.environ.copy()
        env.update(self.env)
        env["CK_LOG_FILE_NAME"] = "-"
        # stdbuf disables output buffering so logs arrive in order
        self.process = subprocess.Popen(
            ["stdbuf", "-o0", "-e0"] + self.args,
            shell=False,
            env=env,
            preexec_fn=os.setpgrp,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        self.wait_for_enter()
        out, err = self.process.communicate()
        self.logger.debug("Finished running `{app}'".format(app=self.app_name))
        self.logger.info("Return code is `%s'" % self.process.returncode)
        self.logger.info(single_line_delim)
        self.logger.info(
            "Executable `{app}' wrote to stdout:".format(app=self.app_name)
        )
        self.logger.info(single_line_delim)
        self.logger.info(out.decode("utf-8"))
        self.logger.info(single_line_delim)
        self.logger.info(
            "Executable `{app}' wrote to stderr:".format(app=self.app_name)
        )
        self.logger.info(single_line_delim)
        self.logger.info(err.decode("utf-8"))
        self.logger.info(single_line_delim)
        self.result = self.process.returncode
2153 if __name__ == "__main__":