3 from __future__ import print_function
18 from collections import deque
19 from threading import Thread, Event
20 from inspect import getdoc, isclass
21 from traceback import format_exception
22 from logging import FileHandler, DEBUG, Formatter
24 from abc import ABC, abstractmethod
25 from struct import pack, unpack
28 from scapy.packet import Raw, Packet
29 from config import config, available_cpus, num_cpus, max_vpp_cpus
30 import hook as hookmodule
31 from vpp_pg_interface import VppPGInterface
32 from vpp_sub_interface import VppSubInterface
33 from vpp_lo_interface import VppLoInterface
34 from vpp_bvi_interface import VppBviInterface
35 from vpp_papi_provider import VppPapiProvider
36 from vpp_papi import VppEnum
38 from vpp_papi.vpp_stats import VPPStats
39 from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
49 from vpp_object import VppObjectRegistry
50 from util import ppp, is_core_present
51 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
52 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
53 from scapy.layers.inet6 import ICMPv6EchoReply
54 from vpp_running import use_running
57 logger = logging.getLogger(__name__)
59 # Set up an empty logger for the testcase that can be overridden as necessary
60 null_logger = logging.getLogger("VppTestCase")
61 null_logger.addHandler(logging.NullHandler())
71 if config.debug_framework:
75 Test framework module.
77 The module provides a set of tools for constructing and running tests and
78 representing the results.
82 class VppDiedError(Exception):
83 """exception for reporting that the subprocess has died."""
87 for k, v in signal.__dict__.items()
88 if k.startswith("SIG") and not k.startswith("SIG_")
91 def __init__(self, rv=None, testcase=None, method_name=None):
93 self.signal_name = None
94 self.testcase = testcase
95 self.method_name = method_name
98 self.signal_name = VppDiedError.signals_by_value[-rv]
99 except (KeyError, TypeError):
102 if testcase is None and method_name is None:
105 in_msg = " while running %s.%s" % (testcase, method_name)
108 msg = "VPP subprocess died unexpectedly%s with return code: %d%s." % (
111 " [%s]" % (self.signal_name if self.signal_name is not None else ""),
114 msg = "VPP subprocess died unexpectedly%s." % in_msg
116 super(VppDiedError, self).__init__(msg)
119 class _PacketInfo(object):
120 """Private class to create packet info object.
122 Help process information about the next packet.
123 Set variables to default values.
126 #: Store the index of the packet.
128 #: Store the index of the source packet generator interface of the packet.
130 #: Store the index of the destination packet generator interface
133 #: Store expected ip version
135 #: Store expected upper protocol
137 #: Store the copy of the former packet.
140 def __eq__(self, other):
141 index = self.index == other.index
142 src = self.src == other.src
143 dst = self.dst == other.dst
144 data = self.data == other.data
145 return index and src and dst and data
148 def pump_output(testclass):
149 """pump output from vpp stdout/stderr to proper queues"""
152 while not testclass.pump_thread_stop_flag.is_set():
153 readable = select.select(
155 testclass.vpp.stdout.fileno(),
156 testclass.vpp.stderr.fileno(),
157 testclass.pump_thread_wakeup_pipe[0],
162 if testclass.vpp.stdout.fileno() in readable:
163 read = os.read(testclass.vpp.stdout.fileno(), 102400)
165 split = read.decode("ascii", errors="backslashreplace").splitlines(True)
166 if len(stdout_fragment) > 0:
167 split[0] = "%s%s" % (stdout_fragment, split[0])
168 if len(split) > 0 and split[-1].endswith("\n"):
172 stdout_fragment = split[-1]
173 testclass.vpp_stdout_deque.extend(split[:limit])
174 if not config.cache_vpp_output:
175 for line in split[:limit]:
176 testclass.logger.info("VPP STDOUT: %s" % line.rstrip("\n"))
177 if testclass.vpp.stderr.fileno() in readable:
178 read = os.read(testclass.vpp.stderr.fileno(), 102400)
180 split = read.decode("ascii", errors="backslashreplace").splitlines(True)
181 if len(stderr_fragment) > 0:
182 split[0] = "%s%s" % (stderr_fragment, split[0])
183 if len(split) > 0 and split[-1].endswith("\n"):
187 stderr_fragment = split[-1]
189 testclass.vpp_stderr_deque.extend(split[:limit])
190 if not config.cache_vpp_output:
191 for line in split[:limit]:
192 testclass.logger.error("VPP STDERR: %s" % line.rstrip("\n"))
193 # ignoring the dummy pipe here intentionally - the
194 # flag will take care of properly terminating the loop
197 def _is_platform_aarch64():
198 return platform.machine() == "aarch64"
201 is_platform_aarch64 = _is_platform_aarch64()
204 def _is_distro_ubuntu2204():
205 with open("/etc/os-release") as f:
206 for line in f.readlines():
212 is_distro_ubuntu2204 = _is_distro_ubuntu2204()
215 class KeepAliveReporter(object):
217 Singleton object which reports test start to parent process
223 self.__dict__ = self._shared_state
231 def pipe(self, pipe):
232 if self._pipe is not None:
233 raise Exception("Internal error - pipe should only be set once.")
236 def send_keep_alive(self, test, desc=None):
238 Write current test tmpdir & desc to keep-alive pipe to signal liveness
240 if self.pipe is None:
241 # if not running forked..
245 desc = "%s (%s)" % (desc, unittest.util.strclass(test))
249 self.pipe.send((desc, config.vpp, test.tempdir, test.vpp.pid))
252 class TestCaseTag(Enum):
253 # marks the suites that must run at the end
254 # using only a single test runner
256 # marks the suites broken on VPP multi-worker
257 FIXME_VPP_WORKERS = 2
258 # marks the suites broken when ASan is enabled
260 # marks suites broken on Ubuntu-22.04
264 def create_tag_decorator(e):
267 cls.test_tags.append(e)
268 except AttributeError:
275 tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
276 tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
277 tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
278 tag_fixme_ubuntu2204 = create_tag_decorator(TestCaseTag.FIXME_UBUNTU2204)
292 class CPUInterface(ABC):
294 skipped_due_to_cpu_lack = False
298 def get_cpus_required(cls):
302 def assign_cpus(cls, cpus):
307 class VppTestCase(CPUInterface, unittest.TestCase):
308 """This subclass is a base class for VPP test cases that are implemented as
309 classes. It provides methods to create and run test case.
312 extra_vpp_statseg_config = ""
313 extra_vpp_punt_config = []
314 extra_vpp_plugin_config = []
316 vapi_response_timeout = 5
317 remove_configured_vpp_objects_on_tear_down = True
320 def packet_infos(self):
321 """List of packet infos"""
322 return self._packet_infos
325 def get_packet_count_for_if_idx(cls, dst_if_index):
326 """Get the number of packet info for specified destination if index"""
327 if dst_if_index in cls._packet_count_for_dst_if_idx:
328 return cls._packet_count_for_dst_if_idx[dst_if_index]
333 def has_tag(cls, tag):
334 """if the test case has a given tag - return true"""
336 return tag in cls.test_tags
337 except AttributeError:
342 def is_tagged_run_solo(cls):
343 """if the test case class is timing-sensitive - return true"""
344 return cls.has_tag(TestCaseTag.RUN_SOLO)
347 def skip_fixme_asan(cls):
348 """if @tag_fixme_asan & ASan is enabled - mark for skip"""
349 if cls.has_tag(TestCaseTag.FIXME_ASAN):
350 vpp_extra_cmake_args = os.environ.get("VPP_EXTRA_CMAKE_ARGS", "")
351 if "DVPP_ENABLE_SANITIZE_ADDR=ON" in vpp_extra_cmake_args:
352 cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls)
355 def skip_fixme_ubuntu2204(cls):
356 """if distro is ubuntu 22.04 and @tag_fixme_ubuntu2204 mark for skip"""
357 if cls.has_tag(TestCaseTag.FIXME_UBUNTU2204):
358 cls = unittest.skip("Skipping @tag_fixme_ubuntu2204 tests")(cls)
362 """Return the instance of this testcase"""
363 return cls.test_instance
366 def set_debug_flags(cls, d):
367 cls.gdbserver_port = 7777
368 cls.debug_core = False
369 cls.debug_gdb = False
370 cls.debug_gdbserver = False
371 cls.debug_all = False
372 cls.debug_attach = False
377 cls.debug_core = True
378 elif dl == "gdb" or dl == "gdb-all":
380 elif dl == "gdbserver" or dl == "gdbserver-all":
381 cls.debug_gdbserver = True
383 cls.debug_attach = True
385 raise Exception("Unrecognized DEBUG option: '%s'" % d)
386 if dl == "gdb-all" or dl == "gdbserver-all":
390 def get_vpp_worker_count(cls):
391 if not hasattr(cls, "vpp_worker_count"):
392 if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
393 cls.vpp_worker_count = 0
395 cls.vpp_worker_count = config.vpp_worker_count
396 return cls.vpp_worker_count
399 def get_cpus_required(cls):
400 return 1 + cls.get_vpp_worker_count()
403 def setUpConstants(cls):
404 """Set-up the test case class based on environment variables"""
405 cls.step = config.step
406 cls.plugin_path = ":".join(config.vpp_plugin_dir)
407 cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir)
408 cls.extern_plugin_path = ":".join(config.extern_plugin_dir)
410 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
411 debug_cli = "cli-listen localhost:5002"
412 size = re.search(r"\d+[gG]", config.coredump_size)
414 coredump_size = f"coredump-size {config.coredump_size}".lower()
416 coredump_size = "coredump-size unlimited"
417 default_variant = config.variant
418 if default_variant is not None:
419 default_variant = "default { variant %s 100 }" % default_variant
423 api_fuzzing = config.api_fuzz
424 if api_fuzzing is None:
445 cls.get_api_segment_prefix(),
452 if cls.extern_plugin_path not in (None, ""):
453 cls.extra_vpp_plugin_config.append("add-path %s" % cls.extern_plugin_path)
454 if cls.get_vpp_worker_count():
455 cls.vpp_cmdline.extend(
456 ["corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])]
458 cls.vpp_cmdline.extend(
469 cls.get_stats_sock_path(),
470 cls.extra_vpp_statseg_config,
475 cls.get_api_sock_path(),
496 "lisp_unittest_plugin.so",
501 "unittest_plugin.so",
506 + cls.extra_vpp_plugin_config
512 if cls.extra_vpp_punt_config is not None:
513 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
515 if not cls.debug_attach:
516 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
517 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
520 def wait_for_enter(cls):
521 if cls.debug_gdbserver:
522 print(double_line_delim)
523 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
525 print(double_line_delim)
526 print("Spawned VPP with PID: %d" % cls.vpp.pid)
528 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
530 print(single_line_delim)
531 print("You can debug VPP using:")
532 if cls.debug_gdbserver:
534 f"sudo gdb {config.vpp} "
535 f"-ex 'target remote localhost:{cls.gdbserver_port}'"
538 "Now is the time to attach gdb by running the above "
539 "command, set up breakpoints etc., then resume VPP from "
540 "within gdb by issuing the 'continue' command"
542 cls.gdbserver_port += 1
544 print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'")
546 "Now is the time to attach gdb by running the above "
547 "command and set up breakpoints etc., then resume VPP from"
548 " within gdb by issuing the 'continue' command"
550 print(single_line_delim)
551 input("Press ENTER to continue running the testcase...")
559 cls.logger.debug(f"Assigned cpus: {cls.cpus}")
560 cmdline = cls.vpp_cmdline
562 if cls.debug_gdbserver:
563 gdbserver = "/usr/bin/gdbserver"
564 if not os.path.isfile(gdbserver) or not os.access(gdbserver, os.X_OK):
566 "gdbserver binary '%s' does not exist or is "
567 "not executable" % gdbserver
572 "localhost:{port}".format(port=cls.gdbserver_port),
574 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
577 cls.vpp = subprocess.Popen(
578 cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE
580 except subprocess.CalledProcessError as e:
582 "Subprocess returned with non-0 return code: (%s)", e.returncode
587 "Subprocess returned with OS error: (%s) %s", e.errno, e.strerror
590 except Exception as e:
591 cls.logger.exception("Subprocess returned unexpected from %s:", cmdline)
597 def wait_for_coredump(cls):
598 corefile = cls.tempdir + "/core"
599 if os.path.isfile(corefile):
600 cls.logger.error("Waiting for coredump to complete: %s", corefile)
601 curr_size = os.path.getsize(corefile)
602 deadline = time.time() + 60
604 while time.time() < deadline:
607 curr_size = os.path.getsize(corefile)
608 if size == curr_size:
613 "Timed out waiting for coredump to complete: %s", corefile
616 cls.logger.error("Coredump complete: %s, size %d", corefile, curr_size)
619 def get_stats_sock_path(cls):
620 return "%s/stats.sock" % cls.tempdir
623 def get_api_sock_path(cls):
624 return "%s/api.sock" % cls.tempdir
627 def get_api_segment_prefix(cls):
628 return os.path.basename(cls.tempdir) # Only used for VAPI
631 def get_tempdir(cls):
633 tmpdir = f"{config.tmp_dir}/unittest-attach-gdb"
635 tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}"
636 if config.wipe_tmp_dir:
637 shutil.rmtree(tmpdir, ignore_errors=True)
642 def create_file_handler(cls):
643 if config.log_dir is None:
644 cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
647 logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}"
648 if config.wipe_tmp_dir:
649 shutil.rmtree(logdir, ignore_errors=True)
651 cls.file_handler = FileHandler(f"{logdir}/log.txt")
656 Perform class setup before running the testcase
657 Remove shared memory files, start vpp and connect the vpp-api
659 super(VppTestCase, cls).setUpClass()
660 cls.logger = get_logger(cls.__name__)
661 random.seed(config.rnd_seed)
662 if hasattr(cls, "parallel_handler"):
663 cls.logger.addHandler(cls.parallel_handler)
664 cls.logger.propagate = False
665 cls.set_debug_flags(config.debug)
666 cls.tempdir = cls.get_tempdir()
667 cls.create_file_handler()
668 cls.file_handler.setFormatter(
669 Formatter(fmt="%(asctime)s,%(msecs)03d %(message)s", datefmt="%H:%M:%S")
671 cls.file_handler.setLevel(DEBUG)
672 cls.logger.addHandler(cls.file_handler)
673 cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
674 os.chdir(cls.tempdir)
676 "Temporary dir is %s, api socket is %s",
678 cls.get_api_sock_path(),
680 cls.logger.debug("Random seed is %s", config.rnd_seed)
682 cls.reset_packet_infos()
687 cls.registry = VppObjectRegistry()
688 cls.vpp_startup_failed = False
689 cls.reporter = KeepAliveReporter()
690 # need to catch exceptions here because if we raise, then the cleanup
691 # doesn't get called and we might end with a zombie vpp
697 cls.reporter.send_keep_alive(cls, "setUpClass")
698 VppTestResult.current_test_case_info = TestCaseInfo(
699 cls.logger, cls.tempdir, cls.vpp.pid, config.vpp
701 cls.vpp_stdout_deque = deque()
702 cls.vpp_stderr_deque = deque()
703 # Pump thread in a non-debug-attached & not running-vpp
704 if not cls.debug_attach and not hasattr(cls, "running_vpp"):
705 cls.pump_thread_stop_flag = Event()
706 cls.pump_thread_wakeup_pipe = os.pipe()
707 cls.pump_thread = Thread(target=pump_output, args=(cls,))
708 cls.pump_thread.daemon = True
709 cls.pump_thread.start()
710 if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
711 cls.vapi_response_timeout = 0
712 cls.vapi = VppPapiProvider(cls.__name__, cls, cls.vapi_response_timeout)
714 hook = hookmodule.StepHook(cls)
716 hook = hookmodule.PollHook(cls)
717 cls.vapi.register_hook(hook)
718 cls.statistics = VPPStats(socketname=cls.get_stats_sock_path())
722 cls.vpp_startup_failed = True
724 "VPP died shortly after startup, check the"
725 " output to standard error for possible cause"
730 except (vpp_papi.VPPIOError, Exception) as e:
731 cls.logger.debug("Exception connecting to vapi: %s" % e)
732 cls.vapi.disconnect()
734 if cls.debug_gdbserver:
737 "You're running VPP inside gdbserver but "
738 "VPP-API connection failed, did you forget "
739 "to 'continue' VPP from within gdb?",
745 last_line = cls.vapi.cli("show thread").split("\n")[-2]
746 cls.vpp_worker_count = int(last_line.split(" ")[0])
747 print("Detected VPP with %s workers." % cls.vpp_worker_count)
748 except vpp_papi.VPPRuntimeError as e:
749 cls.logger.debug("%s" % e)
752 except Exception as e:
753 cls.logger.debug("Exception connecting to VPP: %s" % e)
758 def _debug_quit(cls):
759 if cls.debug_gdbserver or cls.debug_gdb:
763 if cls.vpp.returncode is None:
765 print(double_line_delim)
766 print("VPP or GDB server is still running")
767 print(single_line_delim)
769 "When done debugging, press ENTER to kill the "
770 "process and finish running the testcase..."
772 except AttributeError:
778 Disconnect vpp-api, kill vpp and cleanup shared memory files
781 if hasattr(cls, "running_vpp"):
784 # first signal that we want to stop the pump thread, then wake it up
785 if hasattr(cls, "pump_thread_stop_flag"):
786 cls.pump_thread_stop_flag.set()
787 if hasattr(cls, "pump_thread_wakeup_pipe"):
788 os.write(cls.pump_thread_wakeup_pipe[1], b"ding dong wake up")
789 if hasattr(cls, "pump_thread"):
790 cls.logger.debug("Waiting for pump thread to stop")
791 cls.pump_thread.join()
792 if hasattr(cls, "vpp_stderr_reader_thread"):
793 cls.logger.debug("Waiting for stderr pump to stop")
794 cls.vpp_stderr_reader_thread.join()
796 if hasattr(cls, "vpp"):
797 if hasattr(cls, "vapi"):
798 cls.logger.debug(cls.vapi.vpp.get_stats())
799 cls.logger.debug("Disconnecting class vapi client on %s", cls.__name__)
800 cls.vapi.disconnect()
801 cls.logger.debug("Deleting class vapi attribute on %s", cls.__name__)
804 if not cls.debug_attach and cls.vpp.returncode is None:
805 cls.wait_for_coredump()
806 cls.logger.debug("Sending TERM to vpp")
808 cls.logger.debug("Waiting for vpp to die")
810 outs, errs = cls.vpp.communicate(timeout=5)
811 except subprocess.TimeoutExpired:
813 outs, errs = cls.vpp.communicate()
814 cls.logger.debug("Deleting class vpp attribute on %s", cls.__name__)
815 if not cls.debug_attach and not hasattr(cls, "running_vpp"):
816 cls.vpp.stdout.close()
817 cls.vpp.stderr.close()
818 # If vpp is a dynamic attribute set by the func use_running,
819 # deletion will result in an AttributeError that we can
823 except AttributeError:
826 if cls.vpp_startup_failed:
827 stdout_log = cls.logger.info
828 stderr_log = cls.logger.critical
830 stdout_log = cls.logger.info
831 stderr_log = cls.logger.info
833 if hasattr(cls, "vpp_stdout_deque"):
834 stdout_log(single_line_delim)
835 stdout_log("VPP output to stdout while running %s:", cls.__name__)
836 stdout_log(single_line_delim)
837 vpp_output = "".join(cls.vpp_stdout_deque)
838 with open(cls.tempdir + "/vpp_stdout.txt", "w") as f:
840 stdout_log("\n%s", vpp_output)
841 stdout_log(single_line_delim)
843 if hasattr(cls, "vpp_stderr_deque"):
844 stderr_log(single_line_delim)
845 stderr_log("VPP output to stderr while running %s:", cls.__name__)
846 stderr_log(single_line_delim)
847 vpp_output = "".join(cls.vpp_stderr_deque)
848 with open(cls.tempdir + "/vpp_stderr.txt", "w") as f:
850 stderr_log("\n%s", vpp_output)
851 stderr_log(single_line_delim)
854 def tearDownClass(cls):
855 """Perform final cleanup after running all tests in this test-case"""
856 cls.logger.debug("--- tearDownClass() for %s called ---" % cls.__name__)
857 cls.reporter.send_keep_alive(cls, "tearDownClass")
859 cls.file_handler.close()
860 cls.reset_packet_infos()
861 if config.debug_framework:
862 debug_internal.on_tear_down_class(cls)
864 def show_commands_at_teardown(self):
865 """Allow subclass specific teardown logging additions."""
866 self.logger.info("--- No test specific show commands provided. ---")
869 """Show various debug prints after each test"""
871 "--- tearDown() for %s.%s(%s) called ---"
872 % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
876 if not self.vpp_dead:
877 self.logger.debug(self.vapi.cli("show trace max 1000"))
878 self.logger.info(self.vapi.ppcli("show interface"))
879 self.logger.info(self.vapi.ppcli("show hardware"))
880 self.logger.info(self.statistics.set_errors_str())
881 self.logger.info(self.vapi.ppcli("show run"))
882 self.logger.info(self.vapi.ppcli("show log"))
883 self.logger.info(self.vapi.ppcli("show bihash"))
884 self.logger.info("Logging testcase specific show commands.")
885 self.show_commands_at_teardown()
886 if self.remove_configured_vpp_objects_on_tear_down:
887 self.registry.remove_vpp_config(self.logger)
888 # Save/Dump VPP api trace log
889 m = self._testMethodName
890 api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
891 tmp_api_trace = "/tmp/%s" % api_trace
892 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
893 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
894 self.logger.info("Moving %s to %s\n" % (tmp_api_trace, vpp_api_trace_log))
895 os.rename(tmp_api_trace, vpp_api_trace_log)
896 except VppTransportSocketIOError:
898 "VppTransportSocketIOError: Vpp dead. Cannot log show commands."
902 self.registry.unregister_all(self.logger)
905 """Clear trace before running each test"""
906 super(VppTestCase, self).setUp()
907 self.reporter.send_keep_alive(self)
911 testcase=self.__class__.__name__,
912 method_name=self._testMethodName,
914 self.sleep(0.1, "during setUp")
915 self.vpp_stdout_deque.append(
916 "--- test setUp() for %s.%s(%s) starts here ---\n"
917 % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
919 self.vpp_stderr_deque.append(
920 "--- test setUp() for %s.%s(%s) starts here ---\n"
921 % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
923 self.vapi.cli("clear trace")
924 # store the test instance inside the test class - so that objects
925 # holding the class can access instance methods (like assertEqual)
926 type(self).test_instance = self
929 def pg_enable_capture(cls, interfaces=None):
931 Enable capture on packet-generator interfaces
933 :param interfaces: iterable interface indexes (if None,
934 use self.pg_interfaces)
937 if interfaces is None:
938 interfaces = cls.pg_interfaces
943 def register_pcap(cls, intf, worker):
944 """Register a pcap in the testclass"""
945 # add to the list of captures with current timestamp
946 cls._pcaps.append((intf, worker))
949 def get_vpp_time(cls):
950 # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
951 # returns float("2.190522")
952 timestr = cls.vapi.cli("show clock")
953 head, sep, tail = timestr.partition(",")
954 head, sep, tail = head.partition("Time now")
958 def sleep_on_vpp_time(cls, sec):
959 """Sleep according to time in VPP world"""
960 # On a busy system with many processes
961 # we might end up with VPP time being slower than real world
962 # So take that into account when waiting for VPP to do something
963 start_time = cls.get_vpp_time()
964 while cls.get_vpp_time() - start_time < sec:
968 def pg_start(cls, trace=True):
969 """Enable the PG, wait till it is done, then clean up"""
970 for (intf, worker) in cls._old_pcaps:
971 intf.handle_old_pcap_file(intf.get_in_path(worker), intf.in_history_counter)
974 cls.vapi.cli("clear trace")
975 cls.vapi.cli("trace add pg-input 1000")
976 cls.vapi.cli("packet-generator enable")
977 # PG, when starts, runs to completion -
978 # so let's avoid a race condition,
979 # and wait a little till it's done.
980 # Then clean it up - and then be gone.
981 deadline = time.time() + 300
982 while cls.vapi.cli("show packet-generator").find("Yes") != -1:
983 cls.sleep(0.01) # yield
984 if time.time() > deadline:
985 cls.logger.error("Timeout waiting for pg to stop")
987 for intf, worker in cls._pcaps:
988 cls.vapi.cli("packet-generator delete %s" % intf.get_cap_name(worker))
989 cls._old_pcaps = cls._pcaps
993 def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0, mode=None):
995 Create packet-generator interfaces.
997 :param interfaces: iterable indexes of the interfaces.
998 :returns: List of created interfaces.
1002 for i in interfaces:
1003 intf = VppPGInterface(cls, i, gso, gso_size, mode)
1004 setattr(cls, intf.name, intf)
1006 cls.pg_interfaces = result
1010 def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
1011 pgmode = VppEnum.vl_api_pg_interface_mode_t
1012 return cls.create_pg_interfaces_internal(
1013 interfaces, gso, gso_size, pgmode.PG_API_MODE_IP4
1017 def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
1018 pgmode = VppEnum.vl_api_pg_interface_mode_t
1019 return cls.create_pg_interfaces_internal(
1020 interfaces, gso, gso_size, pgmode.PG_API_MODE_IP6
1024 def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
1025 pgmode = VppEnum.vl_api_pg_interface_mode_t
1026 return cls.create_pg_interfaces_internal(
1027 interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
1031 def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
1032 pgmode = VppEnum.vl_api_pg_interface_mode_t
1033 return cls.create_pg_interfaces_internal(
1034 interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
1038 def create_loopback_interfaces(cls, count):
1040 Create loopback interfaces.
1042 :param count: number of interfaces created.
1043 :returns: List of created interfaces.
1045 result = [VppLoInterface(cls) for i in range(count)]
1047 setattr(cls, intf.name, intf)
1048 cls.lo_interfaces = result
1052 def create_bvi_interfaces(cls, count):
1054 Create BVI interfaces.
1056 :param count: number of interfaces created.
1057 :returns: List of created interfaces.
1059 result = [VppBviInterface(cls) for i in range(count)]
1061 setattr(cls, intf.name, intf)
1062 cls.bvi_interfaces = result
1066 def extend_packet(packet, size, padding=" "):
1068 Extend packet to given size by padding with spaces or custom padding
1069 NOTE: Currently works only when Raw layer is present.
1071 :param packet: packet
1072 :param size: target size
1073 :param padding: padding used to extend the payload
1076 packet_len = len(packet) + 4
1077 extend = size - packet_len
1079 num = (extend // len(padding)) + 1
1080 packet[Raw].load += (padding * num)[:extend].encode("ascii")
1083 def reset_packet_infos(cls):
1084 """Reset the list of packet info objects and packet counts to zero"""
1085 cls._packet_infos = {}
1086 cls._packet_count_for_dst_if_idx = {}
1089 def create_packet_info(cls, src_if, dst_if):
1091 Create packet info object containing the source and destination indexes
1092 and add it to the testcase's packet info list
1094 :param VppInterface src_if: source interface
1095 :param VppInterface dst_if: destination interface
1097 :returns: _PacketInfo object
1100 info = _PacketInfo()
1101 info.index = len(cls._packet_infos)
1102 info.src = src_if.sw_if_index
1103 info.dst = dst_if.sw_if_index
1104 if isinstance(dst_if, VppSubInterface):
1105 dst_idx = dst_if.parent.sw_if_index
1107 dst_idx = dst_if.sw_if_index
1108 if dst_idx in cls._packet_count_for_dst_if_idx:
1109 cls._packet_count_for_dst_if_idx[dst_idx] += 1
1111 cls._packet_count_for_dst_if_idx[dst_idx] = 1
1112 cls._packet_infos[info.index] = info
1116 def info_to_payload(info):
1118 Convert _PacketInfo object to packet payload
1120 :param info: _PacketInfo object
1122 :returns: string containing serialized data from packet info
1125 # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1126 return pack("iiiih", info.index, info.src, info.dst, info.ip, info.proto)
1129 def payload_to_info(payload, payload_field="load"):
1131 Convert packet payload to _PacketInfo object
1133 :param payload: packet payload
1134 :type payload: <class 'scapy.packet.Raw'>
1135 :param payload_field: packet fieldname of payload "load" for
1136 <class 'scapy.packet.Raw'>
1137 :type payload_field: str
1138 :returns: _PacketInfo object containing de-serialized data from payload
1142 # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1143 payload_b = getattr(payload, payload_field)[:18]
1145 info = _PacketInfo()
1146 info.index, info.src, info.dst, info.ip, info.proto = unpack("iiiih", payload_b)
1148 # some SRv6 TCs depend on get an exception if bad values are detected
1149 if info.index > 0x4000:
1150 raise ValueError("Index value is invalid")
1154 def get_next_packet_info(self, info):
1156 Iterate over the packet info list stored in the testcase
1157 Start iteration with first element if info is None
1158 Continue based on index in info if info is specified
1160 :param info: info or None
1161 :returns: next info in list or None if no more infos
1166 next_index = info.index + 1
1167 if next_index == len(self._packet_infos):
1170 return self._packet_infos[next_index]
1172 def get_next_packet_info_for_interface(self, src_index, info):
1174 Search the packet info list for the next packet info with same source
1177 :param src_index: source interface index to search for
1178 :param info: packet info - where to start the search
1179 :returns: packet info or None
1183 info = self.get_next_packet_info(info)
1186 if info.src == src_index:
1189 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
1191 Search the packet info list for the next packet info with same source
1192 and destination interface indexes
1194 :param src_index: source interface index to search for
1195 :param dst_index: destination interface index to search for
1196 :param info: packet info - where to start the search
1197 :returns: packet info or None
1201 info = self.get_next_packet_info_for_interface(src_index, info)
1204 if info.dst == dst_index:
1207 def assert_equal(self, real_value, expected_value, name_or_class=None):
1208 if name_or_class is None:
1209 self.assertEqual(real_value, expected_value)
1212 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
1214 getdoc(name_or_class).strip(),
1216 str(name_or_class(real_value)),
1218 str(name_or_class(expected_value)),
1221 msg = "Invalid %s: %s does not match expected value %s" % (
1227 self.assertEqual(real_value, expected_value, msg)
1229 def assert_in_range(self, real_value, expected_min, expected_max, name=None):
1233 msg = "Invalid %s: %s out of range <%s,%s>" % (
1239 self.assertTrue(expected_min <= real_value <= expected_max, msg)
1241 def assert_packet_checksums_valid(self, packet, ignore_zero_udp_checksums=True):
1242 received = packet.__class__(scapy.compat.raw(packet))
1243 udp_layers = ["UDP", "UDPerror"]
1244 checksum_fields = ["cksum", "chksum"]
1247 temp = received.__class__(scapy.compat.raw(received))
1249 layer = temp.getlayer(counter)
1251 layer = layer.copy()
1252 layer.remove_payload()
1253 for cf in checksum_fields:
1254 if hasattr(layer, cf):
1256 ignore_zero_udp_checksums
1257 and 0 == getattr(layer, cf)
1258 and layer.name in udp_layers
1261 delattr(temp.getlayer(counter), cf)
1262 checksums.append((counter, cf))
1265 counter = counter + 1
1266 if 0 == len(checksums):
1268 temp = temp.__class__(scapy.compat.raw(temp))
1269 for layer, cf in checksums:
1270 calc_sum = getattr(temp[layer], cf)
1272 getattr(received[layer], cf),
1274 "packet checksum on layer #%d: %s" % (layer, temp[layer].name),
1277 "Checksum field `%s` on `%s` layer has correct value `%s`"
1278 % (cf, temp[layer].name, calc_sum)
1281 def assert_checksum_valid(
1282 self, received_packet, layer, field_name="chksum", ignore_zero_checksum=False
1284 """Check checksum of received packet on given layer"""
1285 received_packet_checksum = getattr(received_packet[layer], field_name)
1286 if ignore_zero_checksum and 0 == received_packet_checksum:
1288 recalculated = received_packet.__class__(scapy.compat.raw(received_packet))
1289 delattr(recalculated[layer], field_name)
1290 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1292 received_packet_checksum,
1293 getattr(recalculated[layer], field_name),
1294 "packet checksum on layer: %s" % layer,
1297 def assert_ip_checksum_valid(self, received_packet, ignore_zero_checksum=False):
1298 self.assert_checksum_valid(
1299 received_packet, "IP", ignore_zero_checksum=ignore_zero_checksum
1302 def assert_tcp_checksum_valid(self, received_packet, ignore_zero_checksum=False):
1303 self.assert_checksum_valid(
1304 received_packet, "TCP", ignore_zero_checksum=ignore_zero_checksum
1307 def assert_udp_checksum_valid(self, received_packet, ignore_zero_checksum=True):
1308 self.assert_checksum_valid(
1309 received_packet, "UDP", ignore_zero_checksum=ignore_zero_checksum
1312 def assert_embedded_icmp_checksum_valid(self, received_packet):
1313 if received_packet.haslayer(IPerror):
1314 self.assert_checksum_valid(received_packet, "IPerror")
1315 if received_packet.haslayer(TCPerror):
1316 self.assert_checksum_valid(received_packet, "TCPerror")
1317 if received_packet.haslayer(UDPerror):
1318 self.assert_checksum_valid(
1319 received_packet, "UDPerror", ignore_zero_checksum=True
1321 if received_packet.haslayer(ICMPerror):
1322 self.assert_checksum_valid(received_packet, "ICMPerror")
1324 def assert_icmp_checksum_valid(self, received_packet):
1325 self.assert_checksum_valid(received_packet, "ICMP")
1326 self.assert_embedded_icmp_checksum_valid(received_packet)
1328 def assert_icmpv6_checksum_valid(self, pkt):
1329 if pkt.haslayer(ICMPv6DestUnreach):
1330 self.assert_checksum_valid(pkt, "ICMPv6DestUnreach", "cksum")
1331 self.assert_embedded_icmp_checksum_valid(pkt)
1332 if pkt.haslayer(ICMPv6EchoRequest):
1333 self.assert_checksum_valid(pkt, "ICMPv6EchoRequest", "cksum")
1334 if pkt.haslayer(ICMPv6EchoReply):
1335 self.assert_checksum_valid(pkt, "ICMPv6EchoReply", "cksum")
1337 def get_counter(self, counter):
1338 if counter.startswith("/"):
1339 counter_value = self.statistics.get_counter(counter)
1341 counters = self.vapi.cli("sh errors").split("\n")
1343 for i in range(1, len(counters) - 1):
1344 results = counters[i].split()
1345 if results[1] == counter:
1346 counter_value = int(results[0])
1348 return counter_value
1350 def assert_counter_equal(self, counter, expected_value, thread=None, index=0):
1351 c = self.get_counter(counter)
1352 if thread is not None:
1353 c = c[thread][index]
1355 c = sum(x[index] for x in c)
1356 self.assert_equal(c, expected_value, "counter `%s'" % counter)
1358 def assert_packet_counter_equal(self, counter, expected_value):
1359 counter_value = self.get_counter(counter)
1361 counter_value, expected_value, "packet counter `%s'" % counter
1364 def assert_error_counter_equal(self, counter, expected_value):
1365 counter_value = self.statistics[counter].sum()
1366 self.assert_equal(counter_value, expected_value, "error counter `%s'" % counter)
1369 def sleep(cls, timeout, remark=None):
1371 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1372 # * by Guido, only the main thread can be interrupted.
1374 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
1377 if hasattr(os, "sched_yield"):
1383 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1384 before = time.time()
1387 if after - before > 2 * timeout:
1389 "unexpected self.sleep() result - slept for %es instead of ~%es!",
1395 "Finished sleep (%s) - slept %es (wanted %es)",
1401 def virtual_sleep(self, timeout, remark=None):
1402 self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
1403 self.vapi.cli("set clock adjust %s" % timeout)
1405 def pg_send(self, intf, pkts, worker=None, trace=True):
1406 intf.add_stream(pkts, worker=worker)
1407 self.pg_enable_capture(self.pg_interfaces)
1408 self.pg_start(trace=trace)
1410 def snapshot_stats(self, stats_diff):
1411 """Return snapshot of interesting stats based on diff dictionary."""
1413 for sw_if_index in stats_diff:
1414 for counter in stats_diff[sw_if_index]:
1415 stats_snapshot[counter] = self.statistics[counter]
1416 self.logger.debug(f"Took statistics stats_snapshot: {stats_snapshot}")
1417 return stats_snapshot
1419 def compare_stats_with_snapshot(self, stats_diff, stats_snapshot):
1420 """Assert appropriate difference between current stats and snapshot."""
1421 for sw_if_index in stats_diff:
1422 for cntr, diff in stats_diff[sw_if_index].items():
1423 if sw_if_index == "err":
1425 self.statistics[cntr].sum(),
1426 stats_snapshot[cntr].sum() + diff,
1427 f"'{cntr}' counter value (previous value: "
1428 f"{stats_snapshot[cntr].sum()}, "
1429 f"expected diff: {diff})",
1434 self.statistics[cntr][:, sw_if_index].sum(),
1435 stats_snapshot[cntr][:, sw_if_index].sum() + diff,
1436 f"'{cntr}' counter value (previous value: "
1437 f"{stats_snapshot[cntr][:, sw_if_index].sum()}, "
1438 f"expected diff: {diff})",
1441 # if diff is 0, then this most probably a case where
1442 # test declares multiple interfaces but traffic hasn't
1443 # passed through this one yet - which means the counter
1444 # value is 0 and can be ignored
1448 def send_and_assert_no_replies(
1449 self, intf, pkts, remark="", timeout=None, stats_diff=None, trace=True, msg=None
1452 stats_snapshot = self.snapshot_stats(stats_diff)
1454 self.pg_send(intf, pkts)
1459 for i in self.pg_interfaces:
1460 i.assert_nothing_captured(timeout=timeout, remark=remark)
1465 self.logger.debug(f"send_and_assert_no_replies: {msg}")
1466 self.logger.debug(self.vapi.cli("show trace"))
1469 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1471 def send_and_expect(
1483 stats_snapshot = self.snapshot_stats(stats_diff)
1486 n_rx = 1 if isinstance(pkts, Packet) else len(pkts)
1487 self.pg_send(intf, pkts, worker=worker, trace=trace)
1488 rx = output.get_capture(n_rx)
1491 self.logger.debug(f"send_and_expect: {msg}")
1492 self.logger.debug(self.vapi.cli("show trace"))
1495 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1499 def send_and_expect_load_balancing(
1500 self, input, pkts, outputs, worker=None, trace=True
1502 self.pg_send(input, pkts, worker=worker, trace=trace)
1505 rx = oo._get_capture(1)
1506 self.assertNotEqual(0, len(rx))
1509 self.logger.debug(self.vapi.cli("show trace"))
1512 def send_and_expect_some(self, intf, pkts, output, worker=None, trace=True):
1513 self.pg_send(intf, pkts, worker=worker, trace=trace)
1514 rx = output._get_capture(1)
1516 self.logger.debug(self.vapi.cli("show trace"))
1517 self.assertTrue(len(rx) > 0)
1518 self.assertTrue(len(rx) < len(pkts))
1521 def send_and_expect_only(self, intf, pkts, output, timeout=None, stats_diff=None):
1523 stats_snapshot = self.snapshot_stats(stats_diff)
1525 self.pg_send(intf, pkts)
1526 rx = output.get_capture(len(pkts))
1530 for i in self.pg_interfaces:
1531 if i not in outputs:
1532 i.assert_nothing_captured(timeout=timeout)
1536 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1541 def get_testcase_doc_name(test):
1542 return getdoc(test.__class__).splitlines()[0]
1545 def get_test_description(descriptions, test):
1546 short_description = test.shortDescription()
1547 if descriptions and short_description:
1548 return short_description
1553 class TestCaseInfo(object):
1554 def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
1555 self.logger = logger
1556 self.tempdir = tempdir
1557 self.vpp_pid = vpp_pid
1558 self.vpp_bin_path = vpp_bin_path
1559 self.core_crash_test = None
1562 class VppTestResult(unittest.TestResult):
1564 @property result_string
1565 String variable to store the test case result string.
1567 List variable containing 2-tuples of TestCase instances and strings
1568 holding formatted tracebacks. Each tuple represents a test which
1569 raised an unexpected exception.
1571 List variable containing 2-tuples of TestCase instances and strings
1572 holding formatted tracebacks. Each tuple represents a test where
1573 a failure was explicitly signalled using the TestCase.assert*()
1577 failed_test_cases_info = set()
1578 core_crash_test_cases_info = set()
1579 current_test_case_info = None
1581 def __init__(self, stream=None, descriptions=None, verbosity=None, runner=None):
1583 :param stream File descriptor to store where to report test results.
1584 Set to the standard error stream by default.
1585 :param descriptions Boolean variable to store information if to use
1586 test case descriptions.
1587 :param verbosity Integer variable to store required verbosity level.
1589 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1590 self.stream = stream
1591 self.descriptions = descriptions
1592 self.verbosity = verbosity
1593 self.result_string = None
1594 self.runner = runner
1597 def addSuccess(self, test):
1599 Record a test succeeded result
1604 if self.current_test_case_info:
1605 self.current_test_case_info.logger.debug(
1606 "--- addSuccess() %s.%s(%s) called"
1607 % (test.__class__.__name__, test._testMethodName, test._testMethodDoc)
1609 unittest.TestResult.addSuccess(self, test)
1610 self.result_string = colorize("OK", GREEN)
1612 self.send_result_through_pipe(test, PASS)
1614 def addSkip(self, test, reason):
1616 Record a test skipped.
1622 if self.current_test_case_info:
1623 self.current_test_case_info.logger.debug(
1624 "--- addSkip() %s.%s(%s) called, reason is %s"
1626 test.__class__.__name__,
1627 test._testMethodName,
1628 test._testMethodDoc,
1632 unittest.TestResult.addSkip(self, test, reason)
1633 self.result_string = colorize("SKIP", YELLOW)
1635 if reason == "not enough cpus":
1636 self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE)
1638 self.send_result_through_pipe(test, SKIP)
1640 def symlink_failed(self):
1641 if self.current_test_case_info:
1643 failed_dir = config.failed_dir
1644 link_path = os.path.join(
1646 "%s-FAILED" % os.path.basename(self.current_test_case_info.tempdir),
1649 self.current_test_case_info.logger.debug(
1650 "creating a link to the failed test"
1652 self.current_test_case_info.logger.debug(
1653 "os.symlink(%s, %s)"
1654 % (self.current_test_case_info.tempdir, link_path)
1656 if os.path.exists(link_path):
1657 self.current_test_case_info.logger.debug("symlink already exists")
1659 os.symlink(self.current_test_case_info.tempdir, link_path)
1661 except Exception as e:
1662 self.current_test_case_info.logger.error(e)
1664 def send_result_through_pipe(self, test, result):
1665 if hasattr(self, "test_framework_result_pipe"):
1666 pipe = self.test_framework_result_pipe
1668 pipe.send((test.id(), result))
1670 def log_error(self, test, err, fn_name):
1671 if self.current_test_case_info:
1672 if isinstance(test, unittest.suite._ErrorHolder):
1673 test_name = test.description
1675 test_name = "%s.%s(%s)" % (
1676 test.__class__.__name__,
1677 test._testMethodName,
1678 test._testMethodDoc,
1680 self.current_test_case_info.logger.debug(
1681 "--- %s() %s called, err is %s" % (fn_name, test_name, err)
1683 self.current_test_case_info.logger.debug(
1684 "formatted exception is:\n%s" % "".join(format_exception(*err))
1687 def add_error(self, test, err, unittest_fn, error_type):
1688 if error_type == FAIL:
1689 self.log_error(test, err, "addFailure")
1690 error_type_str = colorize("FAIL", RED)
1691 elif error_type == ERROR:
1692 self.log_error(test, err, "addError")
1693 error_type_str = colorize("ERROR", RED)
1696 "Error type %s cannot be used to record an "
1697 "error or a failure" % error_type
1700 unittest_fn(self, test, err)
1701 if self.current_test_case_info:
1702 self.result_string = "%s [ temp dir used by test case: %s ]" % (
1704 self.current_test_case_info.tempdir,
1706 self.symlink_failed()
1707 self.failed_test_cases_info.add(self.current_test_case_info)
1708 if is_core_present(self.current_test_case_info.tempdir):
1709 if not self.current_test_case_info.core_crash_test:
1710 if isinstance(test, unittest.suite._ErrorHolder):
1711 test_name = str(test)
1713 test_name = "'{!s}' ({!s})".format(
1714 get_testcase_doc_name(test), test.id()
1716 self.current_test_case_info.core_crash_test = test_name
1717 self.core_crash_test_cases_info.add(self.current_test_case_info)
1719 self.result_string = "%s [no temp dir]" % error_type_str
1721 self.send_result_through_pipe(test, error_type)
1723 def addFailure(self, test, err):
1725 Record a test failed result
1728 :param err: error message
1731 self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1733 def addError(self, test, err):
1735 Record a test error result
1738 :param err: error message
1741 self.add_error(test, err, unittest.TestResult.addError, ERROR)
1743 def getDescription(self, test):
1745 Get test description
1748 :returns: test description
1751 return get_test_description(self.descriptions, test)
1753 def startTest(self, test):
1761 def print_header(test):
1762 if test.__class__ in self.printed:
1765 test_doc = getdoc(test)
1767 raise Exception("No doc string for test '%s'" % test.id())
1769 test_title = test_doc.splitlines()[0].rstrip()
1770 test_title = colorize(test_title, GREEN)
1771 if test.is_tagged_run_solo():
1772 test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)
1774 # This block may overwrite the colorized title above,
1775 # but we want this to stand out and be fixed
1776 if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
1777 test_title = colorize(f"FIXME with VPP workers: {test_title}", RED)
1779 if test.has_tag(TestCaseTag.FIXME_ASAN):
1780 test_title = colorize(f"FIXME with ASAN: {test_title}", RED)
1781 test.skip_fixme_asan()
1783 if is_distro_ubuntu2204 == True and test.has_tag(
1784 TestCaseTag.FIXME_UBUNTU2204
1786 test_title = colorize(f"FIXME on Ubuntu-22.04: {test_title}", RED)
1787 test.skip_fixme_ubuntu2204()
1789 if hasattr(test, "vpp_worker_count"):
1790 if test.vpp_worker_count == 0:
1791 test_title += " [main thread only]"
1792 elif test.vpp_worker_count == 1:
1793 test_title += " [1 worker thread]"
1795 test_title += f" [{test.vpp_worker_count} worker threads]"
1797 if test.__class__.skipped_due_to_cpu_lack:
1798 test_title = colorize(
1799 f"{test_title} [skipped - not enough cpus, "
1800 f"required={test.__class__.get_cpus_required()}, "
1801 f"available={max_vpp_cpus}]",
1805 print(double_line_delim)
1807 print(double_line_delim)
1808 self.printed.append(test.__class__)
1811 self.start_test = time.time()
1812 unittest.TestResult.startTest(self, test)
1813 if self.verbosity > 0:
1814 self.stream.writeln("Starting " + self.getDescription(test) + " ...")
1815 self.stream.writeln(single_line_delim)
1817 def stopTest(self, test):
1819 Called when the given test has been run
1824 unittest.TestResult.stopTest(self, test)
1826 if self.verbosity > 0:
1827 self.stream.writeln(single_line_delim)
1828 self.stream.writeln(
1829 "%-73s%s" % (self.getDescription(test), self.result_string)
1831 self.stream.writeln(single_line_delim)
1833 self.stream.writeln(
1836 self.getDescription(test),
1837 time.time() - self.start_test,
1842 self.send_result_through_pipe(test, TEST_RUN)
1844 def printErrors(self):
1846 Print errors from running the test case
1848 if len(self.errors) > 0 or len(self.failures) > 0:
1849 self.stream.writeln()
1850 self.printErrorList("ERROR", self.errors)
1851 self.printErrorList("FAIL", self.failures)
1853 # ^^ that is the last output from unittest before summary
1854 if not self.runner.print_summary:
1855 devnull = unittest.runner._WritelnDecorator(open(os.devnull, "w"))
1856 self.stream = devnull
1857 self.runner.stream = devnull
1859 def printErrorList(self, flavour, errors):
1861 Print error list to the output stream together with error type
1862 and test case description.
1864 :param flavour: error type
1865 :param errors: iterable errors
1868 for test, err in errors:
1869 self.stream.writeln(double_line_delim)
1870 self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
1871 self.stream.writeln(single_line_delim)
1872 self.stream.writeln("%s" % err)
1875 class VppTestRunner(unittest.TextTestRunner):
1877 A basic test runner implementation which prints results to standard error.
1881 def resultclass(self):
1882 """Class maintaining the results of the tests"""
1883 return VppTestResult
1887 keep_alive_pipe=None,
1897 # ignore stream setting here, use hard-coded stdout to be in sync
1898 # with prints from VppTestCase methods ...
1899 super(VppTestRunner, self).__init__(
1900 sys.stdout, descriptions, verbosity, failfast, buffer, resultclass, **kwargs
1902 KeepAliveReporter.pipe = keep_alive_pipe
1904 self.orig_stream = self.stream
1905 self.resultclass.test_framework_result_pipe = result_pipe
1907 self.print_summary = print_summary
1909 def _makeResult(self):
1910 return self.resultclass(self.stream, self.descriptions, self.verbosity, self)
1912 def run(self, test):
1919 faulthandler.enable() # emit stack trace to stderr if killed by signal
1921 result = super(VppTestRunner, self).run(test)
1922 if not self.print_summary:
1923 self.stream = self.orig_stream
1924 result.stream = self.orig_stream
1928 class Worker(Thread):
1929 def __init__(self, executable_args, logger, env=None, *args, **kwargs):
1930 super(Worker, self).__init__(*args, **kwargs)
1931 self.logger = logger
1932 self.args = executable_args
1933 if hasattr(self, "testcase") and self.testcase.debug_all:
1934 if self.testcase.debug_gdbserver:
1936 "/usr/bin/gdbserver",
1937 "localhost:{port}".format(port=self.testcase.gdbserver_port),
1939 elif self.testcase.debug_gdb and hasattr(self, "wait_for_gdb"):
1940 self.args.append(self.wait_for_gdb)
1941 self.app_bin = executable_args[0]
1942 self.app_name = os.path.basename(self.app_bin)
1943 if hasattr(self, "role"):
1944 self.app_name += " {role}".format(role=self.role)
1947 env = {} if env is None else env
1948 self.env = copy.deepcopy(env)
1950 def wait_for_enter(self):
1951 if not hasattr(self, "testcase"):
1953 if self.testcase.debug_all and self.testcase.debug_gdbserver:
1955 print(double_line_delim)
1957 "Spawned GDB Server for '{app}' with PID: {pid}".format(
1958 app=self.app_name, pid=self.process.pid
1961 elif self.testcase.debug_all and self.testcase.debug_gdb:
1963 print(double_line_delim)
1965 "Spawned '{app}' with PID: {pid}".format(
1966 app=self.app_name, pid=self.process.pid
1971 print(single_line_delim)
1972 print("You can debug '{app}' using:".format(app=self.app_name))
1973 if self.testcase.debug_gdbserver:
1977 + " -ex 'target remote localhost:{port}'".format(
1978 port=self.testcase.gdbserver_port
1982 "Now is the time to attach gdb by running the above "
1983 "command, set up breakpoints etc., then resume from "
1984 "within gdb by issuing the 'continue' command"
1986 self.testcase.gdbserver_port += 1
1987 elif self.testcase.debug_gdb:
1991 + " -ex 'attach {pid}'".format(pid=self.process.pid)
1994 "Now is the time to attach gdb by running the above "
1995 "command and set up breakpoints etc., then resume from"
1996 " within gdb by issuing the 'continue' command"
1998 print(single_line_delim)
1999 input("Press ENTER to continue running the testcase...")
2002 executable = self.args[0]
2003 if not os.path.exists(executable) or not os.access(
2004 executable, os.F_OK | os.X_OK
2006 # Exit code that means some system file did not exist,
2007 # could not be opened, or had some other kind of error.
2008 self.result = os.EX_OSFILE
2009 raise EnvironmentError(
2010 "executable '%s' is not found or executable." % executable
2013 "Running executable '{app}': '{cmd}'".format(
2014 app=self.app_name, cmd=" ".join(self.args)
2017 env = os.environ.copy()
2018 env.update(self.env)
2019 env["CK_LOG_FILE_NAME"] = "-"
2020 self.process = subprocess.Popen(
2021 ["stdbuf", "-o0", "-e0"] + self.args,
2024 preexec_fn=os.setpgrp,
2025 stdout=subprocess.PIPE,
2026 stderr=subprocess.PIPE,
2028 self.wait_for_enter()
2029 out, err = self.process.communicate()
2030 self.logger.debug("Finished running `{app}'".format(app=self.app_name))
2031 self.logger.info("Return code is `%s'" % self.process.returncode)
2032 self.logger.info(single_line_delim)
2034 "Executable `{app}' wrote to stdout:".format(app=self.app_name)
2036 self.logger.info(single_line_delim)
2037 self.logger.info(out.decode("utf-8"))
2038 self.logger.info(single_line_delim)
2040 "Executable `{app}' wrote to stderr:".format(app=self.app_name)
2042 self.logger.info(single_line_delim)
2043 self.logger.info(err.decode("utf-8"))
2044 self.logger.info(single_line_delim)
2045 self.result = self.process.returncode
2048 if __name__ == "__main__":