3 from __future__ import print_function
18 from collections import deque
19 from threading import Thread, Event
20 from inspect import getdoc, isclass
21 from traceback import format_exception
22 from logging import FileHandler, DEBUG, Formatter
24 from abc import ABC, abstractmethod
25 from struct import pack, unpack
28 from scapy.packet import Raw, Packet
29 from config import config, available_cpus, num_cpus, max_vpp_cpus
30 import hook as hookmodule
31 from vpp_pg_interface import VppPGInterface
32 from vpp_sub_interface import VppSubInterface
33 from vpp_lo_interface import VppLoInterface
34 from vpp_bvi_interface import VppBviInterface
35 from vpp_papi_provider import VppPapiProvider
36 from vpp_papi import VppEnum
38 from vpp_papi.vpp_stats import VPPStats
39 from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
49 from vpp_object import VppObjectRegistry
50 from util import ppp, is_core_present
51 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
52 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
53 from scapy.layers.inet6 import ICMPv6EchoReply
56 logger = logging.getLogger(__name__)
58 # Set up an empty logger for the testcase that can be overridden as necessary
59 null_logger = logging.getLogger("VppTestCase")
60 null_logger.addHandler(logging.NullHandler())
70 if config.debug_framework:
74 Test framework module.
76 The module provides a set of tools for constructing and running tests and
77 representing the results.
81 class VppDiedError(Exception):
82 """exception for reporting that the subprocess has died."""
86 for k, v in signal.__dict__.items()
87 if k.startswith("SIG") and not k.startswith("SIG_")
90 def __init__(self, rv=None, testcase=None, method_name=None):
92 self.signal_name = None
93 self.testcase = testcase
94 self.method_name = method_name
97 self.signal_name = VppDiedError.signals_by_value[-rv]
98 except (KeyError, TypeError):
101 if testcase is None and method_name is None:
104 in_msg = " while running %s.%s" % (testcase, method_name)
107 msg = "VPP subprocess died unexpectedly%s with return code: %d%s." % (
110 " [%s]" % (self.signal_name if self.signal_name is not None else ""),
113 msg = "VPP subprocess died unexpectedly%s." % in_msg
115 super(VppDiedError, self).__init__(msg)
118 class _PacketInfo(object):
119 """Private class to create packet info object.
121 Help process information about the next packet.
122 Set variables to default values.
125 #: Store the index of the packet.
127 #: Store the index of the source packet generator interface of the packet.
129 #: Store the index of the destination packet generator interface
132 #: Store expected ip version
134 #: Store expected upper protocol
136 #: Store the copy of the former packet.
139 def __eq__(self, other):
140 index = self.index == other.index
141 src = self.src == other.src
142 dst = self.dst == other.dst
143 data = self.data == other.data
144 return index and src and dst and data
147 def pump_output(testclass):
148 """pump output from vpp stdout/stderr to proper queues"""
151 while not testclass.pump_thread_stop_flag.is_set():
152 readable = select.select(
154 testclass.vpp.stdout.fileno(),
155 testclass.vpp.stderr.fileno(),
156 testclass.pump_thread_wakeup_pipe[0],
161 if testclass.vpp.stdout.fileno() in readable:
162 read = os.read(testclass.vpp.stdout.fileno(), 102400)
164 split = read.decode("ascii", errors="backslashreplace").splitlines(True)
165 if len(stdout_fragment) > 0:
166 split[0] = "%s%s" % (stdout_fragment, split[0])
167 if len(split) > 0 and split[-1].endswith("\n"):
171 stdout_fragment = split[-1]
172 testclass.vpp_stdout_deque.extend(split[:limit])
173 if not config.cache_vpp_output:
174 for line in split[:limit]:
175 testclass.logger.info("VPP STDOUT: %s" % line.rstrip("\n"))
176 if testclass.vpp.stderr.fileno() in readable:
177 read = os.read(testclass.vpp.stderr.fileno(), 102400)
179 split = read.decode("ascii", errors="backslashreplace").splitlines(True)
180 if len(stderr_fragment) > 0:
181 split[0] = "%s%s" % (stderr_fragment, split[0])
182 if len(split) > 0 and split[-1].endswith("\n"):
186 stderr_fragment = split[-1]
188 testclass.vpp_stderr_deque.extend(split[:limit])
189 if not config.cache_vpp_output:
190 for line in split[:limit]:
191 testclass.logger.error("VPP STDERR: %s" % line.rstrip("\n"))
192 # ignoring the dummy pipe here intentionally - the
193 # flag will take care of properly terminating the loop
196 def _is_platform_aarch64():
197 return platform.machine() == "aarch64"
200 is_platform_aarch64 = _is_platform_aarch64()
203 def _is_distro_ubuntu2204():
204 with open("/etc/os-release") as f:
205 for line in f.readlines():
211 is_distro_ubuntu2204 = _is_distro_ubuntu2204()
214 class KeepAliveReporter(object):
216 Singleton object which reports test start to parent process
222 self.__dict__ = self._shared_state
230 def pipe(self, pipe):
231 if self._pipe is not None:
232 raise Exception("Internal error - pipe should only be set once.")
235 def send_keep_alive(self, test, desc=None):
237 Write current test tmpdir & desc to keep-alive pipe to signal liveness
239 if self.pipe is None:
240 # if not running forked..
244 desc = "%s (%s)" % (desc, unittest.util.strclass(test))
248 self.pipe.send((desc, config.vpp, test.tempdir, test.vpp.pid))
251 class TestCaseTag(Enum):
252 # marks the suites that must run at the end
253 # using only a single test runner
255 # marks the suites broken on VPP multi-worker
256 FIXME_VPP_WORKERS = 2
257 # marks the suites broken when ASan is enabled
259 # marks suites broken on Ubuntu-22.04
263 def create_tag_decorator(e):
266 cls.test_tags.append(e)
267 except AttributeError:
274 tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
275 tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
276 tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
277 tag_fixme_ubuntu2204 = create_tag_decorator(TestCaseTag.FIXME_UBUNTU2204)
291 class CPUInterface(ABC):
293 skipped_due_to_cpu_lack = False
297 def get_cpus_required(cls):
301 def assign_cpus(cls, cpus):
305 class VppTestCase(CPUInterface, unittest.TestCase):
306 """This subclass is a base class for VPP test cases that are implemented as
307 classes. It provides methods to create and run test case.
310 extra_vpp_statseg_config = ""
311 extra_vpp_punt_config = []
312 extra_vpp_plugin_config = []
314 vapi_response_timeout = 5
315 remove_configured_vpp_objects_on_tear_down = True
318 def packet_infos(self):
319 """List of packet infos"""
320 return self._packet_infos
323 def get_packet_count_for_if_idx(cls, dst_if_index):
324 """Get the number of packet info for specified destination if index"""
325 if dst_if_index in cls._packet_count_for_dst_if_idx:
326 return cls._packet_count_for_dst_if_idx[dst_if_index]
331 def has_tag(cls, tag):
332 """if the test case has a given tag - return true"""
334 return tag in cls.test_tags
335 except AttributeError:
340 def is_tagged_run_solo(cls):
341 """if the test case class is timing-sensitive - return true"""
342 return cls.has_tag(TestCaseTag.RUN_SOLO)
345 def skip_fixme_asan(cls):
346 """if @tag_fixme_asan & ASan is enabled - mark for skip"""
347 if cls.has_tag(TestCaseTag.FIXME_ASAN):
348 vpp_extra_cmake_args = os.environ.get("VPP_EXTRA_CMAKE_ARGS", "")
349 if "DVPP_ENABLE_SANITIZE_ADDR=ON" in vpp_extra_cmake_args:
350 cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls)
353 def skip_fixme_ubuntu2204(cls):
354 """if distro is ubuntu 22.04 and @tag_fixme_ubuntu2204 mark for skip"""
355 if cls.has_tag(TestCaseTag.FIXME_UBUNTU2204):
356 cls = unittest.skip("Skipping @tag_fixme_ubuntu2204 tests")(cls)
360 """Return the instance of this testcase"""
361 return cls.test_instance
364 def set_debug_flags(cls, d):
365 cls.gdbserver_port = 7777
366 cls.debug_core = False
367 cls.debug_gdb = False
368 cls.debug_gdbserver = False
369 cls.debug_all = False
370 cls.debug_attach = False
375 cls.debug_core = True
376 elif dl == "gdb" or dl == "gdb-all":
378 elif dl == "gdbserver" or dl == "gdbserver-all":
379 cls.debug_gdbserver = True
381 cls.debug_attach = True
383 raise Exception("Unrecognized DEBUG option: '%s'" % d)
384 if dl == "gdb-all" or dl == "gdbserver-all":
388 def get_vpp_worker_count(cls):
389 if not hasattr(cls, "vpp_worker_count"):
390 if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
391 cls.vpp_worker_count = 0
393 cls.vpp_worker_count = config.vpp_worker_count
394 return cls.vpp_worker_count
397 def get_cpus_required(cls):
398 return 1 + cls.get_vpp_worker_count()
401 def setUpConstants(cls):
402 """Set-up the test case class based on environment variables"""
403 cls.step = config.step
404 cls.plugin_path = ":".join(config.vpp_plugin_dir)
405 cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir)
406 cls.extern_plugin_path = ":".join(config.extern_plugin_dir)
408 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
409 debug_cli = "cli-listen localhost:5002"
410 size = re.search(r"\d+[gG]", config.coredump_size)
412 coredump_size = f"coredump-size {config.coredump_size}".lower()
414 coredump_size = "coredump-size unlimited"
415 default_variant = config.variant
416 if default_variant is not None:
417 default_variant = "default { variant %s 100 }" % default_variant
421 api_fuzzing = config.api_fuzz
422 if api_fuzzing is None:
443 cls.get_api_segment_prefix(),
450 if cls.extern_plugin_path not in (None, ""):
451 cls.extra_vpp_plugin_config.append("add-path %s" % cls.extern_plugin_path)
452 if cls.get_vpp_worker_count():
453 cls.vpp_cmdline.extend(
454 ["corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])]
456 cls.vpp_cmdline.extend(
467 cls.get_stats_sock_path(),
468 cls.extra_vpp_statseg_config,
473 cls.get_api_sock_path(),
494 "lisp_unittest_plugin.so",
499 "unittest_plugin.so",
504 + cls.extra_vpp_plugin_config
510 if cls.extra_vpp_punt_config is not None:
511 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
513 if not cls.debug_attach:
514 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
515 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
518 def wait_for_enter(cls):
519 if cls.debug_gdbserver:
520 print(double_line_delim)
521 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
523 print(double_line_delim)
524 print("Spawned VPP with PID: %d" % cls.vpp.pid)
526 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
528 print(single_line_delim)
529 print("You can debug VPP using:")
530 if cls.debug_gdbserver:
532 f"sudo gdb {config.vpp} "
533 f"-ex 'target remote localhost:{cls.gdbserver_port}'"
536 "Now is the time to attach gdb by running the above "
537 "command, set up breakpoints etc., then resume VPP from "
538 "within gdb by issuing the 'continue' command"
540 cls.gdbserver_port += 1
542 print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'")
544 "Now is the time to attach gdb by running the above "
545 "command and set up breakpoints etc., then resume VPP from"
546 " within gdb by issuing the 'continue' command"
548 print(single_line_delim)
549 input("Press ENTER to continue running the testcase...")
557 cls.logger.debug(f"Assigned cpus: {cls.cpus}")
558 cmdline = cls.vpp_cmdline
560 if cls.debug_gdbserver:
561 gdbserver = "/usr/bin/gdbserver"
562 if not os.path.isfile(gdbserver) or not os.access(gdbserver, os.X_OK):
564 "gdbserver binary '%s' does not exist or is "
565 "not executable" % gdbserver
570 "localhost:{port}".format(port=cls.gdbserver_port),
572 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
575 cls.vpp = subprocess.Popen(
576 cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE
578 except subprocess.CalledProcessError as e:
580 "Subprocess returned with non-0 return code: (%s)", e.returncode
585 "Subprocess returned with OS error: (%s) %s", e.errno, e.strerror
588 except Exception as e:
589 cls.logger.exception("Subprocess returned unexpected from %s:", cmdline)
595 def wait_for_coredump(cls):
596 corefile = cls.tempdir + "/core"
597 if os.path.isfile(corefile):
598 cls.logger.error("Waiting for coredump to complete: %s", corefile)
599 curr_size = os.path.getsize(corefile)
600 deadline = time.time() + 60
602 while time.time() < deadline:
605 curr_size = os.path.getsize(corefile)
606 if size == curr_size:
611 "Timed out waiting for coredump to complete: %s", corefile
614 cls.logger.error("Coredump complete: %s, size %d", corefile, curr_size)
617 def get_stats_sock_path(cls):
618 return "%s/stats.sock" % cls.tempdir
621 def get_api_sock_path(cls):
622 return "%s/api.sock" % cls.tempdir
625 def get_api_segment_prefix(cls):
626 return os.path.basename(cls.tempdir) # Only used for VAPI
629 def get_tempdir(cls):
631 tmpdir = f"{config.tmp_dir}/unittest-attach-gdb"
633 tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}"
634 if config.wipe_tmp_dir:
635 shutil.rmtree(tmpdir, ignore_errors=True)
640 def create_file_handler(cls):
641 if config.log_dir is None:
642 cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
645 logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}"
646 if config.wipe_tmp_dir:
647 shutil.rmtree(logdir, ignore_errors=True)
649 cls.file_handler = FileHandler(f"{logdir}/log.txt")
654 Perform class setup before running the testcase
655 Remove shared memory files, start vpp and connect the vpp-api
657 super(VppTestCase, cls).setUpClass()
658 cls.logger = get_logger(cls.__name__)
659 random.seed(config.rnd_seed)
660 if hasattr(cls, "parallel_handler"):
661 cls.logger.addHandler(cls.parallel_handler)
662 cls.logger.propagate = False
663 cls.set_debug_flags(config.debug)
664 cls.tempdir = cls.get_tempdir()
665 cls.create_file_handler()
666 cls.file_handler.setFormatter(
667 Formatter(fmt="%(asctime)s,%(msecs)03d %(message)s", datefmt="%H:%M:%S")
669 cls.file_handler.setLevel(DEBUG)
670 cls.logger.addHandler(cls.file_handler)
671 cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
672 os.chdir(cls.tempdir)
674 "Temporary dir is %s, api socket is %s",
676 cls.get_api_sock_path(),
678 cls.logger.debug("Random seed is %s", config.rnd_seed)
680 cls.reset_packet_infos()
685 cls.registry = VppObjectRegistry()
686 cls.vpp_startup_failed = False
687 cls.reporter = KeepAliveReporter()
688 # need to catch exceptions here because if we raise, then the cleanup
689 # doesn't get called and we might end with a zombie vpp
695 cls.reporter.send_keep_alive(cls, "setUpClass")
696 VppTestResult.current_test_case_info = TestCaseInfo(
697 cls.logger, cls.tempdir, cls.vpp.pid, config.vpp
699 cls.vpp_stdout_deque = deque()
700 cls.vpp_stderr_deque = deque()
701 if not cls.debug_attach:
702 cls.pump_thread_stop_flag = Event()
703 cls.pump_thread_wakeup_pipe = os.pipe()
704 cls.pump_thread = Thread(target=pump_output, args=(cls,))
705 cls.pump_thread.daemon = True
706 cls.pump_thread.start()
707 if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
708 cls.vapi_response_timeout = 0
709 cls.vapi = VppPapiProvider(cls.__name__, cls, cls.vapi_response_timeout)
711 hook = hookmodule.StepHook(cls)
713 hook = hookmodule.PollHook(cls)
714 cls.vapi.register_hook(hook)
715 cls.statistics = VPPStats(socketname=cls.get_stats_sock_path())
719 cls.vpp_startup_failed = True
721 "VPP died shortly after startup, check the"
722 " output to standard error for possible cause"
727 except (vpp_papi.VPPIOError, Exception) as e:
728 cls.logger.debug("Exception connecting to vapi: %s" % e)
729 cls.vapi.disconnect()
731 if cls.debug_gdbserver:
734 "You're running VPP inside gdbserver but "
735 "VPP-API connection failed, did you forget "
736 "to 'continue' VPP from within gdb?",
742 last_line = cls.vapi.cli("show thread").split("\n")[-2]
743 cls.vpp_worker_count = int(last_line.split(" ")[0])
744 print("Detected VPP with %s workers." % cls.vpp_worker_count)
745 except vpp_papi.VPPRuntimeError as e:
746 cls.logger.debug("%s" % e)
749 except Exception as e:
750 cls.logger.debug("Exception connecting to VPP: %s" % e)
755 def _debug_quit(cls):
756 if cls.debug_gdbserver or cls.debug_gdb:
760 if cls.vpp.returncode is None:
762 print(double_line_delim)
763 print("VPP or GDB server is still running")
764 print(single_line_delim)
766 "When done debugging, press ENTER to kill the "
767 "process and finish running the testcase..."
769 except AttributeError:
775 Disconnect vpp-api, kill vpp and cleanup shared memory files
779 # first signal that we want to stop the pump thread, then wake it up
780 if hasattr(cls, "pump_thread_stop_flag"):
781 cls.pump_thread_stop_flag.set()
782 if hasattr(cls, "pump_thread_wakeup_pipe"):
783 os.write(cls.pump_thread_wakeup_pipe[1], b"ding dong wake up")
784 if hasattr(cls, "pump_thread"):
785 cls.logger.debug("Waiting for pump thread to stop")
786 cls.pump_thread.join()
787 if hasattr(cls, "vpp_stderr_reader_thread"):
788 cls.logger.debug("Waiting for stderr pump to stop")
789 cls.vpp_stderr_reader_thread.join()
791 if hasattr(cls, "vpp"):
792 if hasattr(cls, "vapi"):
793 cls.logger.debug(cls.vapi.vpp.get_stats())
794 cls.logger.debug("Disconnecting class vapi client on %s", cls.__name__)
795 cls.vapi.disconnect()
796 cls.logger.debug("Deleting class vapi attribute on %s", cls.__name__)
799 if not cls.debug_attach and cls.vpp.returncode is None:
800 cls.wait_for_coredump()
801 cls.logger.debug("Sending TERM to vpp")
803 cls.logger.debug("Waiting for vpp to die")
805 outs, errs = cls.vpp.communicate(timeout=5)
806 except subprocess.TimeoutExpired:
808 outs, errs = cls.vpp.communicate()
809 cls.logger.debug("Deleting class vpp attribute on %s", cls.__name__)
810 if not cls.debug_attach:
811 cls.vpp.stdout.close()
812 cls.vpp.stderr.close()
815 if cls.vpp_startup_failed:
816 stdout_log = cls.logger.info
817 stderr_log = cls.logger.critical
819 stdout_log = cls.logger.info
820 stderr_log = cls.logger.info
822 if hasattr(cls, "vpp_stdout_deque"):
823 stdout_log(single_line_delim)
824 stdout_log("VPP output to stdout while running %s:", cls.__name__)
825 stdout_log(single_line_delim)
826 vpp_output = "".join(cls.vpp_stdout_deque)
827 with open(cls.tempdir + "/vpp_stdout.txt", "w") as f:
829 stdout_log("\n%s", vpp_output)
830 stdout_log(single_line_delim)
832 if hasattr(cls, "vpp_stderr_deque"):
833 stderr_log(single_line_delim)
834 stderr_log("VPP output to stderr while running %s:", cls.__name__)
835 stderr_log(single_line_delim)
836 vpp_output = "".join(cls.vpp_stderr_deque)
837 with open(cls.tempdir + "/vpp_stderr.txt", "w") as f:
839 stderr_log("\n%s", vpp_output)
840 stderr_log(single_line_delim)
843 def tearDownClass(cls):
844 """Perform final cleanup after running all tests in this test-case"""
845 cls.logger.debug("--- tearDownClass() for %s called ---" % cls.__name__)
846 cls.reporter.send_keep_alive(cls, "tearDownClass")
848 cls.file_handler.close()
849 cls.reset_packet_infos()
850 if config.debug_framework:
851 debug_internal.on_tear_down_class(cls)
853 def show_commands_at_teardown(self):
854 """Allow subclass specific teardown logging additions."""
855 self.logger.info("--- No test specific show commands provided. ---")
858 """Show various debug prints after each test"""
860 "--- tearDown() for %s.%s(%s) called ---"
861 % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
865 if not self.vpp_dead:
866 self.logger.debug(self.vapi.cli("show trace max 1000"))
867 self.logger.info(self.vapi.ppcli("show interface"))
868 self.logger.info(self.vapi.ppcli("show hardware"))
869 self.logger.info(self.statistics.set_errors_str())
870 self.logger.info(self.vapi.ppcli("show run"))
871 self.logger.info(self.vapi.ppcli("show log"))
872 self.logger.info(self.vapi.ppcli("show bihash"))
873 self.logger.info("Logging testcase specific show commands.")
874 self.show_commands_at_teardown()
875 if self.remove_configured_vpp_objects_on_tear_down:
876 self.registry.remove_vpp_config(self.logger)
877 # Save/Dump VPP api trace log
878 m = self._testMethodName
879 api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
880 tmp_api_trace = "/tmp/%s" % api_trace
881 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
882 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
883 self.logger.info("Moving %s to %s\n" % (tmp_api_trace, vpp_api_trace_log))
884 os.rename(tmp_api_trace, vpp_api_trace_log)
885 except VppTransportSocketIOError:
887 "VppTransportSocketIOError: Vpp dead. Cannot log show commands."
891 self.registry.unregister_all(self.logger)
894 """Clear trace before running each test"""
895 super(VppTestCase, self).setUp()
896 self.reporter.send_keep_alive(self)
900 testcase=self.__class__.__name__,
901 method_name=self._testMethodName,
903 self.sleep(0.1, "during setUp")
904 self.vpp_stdout_deque.append(
905 "--- test setUp() for %s.%s(%s) starts here ---\n"
906 % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
908 self.vpp_stderr_deque.append(
909 "--- test setUp() for %s.%s(%s) starts here ---\n"
910 % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
912 self.vapi.cli("clear trace")
913 # store the test instance inside the test class - so that objects
914 # holding the class can access instance methods (like assertEqual)
915 type(self).test_instance = self
918 def pg_enable_capture(cls, interfaces=None):
920 Enable capture on packet-generator interfaces
922 :param interfaces: iterable interface indexes (if None,
923 use self.pg_interfaces)
926 if interfaces is None:
927 interfaces = cls.pg_interfaces
932 def register_pcap(cls, intf, worker):
933 """Register a pcap in the testclass"""
934 # add to the list of captures with current timestamp
935 cls._pcaps.append((intf, worker))
938 def get_vpp_time(cls):
939 # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
940 # returns float("2.190522")
941 timestr = cls.vapi.cli("show clock")
942 head, sep, tail = timestr.partition(",")
943 head, sep, tail = head.partition("Time now")
947 def sleep_on_vpp_time(cls, sec):
948 """Sleep according to time in VPP world"""
949 # On a busy system with many processes
950 # we might end up with VPP time being slower than real world
951 # So take that into account when waiting for VPP to do something
952 start_time = cls.get_vpp_time()
953 while cls.get_vpp_time() - start_time < sec:
957 def pg_start(cls, trace=True):
958 """Enable the PG, wait till it is done, then clean up"""
959 for (intf, worker) in cls._old_pcaps:
960 intf.handle_old_pcap_file(intf.get_in_path(worker), intf.in_history_counter)
963 cls.vapi.cli("clear trace")
964 cls.vapi.cli("trace add pg-input 1000")
965 cls.vapi.cli("packet-generator enable")
966 # PG, when starts, runs to completion -
967 # so let's avoid a race condition,
968 # and wait a little till it's done.
969 # Then clean it up - and then be gone.
970 deadline = time.time() + 300
971 while cls.vapi.cli("show packet-generator").find("Yes") != -1:
972 cls.sleep(0.01) # yield
973 if time.time() > deadline:
974 cls.logger.error("Timeout waiting for pg to stop")
976 for intf, worker in cls._pcaps:
977 cls.vapi.cli("packet-generator delete %s" % intf.get_cap_name(worker))
978 cls._old_pcaps = cls._pcaps
982 def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0, mode=None):
984 Create packet-generator interfaces.
986 :param interfaces: iterable indexes of the interfaces.
987 :returns: List of created interfaces.
992 intf = VppPGInterface(cls, i, gso, gso_size, mode)
993 setattr(cls, intf.name, intf)
995 cls.pg_interfaces = result
999 def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
1000 pgmode = VppEnum.vl_api_pg_interface_mode_t
1001 return cls.create_pg_interfaces_internal(
1002 interfaces, gso, gso_size, pgmode.PG_API_MODE_IP4
1006 def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
1007 pgmode = VppEnum.vl_api_pg_interface_mode_t
1008 return cls.create_pg_interfaces_internal(
1009 interfaces, gso, gso_size, pgmode.PG_API_MODE_IP6
1013 def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
1014 pgmode = VppEnum.vl_api_pg_interface_mode_t
1015 return cls.create_pg_interfaces_internal(
1016 interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
1020 def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
1021 pgmode = VppEnum.vl_api_pg_interface_mode_t
1022 return cls.create_pg_interfaces_internal(
1023 interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
1027 def create_loopback_interfaces(cls, count):
1029 Create loopback interfaces.
1031 :param count: number of interfaces created.
1032 :returns: List of created interfaces.
1034 result = [VppLoInterface(cls) for i in range(count)]
1036 setattr(cls, intf.name, intf)
1037 cls.lo_interfaces = result
1041 def create_bvi_interfaces(cls, count):
1043 Create BVI interfaces.
1045 :param count: number of interfaces created.
1046 :returns: List of created interfaces.
1048 result = [VppBviInterface(cls) for i in range(count)]
1050 setattr(cls, intf.name, intf)
1051 cls.bvi_interfaces = result
1055 def extend_packet(packet, size, padding=" "):
1057 Extend packet to given size by padding with spaces or custom padding
1058 NOTE: Currently works only when Raw layer is present.
1060 :param packet: packet
1061 :param size: target size
1062 :param padding: padding used to extend the payload
1065 packet_len = len(packet) + 4
1066 extend = size - packet_len
1068 num = (extend // len(padding)) + 1
1069 packet[Raw].load += (padding * num)[:extend].encode("ascii")
1072 def reset_packet_infos(cls):
1073 """Reset the list of packet info objects and packet counts to zero"""
1074 cls._packet_infos = {}
1075 cls._packet_count_for_dst_if_idx = {}
1078 def create_packet_info(cls, src_if, dst_if):
1080 Create packet info object containing the source and destination indexes
1081 and add it to the testcase's packet info list
1083 :param VppInterface src_if: source interface
1084 :param VppInterface dst_if: destination interface
1086 :returns: _PacketInfo object
1089 info = _PacketInfo()
1090 info.index = len(cls._packet_infos)
1091 info.src = src_if.sw_if_index
1092 info.dst = dst_if.sw_if_index
1093 if isinstance(dst_if, VppSubInterface):
1094 dst_idx = dst_if.parent.sw_if_index
1096 dst_idx = dst_if.sw_if_index
1097 if dst_idx in cls._packet_count_for_dst_if_idx:
1098 cls._packet_count_for_dst_if_idx[dst_idx] += 1
1100 cls._packet_count_for_dst_if_idx[dst_idx] = 1
1101 cls._packet_infos[info.index] = info
1105 def info_to_payload(info):
1107 Convert _PacketInfo object to packet payload
1109 :param info: _PacketInfo object
1111 :returns: string containing serialized data from packet info
1114 # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1115 return pack("iiiih", info.index, info.src, info.dst, info.ip, info.proto)
1118 def payload_to_info(payload, payload_field="load"):
1120 Convert packet payload to _PacketInfo object
1122 :param payload: packet payload
1123 :type payload: <class 'scapy.packet.Raw'>
1124 :param payload_field: packet fieldname of payload "load" for
1125 <class 'scapy.packet.Raw'>
1126 :type payload_field: str
1127 :returns: _PacketInfo object containing de-serialized data from payload
1131 # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1132 payload_b = getattr(payload, payload_field)[:18]
1134 info = _PacketInfo()
1135 info.index, info.src, info.dst, info.ip, info.proto = unpack("iiiih", payload_b)
1137 # some SRv6 TCs depend on get an exception if bad values are detected
1138 if info.index > 0x4000:
1139 raise ValueError("Index value is invalid")
1143 def get_next_packet_info(self, info):
1145 Iterate over the packet info list stored in the testcase
1146 Start iteration with first element if info is None
1147 Continue based on index in info if info is specified
1149 :param info: info or None
1150 :returns: next info in list or None if no more infos
1155 next_index = info.index + 1
1156 if next_index == len(self._packet_infos):
1159 return self._packet_infos[next_index]
1161 def get_next_packet_info_for_interface(self, src_index, info):
1163 Search the packet info list for the next packet info with same source
1166 :param src_index: source interface index to search for
1167 :param info: packet info - where to start the search
1168 :returns: packet info or None
1172 info = self.get_next_packet_info(info)
1175 if info.src == src_index:
1178 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
1180 Search the packet info list for the next packet info with same source
1181 and destination interface indexes
1183 :param src_index: source interface index to search for
1184 :param dst_index: destination interface index to search for
1185 :param info: packet info - where to start the search
1186 :returns: packet info or None
1190 info = self.get_next_packet_info_for_interface(src_index, info)
1193 if info.dst == dst_index:
1196 def assert_equal(self, real_value, expected_value, name_or_class=None):
1197 if name_or_class is None:
1198 self.assertEqual(real_value, expected_value)
1201 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
1203 getdoc(name_or_class).strip(),
1205 str(name_or_class(real_value)),
1207 str(name_or_class(expected_value)),
1210 msg = "Invalid %s: %s does not match expected value %s" % (
1216 self.assertEqual(real_value, expected_value, msg)
1218 def assert_in_range(self, real_value, expected_min, expected_max, name=None):
1222 msg = "Invalid %s: %s out of range <%s,%s>" % (
1228 self.assertTrue(expected_min <= real_value <= expected_max, msg)
1230 def assert_packet_checksums_valid(self, packet, ignore_zero_udp_checksums=True):
1231 received = packet.__class__(scapy.compat.raw(packet))
1232 udp_layers = ["UDP", "UDPerror"]
1233 checksum_fields = ["cksum", "chksum"]
1236 temp = received.__class__(scapy.compat.raw(received))
1238 layer = temp.getlayer(counter)
1240 layer = layer.copy()
1241 layer.remove_payload()
1242 for cf in checksum_fields:
1243 if hasattr(layer, cf):
1245 ignore_zero_udp_checksums
1246 and 0 == getattr(layer, cf)
1247 and layer.name in udp_layers
1250 delattr(temp.getlayer(counter), cf)
1251 checksums.append((counter, cf))
1254 counter = counter + 1
1255 if 0 == len(checksums):
1257 temp = temp.__class__(scapy.compat.raw(temp))
1258 for layer, cf in checksums:
1259 calc_sum = getattr(temp[layer], cf)
1261 getattr(received[layer], cf),
1263 "packet checksum on layer #%d: %s" % (layer, temp[layer].name),
1266 "Checksum field `%s` on `%s` layer has correct value `%s`"
1267 % (cf, temp[layer].name, calc_sum)
1270 def assert_checksum_valid(
1271 self, received_packet, layer, field_name="chksum", ignore_zero_checksum=False
1273 """Check checksum of received packet on given layer"""
1274 received_packet_checksum = getattr(received_packet[layer], field_name)
1275 if ignore_zero_checksum and 0 == received_packet_checksum:
1277 recalculated = received_packet.__class__(scapy.compat.raw(received_packet))
1278 delattr(recalculated[layer], field_name)
1279 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1281 received_packet_checksum,
1282 getattr(recalculated[layer], field_name),
1283 "packet checksum on layer: %s" % layer,
1286 def assert_ip_checksum_valid(self, received_packet, ignore_zero_checksum=False):
1287 self.assert_checksum_valid(
1288 received_packet, "IP", ignore_zero_checksum=ignore_zero_checksum
1291 def assert_tcp_checksum_valid(self, received_packet, ignore_zero_checksum=False):
1292 self.assert_checksum_valid(
1293 received_packet, "TCP", ignore_zero_checksum=ignore_zero_checksum
1296 def assert_udp_checksum_valid(self, received_packet, ignore_zero_checksum=True):
1297 self.assert_checksum_valid(
1298 received_packet, "UDP", ignore_zero_checksum=ignore_zero_checksum
1301 def assert_embedded_icmp_checksum_valid(self, received_packet):
1302 if received_packet.haslayer(IPerror):
1303 self.assert_checksum_valid(received_packet, "IPerror")
1304 if received_packet.haslayer(TCPerror):
1305 self.assert_checksum_valid(received_packet, "TCPerror")
1306 if received_packet.haslayer(UDPerror):
1307 self.assert_checksum_valid(
1308 received_packet, "UDPerror", ignore_zero_checksum=True
1310 if received_packet.haslayer(ICMPerror):
1311 self.assert_checksum_valid(received_packet, "ICMPerror")
1313 def assert_icmp_checksum_valid(self, received_packet):
1314 self.assert_checksum_valid(received_packet, "ICMP")
1315 self.assert_embedded_icmp_checksum_valid(received_packet)
1317 def assert_icmpv6_checksum_valid(self, pkt):
1318 if pkt.haslayer(ICMPv6DestUnreach):
1319 self.assert_checksum_valid(pkt, "ICMPv6DestUnreach", "cksum")
1320 self.assert_embedded_icmp_checksum_valid(pkt)
1321 if pkt.haslayer(ICMPv6EchoRequest):
1322 self.assert_checksum_valid(pkt, "ICMPv6EchoRequest", "cksum")
1323 if pkt.haslayer(ICMPv6EchoReply):
1324 self.assert_checksum_valid(pkt, "ICMPv6EchoReply", "cksum")
1326 def get_counter(self, counter):
1327 if counter.startswith("/"):
1328 counter_value = self.statistics.get_counter(counter)
1330 counters = self.vapi.cli("sh errors").split("\n")
1332 for i in range(1, len(counters) - 1):
1333 results = counters[i].split()
1334 if results[1] == counter:
1335 counter_value = int(results[0])
1337 return counter_value
1339 def assert_counter_equal(self, counter, expected_value, thread=None, index=0):
1340 c = self.get_counter(counter)
1341 if thread is not None:
1342 c = c[thread][index]
1344 c = sum(x[index] for x in c)
1345 self.assert_equal(c, expected_value, "counter `%s'" % counter)
1347 def assert_packet_counter_equal(self, counter, expected_value):
1348 counter_value = self.get_counter(counter)
1350 counter_value, expected_value, "packet counter `%s'" % counter
1353 def assert_error_counter_equal(self, counter, expected_value):
1354 counter_value = self.statistics[counter].sum()
1355 self.assert_equal(counter_value, expected_value, "error counter `%s'" % counter)
1358 def sleep(cls, timeout, remark=None):
1360 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1361 # * by Guido, only the main thread can be interrupted.
1363 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
1366 if hasattr(os, "sched_yield"):
1372 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1373 before = time.time()
1376 if after - before > 2 * timeout:
1378 "unexpected self.sleep() result - slept for %es instead of ~%es!",
1384 "Finished sleep (%s) - slept %es (wanted %es)",
1390 def virtual_sleep(self, timeout, remark=None):
1391 self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
1392 self.vapi.cli("set clock adjust %s" % timeout)
1394 def pg_send(self, intf, pkts, worker=None, trace=True):
1395 intf.add_stream(pkts, worker=worker)
1396 self.pg_enable_capture(self.pg_interfaces)
1397 self.pg_start(trace=trace)
1399 def snapshot_stats(self, stats_diff):
1400 """Return snapshot of interesting stats based on diff dictionary."""
1402 for sw_if_index in stats_diff:
1403 for counter in stats_diff[sw_if_index]:
1404 stats_snapshot[counter] = self.statistics[counter]
1405 self.logger.debug(f"Took statistics stats_snapshot: {stats_snapshot}")
1406 return stats_snapshot
1408 def compare_stats_with_snapshot(self, stats_diff, stats_snapshot):
1409 """Assert appropriate difference between current stats and snapshot."""
1410 for sw_if_index in stats_diff:
1411 for cntr, diff in stats_diff[sw_if_index].items():
1412 if sw_if_index == "err":
1414 self.statistics[cntr].sum(),
1415 stats_snapshot[cntr].sum() + diff,
1416 f"'{cntr}' counter value (previous value: "
1417 f"{stats_snapshot[cntr].sum()}, "
1418 f"expected diff: {diff})",
1423 self.statistics[cntr][:, sw_if_index].sum(),
1424 stats_snapshot[cntr][:, sw_if_index].sum() + diff,
1425 f"'{cntr}' counter value (previous value: "
1426 f"{stats_snapshot[cntr][:, sw_if_index].sum()}, "
1427 f"expected diff: {diff})",
1430 # if diff is 0, then this most probably a case where
1431 # test declares multiple interfaces but traffic hasn't
1432 # passed through this one yet - which means the counter
1433 # value is 0 and can be ignored
1437 def send_and_assert_no_replies(
1438 self, intf, pkts, remark="", timeout=None, stats_diff=None, trace=True, msg=None
1441 stats_snapshot = self.snapshot_stats(stats_diff)
1443 self.pg_send(intf, pkts)
1448 for i in self.pg_interfaces:
1449 i.assert_nothing_captured(timeout=timeout, remark=remark)
1454 self.logger.debug(f"send_and_assert_no_replies: {msg}")
1455 self.logger.debug(self.vapi.cli("show trace"))
1458 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1460 def send_and_expect(
1472 stats_snapshot = self.snapshot_stats(stats_diff)
1475 n_rx = 1 if isinstance(pkts, Packet) else len(pkts)
1476 self.pg_send(intf, pkts, worker=worker, trace=trace)
1477 rx = output.get_capture(n_rx)
1480 self.logger.debug(f"send_and_expect: {msg}")
1481 self.logger.debug(self.vapi.cli("show trace"))
1484 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1488 def send_and_expect_load_balancing(
1489 self, input, pkts, outputs, worker=None, trace=True
1491 self.pg_send(input, pkts, worker=worker, trace=trace)
1494 rx = oo._get_capture(1)
1495 self.assertNotEqual(0, len(rx))
1498 self.logger.debug(self.vapi.cli("show trace"))
1501 def send_and_expect_some(self, intf, pkts, output, worker=None, trace=True):
1502 self.pg_send(intf, pkts, worker=worker, trace=trace)
1503 rx = output._get_capture(1)
1505 self.logger.debug(self.vapi.cli("show trace"))
1506 self.assertTrue(len(rx) > 0)
1507 self.assertTrue(len(rx) < len(pkts))
1510 def send_and_expect_only(self, intf, pkts, output, timeout=None, stats_diff=None):
1512 stats_snapshot = self.snapshot_stats(stats_diff)
1514 self.pg_send(intf, pkts)
1515 rx = output.get_capture(len(pkts))
1519 for i in self.pg_interfaces:
1520 if i not in outputs:
1521 i.assert_nothing_captured(timeout=timeout)
1525 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1530 def get_testcase_doc_name(test):
1531 return getdoc(test.__class__).splitlines()[0]
1534 def get_test_description(descriptions, test):
1535 short_description = test.shortDescription()
1536 if descriptions and short_description:
1537 return short_description
1542 class TestCaseInfo(object):
1543 def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
1544 self.logger = logger
1545 self.tempdir = tempdir
1546 self.vpp_pid = vpp_pid
1547 self.vpp_bin_path = vpp_bin_path
1548 self.core_crash_test = None
1551 class VppTestResult(unittest.TestResult):
1553 @property result_string
1554 String variable to store the test case result string.
1556 List variable containing 2-tuples of TestCase instances and strings
1557 holding formatted tracebacks. Each tuple represents a test which
1558 raised an unexpected exception.
1560 List variable containing 2-tuples of TestCase instances and strings
1561 holding formatted tracebacks. Each tuple represents a test where
1562 a failure was explicitly signalled using the TestCase.assert*()
1566 failed_test_cases_info = set()
1567 core_crash_test_cases_info = set()
1568 current_test_case_info = None
1570 def __init__(self, stream=None, descriptions=None, verbosity=None, runner=None):
1572 :param stream File descriptor to store where to report test results.
1573 Set to the standard error stream by default.
1574 :param descriptions Boolean variable to store information if to use
1575 test case descriptions.
1576 :param verbosity Integer variable to store required verbosity level.
1578 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1579 self.stream = stream
1580 self.descriptions = descriptions
1581 self.verbosity = verbosity
1582 self.result_string = None
1583 self.runner = runner
1586 def addSuccess(self, test):
1588 Record a test succeeded result
1593 if self.current_test_case_info:
1594 self.current_test_case_info.logger.debug(
1595 "--- addSuccess() %s.%s(%s) called"
1596 % (test.__class__.__name__, test._testMethodName, test._testMethodDoc)
1598 unittest.TestResult.addSuccess(self, test)
1599 self.result_string = colorize("OK", GREEN)
1601 self.send_result_through_pipe(test, PASS)
1603 def addSkip(self, test, reason):
1605 Record a test skipped.
1611 if self.current_test_case_info:
1612 self.current_test_case_info.logger.debug(
1613 "--- addSkip() %s.%s(%s) called, reason is %s"
1615 test.__class__.__name__,
1616 test._testMethodName,
1617 test._testMethodDoc,
1621 unittest.TestResult.addSkip(self, test, reason)
1622 self.result_string = colorize("SKIP", YELLOW)
1624 if reason == "not enough cpus":
1625 self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE)
1627 self.send_result_through_pipe(test, SKIP)
1629 def symlink_failed(self):
1630 if self.current_test_case_info:
1632 failed_dir = config.failed_dir
1633 link_path = os.path.join(
1635 "%s-FAILED" % os.path.basename(self.current_test_case_info.tempdir),
1638 self.current_test_case_info.logger.debug(
1639 "creating a link to the failed test"
1641 self.current_test_case_info.logger.debug(
1642 "os.symlink(%s, %s)"
1643 % (self.current_test_case_info.tempdir, link_path)
1645 if os.path.exists(link_path):
1646 self.current_test_case_info.logger.debug("symlink already exists")
1648 os.symlink(self.current_test_case_info.tempdir, link_path)
1650 except Exception as e:
1651 self.current_test_case_info.logger.error(e)
1653 def send_result_through_pipe(self, test, result):
1654 if hasattr(self, "test_framework_result_pipe"):
1655 pipe = self.test_framework_result_pipe
1657 pipe.send((test.id(), result))
1659 def log_error(self, test, err, fn_name):
1660 if self.current_test_case_info:
1661 if isinstance(test, unittest.suite._ErrorHolder):
1662 test_name = test.description
1664 test_name = "%s.%s(%s)" % (
1665 test.__class__.__name__,
1666 test._testMethodName,
1667 test._testMethodDoc,
1669 self.current_test_case_info.logger.debug(
1670 "--- %s() %s called, err is %s" % (fn_name, test_name, err)
1672 self.current_test_case_info.logger.debug(
1673 "formatted exception is:\n%s" % "".join(format_exception(*err))
1676 def add_error(self, test, err, unittest_fn, error_type):
1677 if error_type == FAIL:
1678 self.log_error(test, err, "addFailure")
1679 error_type_str = colorize("FAIL", RED)
1680 elif error_type == ERROR:
1681 self.log_error(test, err, "addError")
1682 error_type_str = colorize("ERROR", RED)
1685 "Error type %s cannot be used to record an "
1686 "error or a failure" % error_type
1689 unittest_fn(self, test, err)
1690 if self.current_test_case_info:
1691 self.result_string = "%s [ temp dir used by test case: %s ]" % (
1693 self.current_test_case_info.tempdir,
1695 self.symlink_failed()
1696 self.failed_test_cases_info.add(self.current_test_case_info)
1697 if is_core_present(self.current_test_case_info.tempdir):
1698 if not self.current_test_case_info.core_crash_test:
1699 if isinstance(test, unittest.suite._ErrorHolder):
1700 test_name = str(test)
1702 test_name = "'{!s}' ({!s})".format(
1703 get_testcase_doc_name(test), test.id()
1705 self.current_test_case_info.core_crash_test = test_name
1706 self.core_crash_test_cases_info.add(self.current_test_case_info)
1708 self.result_string = "%s [no temp dir]" % error_type_str
1710 self.send_result_through_pipe(test, error_type)
1712 def addFailure(self, test, err):
1714 Record a test failed result
1717 :param err: error message
1720 self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1722 def addError(self, test, err):
1724 Record a test error result
1727 :param err: error message
1730 self.add_error(test, err, unittest.TestResult.addError, ERROR)
1732 def getDescription(self, test):
1734 Get test description
1737 :returns: test description
1740 return get_test_description(self.descriptions, test)
1742 def startTest(self, test):
1750 def print_header(test):
1751 if test.__class__ in self.printed:
1754 test_doc = getdoc(test)
1756 raise Exception("No doc string for test '%s'" % test.id())
1758 test_title = test_doc.splitlines()[0].rstrip()
1759 test_title = colorize(test_title, GREEN)
1760 if test.is_tagged_run_solo():
1761 test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)
1763 # This block may overwrite the colorized title above,
1764 # but we want this to stand out and be fixed
1765 if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
1766 test_title = colorize(f"FIXME with VPP workers: {test_title}", RED)
1768 if test.has_tag(TestCaseTag.FIXME_ASAN):
1769 test_title = colorize(f"FIXME with ASAN: {test_title}", RED)
1770 test.skip_fixme_asan()
1772 if is_distro_ubuntu2204 == True and test.has_tag(
1773 TestCaseTag.FIXME_UBUNTU2204
1775 test_title = colorize(f"FIXME on Ubuntu-22.04: {test_title}", RED)
1776 test.skip_fixme_ubuntu2204()
1778 if hasattr(test, "vpp_worker_count"):
1779 if test.vpp_worker_count == 0:
1780 test_title += " [main thread only]"
1781 elif test.vpp_worker_count == 1:
1782 test_title += " [1 worker thread]"
1784 test_title += f" [{test.vpp_worker_count} worker threads]"
1786 if test.__class__.skipped_due_to_cpu_lack:
1787 test_title = colorize(
1788 f"{test_title} [skipped - not enough cpus, "
1789 f"required={test.__class__.get_cpus_required()}, "
1790 f"available={max_vpp_cpus}]",
1794 print(double_line_delim)
1796 print(double_line_delim)
1797 self.printed.append(test.__class__)
1800 self.start_test = time.time()
1801 unittest.TestResult.startTest(self, test)
1802 if self.verbosity > 0:
1803 self.stream.writeln("Starting " + self.getDescription(test) + " ...")
1804 self.stream.writeln(single_line_delim)
1806 def stopTest(self, test):
1808 Called when the given test has been run
1813 unittest.TestResult.stopTest(self, test)
1815 if self.verbosity > 0:
1816 self.stream.writeln(single_line_delim)
1817 self.stream.writeln(
1818 "%-73s%s" % (self.getDescription(test), self.result_string)
1820 self.stream.writeln(single_line_delim)
1822 self.stream.writeln(
1825 self.getDescription(test),
1826 time.time() - self.start_test,
1831 self.send_result_through_pipe(test, TEST_RUN)
1833 def printErrors(self):
1835 Print errors from running the test case
1837 if len(self.errors) > 0 or len(self.failures) > 0:
1838 self.stream.writeln()
1839 self.printErrorList("ERROR", self.errors)
1840 self.printErrorList("FAIL", self.failures)
1842 # ^^ that is the last output from unittest before summary
1843 if not self.runner.print_summary:
1844 devnull = unittest.runner._WritelnDecorator(open(os.devnull, "w"))
1845 self.stream = devnull
1846 self.runner.stream = devnull
1848 def printErrorList(self, flavour, errors):
1850 Print error list to the output stream together with error type
1851 and test case description.
1853 :param flavour: error type
1854 :param errors: iterable errors
1857 for test, err in errors:
1858 self.stream.writeln(double_line_delim)
1859 self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
1860 self.stream.writeln(single_line_delim)
1861 self.stream.writeln("%s" % err)
1864 class VppTestRunner(unittest.TextTestRunner):
1866 A basic test runner implementation which prints results to standard error.
1870 def resultclass(self):
1871 """Class maintaining the results of the tests"""
1872 return VppTestResult
1876 keep_alive_pipe=None,
1886 # ignore stream setting here, use hard-coded stdout to be in sync
1887 # with prints from VppTestCase methods ...
1888 super(VppTestRunner, self).__init__(
1889 sys.stdout, descriptions, verbosity, failfast, buffer, resultclass, **kwargs
1891 KeepAliveReporter.pipe = keep_alive_pipe
1893 self.orig_stream = self.stream
1894 self.resultclass.test_framework_result_pipe = result_pipe
1896 self.print_summary = print_summary
1898 def _makeResult(self):
1899 return self.resultclass(self.stream, self.descriptions, self.verbosity, self)
1901 def run(self, test):
1908 faulthandler.enable() # emit stack trace to stderr if killed by signal
1910 result = super(VppTestRunner, self).run(test)
1911 if not self.print_summary:
1912 self.stream = self.orig_stream
1913 result.stream = self.orig_stream
1917 class Worker(Thread):
1918 def __init__(self, executable_args, logger, env=None, *args, **kwargs):
1919 super(Worker, self).__init__(*args, **kwargs)
1920 self.logger = logger
1921 self.args = executable_args
1922 if hasattr(self, "testcase") and self.testcase.debug_all:
1923 if self.testcase.debug_gdbserver:
1925 "/usr/bin/gdbserver",
1926 "localhost:{port}".format(port=self.testcase.gdbserver_port),
1928 elif self.testcase.debug_gdb and hasattr(self, "wait_for_gdb"):
1929 self.args.append(self.wait_for_gdb)
1930 self.app_bin = executable_args[0]
1931 self.app_name = os.path.basename(self.app_bin)
1932 if hasattr(self, "role"):
1933 self.app_name += " {role}".format(role=self.role)
1936 env = {} if env is None else env
1937 self.env = copy.deepcopy(env)
1939 def wait_for_enter(self):
1940 if not hasattr(self, "testcase"):
1942 if self.testcase.debug_all and self.testcase.debug_gdbserver:
1944 print(double_line_delim)
1946 "Spawned GDB Server for '{app}' with PID: {pid}".format(
1947 app=self.app_name, pid=self.process.pid
1950 elif self.testcase.debug_all and self.testcase.debug_gdb:
1952 print(double_line_delim)
1954 "Spawned '{app}' with PID: {pid}".format(
1955 app=self.app_name, pid=self.process.pid
1960 print(single_line_delim)
1961 print("You can debug '{app}' using:".format(app=self.app_name))
1962 if self.testcase.debug_gdbserver:
1966 + " -ex 'target remote localhost:{port}'".format(
1967 port=self.testcase.gdbserver_port
1971 "Now is the time to attach gdb by running the above "
1972 "command, set up breakpoints etc., then resume from "
1973 "within gdb by issuing the 'continue' command"
1975 self.testcase.gdbserver_port += 1
1976 elif self.testcase.debug_gdb:
1980 + " -ex 'attach {pid}'".format(pid=self.process.pid)
1983 "Now is the time to attach gdb by running the above "
1984 "command and set up breakpoints etc., then resume from"
1985 " within gdb by issuing the 'continue' command"
1987 print(single_line_delim)
1988 input("Press ENTER to continue running the testcase...")
1991 executable = self.args[0]
1992 if not os.path.exists(executable) or not os.access(
1993 executable, os.F_OK | os.X_OK
1995 # Exit code that means some system file did not exist,
1996 # could not be opened, or had some other kind of error.
1997 self.result = os.EX_OSFILE
1998 raise EnvironmentError(
1999 "executable '%s' is not found or executable." % executable
2002 "Running executable '{app}': '{cmd}'".format(
2003 app=self.app_name, cmd=" ".join(self.args)
2006 env = os.environ.copy()
2007 env.update(self.env)
2008 env["CK_LOG_FILE_NAME"] = "-"
2009 self.process = subprocess.Popen(
2010 ["stdbuf", "-o0", "-e0"] + self.args,
2013 preexec_fn=os.setpgrp,
2014 stdout=subprocess.PIPE,
2015 stderr=subprocess.PIPE,
2017 self.wait_for_enter()
2018 out, err = self.process.communicate()
2019 self.logger.debug("Finished running `{app}'".format(app=self.app_name))
2020 self.logger.info("Return code is `%s'" % self.process.returncode)
2021 self.logger.info(single_line_delim)
2023 "Executable `{app}' wrote to stdout:".format(app=self.app_name)
2025 self.logger.info(single_line_delim)
2026 self.logger.info(out.decode("utf-8"))
2027 self.logger.info(single_line_delim)
2029 "Executable `{app}' wrote to stderr:".format(app=self.app_name)
2031 self.logger.info(single_line_delim)
2032 self.logger.info(err.decode("utf-8"))
2033 self.logger.info(single_line_delim)
2034 self.result = self.process.returncode
2037 if __name__ == "__main__":