3 from __future__ import print_function
18 from collections import deque
19 from threading import Thread, Event
20 from inspect import getdoc, isclass
21 from traceback import format_exception
22 from logging import FileHandler, DEBUG, Formatter
24 from abc import ABC, abstractmethod
25 from struct import pack, unpack
28 from scapy.packet import Raw, Packet
29 from config import config, available_cpus, num_cpus, max_vpp_cpus
30 import hook as hookmodule
31 from vpp_pg_interface import VppPGInterface
32 from vpp_sub_interface import VppSubInterface
33 from vpp_lo_interface import VppLoInterface
34 from vpp_bvi_interface import VppBviInterface
35 from vpp_papi_provider import VppPapiProvider
36 from vpp_papi import VppEnum
38 from vpp_papi.vpp_stats import VPPStats
39 from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
49 from vpp_object import VppObjectRegistry
50 from util import ppp, is_core_present
51 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
52 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
53 from scapy.layers.inet6 import ICMPv6EchoReply
56 logger = logging.getLogger(__name__)
58 # Set up an empty logger for the testcase that can be overridden as necessary
59 null_logger = logging.getLogger("VppTestCase")
60 null_logger.addHandler(logging.NullHandler())
70 if config.debug_framework:
74 Test framework module.
76 The module provides a set of tools for constructing and running tests and
77 representing the results.
81 class VppDiedError(Exception):
82 """exception for reporting that the subprocess has died."""
86 for k, v in signal.__dict__.items()
87 if k.startswith("SIG") and not k.startswith("SIG_")
90 def __init__(self, rv=None, testcase=None, method_name=None):
92 self.signal_name = None
93 self.testcase = testcase
94 self.method_name = method_name
97 self.signal_name = VppDiedError.signals_by_value[-rv]
98 except (KeyError, TypeError):
101 if testcase is None and method_name is None:
104 in_msg = " while running %s.%s" % (testcase, method_name)
107 msg = "VPP subprocess died unexpectedly%s with return code: %d%s." % (
110 " [%s]" % (self.signal_name if self.signal_name is not None else ""),
113 msg = "VPP subprocess died unexpectedly%s." % in_msg
115 super(VppDiedError, self).__init__(msg)
118 class _PacketInfo(object):
119 """Private class to create packet info object.
121 Help process information about the next packet.
122 Set variables to default values.
125 #: Store the index of the packet.
127 #: Store the index of the source packet generator interface of the packet.
129 #: Store the index of the destination packet generator interface
132 #: Store expected ip version
134 #: Store expected upper protocol
136 #: Store the copy of the former packet.
def __eq__(self, other):
    """Packet infos are equal when index, src, dst and data all match."""
    return (self.index, self.src, self.dst, self.data) == (
        other.index,
        other.src,
        other.dst,
        other.data,
    )
147 def pump_output(testclass):
148 """pump output from vpp stdout/stderr to proper queues"""
151 while not testclass.pump_thread_stop_flag.is_set():
152 readable = select.select(
154 testclass.vpp.stdout.fileno(),
155 testclass.vpp.stderr.fileno(),
156 testclass.pump_thread_wakeup_pipe[0],
161 if testclass.vpp.stdout.fileno() in readable:
162 read = os.read(testclass.vpp.stdout.fileno(), 102400)
164 split = read.decode("ascii", errors="backslashreplace").splitlines(True)
165 if len(stdout_fragment) > 0:
166 split[0] = "%s%s" % (stdout_fragment, split[0])
167 if len(split) > 0 and split[-1].endswith("\n"):
171 stdout_fragment = split[-1]
172 testclass.vpp_stdout_deque.extend(split[:limit])
173 if not config.cache_vpp_output:
174 for line in split[:limit]:
175 testclass.logger.info("VPP STDOUT: %s" % line.rstrip("\n"))
176 if testclass.vpp.stderr.fileno() in readable:
177 read = os.read(testclass.vpp.stderr.fileno(), 102400)
179 split = read.decode("ascii", errors="backslashreplace").splitlines(True)
180 if len(stderr_fragment) > 0:
181 split[0] = "%s%s" % (stderr_fragment, split[0])
182 if len(split) > 0 and split[-1].endswith("\n"):
186 stderr_fragment = split[-1]
188 testclass.vpp_stderr_deque.extend(split[:limit])
189 if not config.cache_vpp_output:
190 for line in split[:limit]:
191 testclass.logger.error("VPP STDERR: %s" % line.rstrip("\n"))
192 # ignoring the dummy pipe here intentionally - the
193 # flag will take care of properly terminating the loop
def _is_platform_aarch64():
    """Return True when the test host CPU architecture is aarch64."""
    return "aarch64" == platform.machine()
# Evaluated once at import time; the host architecture cannot change mid-run.
is_platform_aarch64 = _is_platform_aarch64()
203 class KeepAliveReporter(object):
205 Singleton object which reports test start to parent process
211 self.__dict__ = self._shared_state
219 def pipe(self, pipe):
220 if self._pipe is not None:
221 raise Exception("Internal error - pipe should only be set once.")
224 def send_keep_alive(self, test, desc=None):
226 Write current test tmpdir & desc to keep-alive pipe to signal liveness
228 if self.pipe is None:
229 # if not running forked..
233 desc = "%s (%s)" % (desc, unittest.util.strclass(test))
237 self.pipe.send((desc, config.vpp, test.tempdir, test.vpp.pid))
240 class TestCaseTag(Enum):
241 # marks the suites that must run at the end
242 # using only a single test runner
244 # marks the suites broken on VPP multi-worker
245 FIXME_VPP_WORKERS = 2
246 # marks the suites broken when ASan is enabled
250 def create_tag_decorator(e):
253 cls.test_tags.append(e)
254 except AttributeError:
261 tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
262 tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
263 tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
277 class CPUInterface(ABC):
279 skipped_due_to_cpu_lack = False
283 def get_cpus_required(cls):
287 def assign_cpus(cls, cpus):
291 class VppTestCase(CPUInterface, unittest.TestCase):
292 """This subclass is a base class for VPP test cases that are implemented as
293 classes. It provides methods to create and run test case.
296 extra_vpp_statseg_config = ""
297 extra_vpp_punt_config = []
298 extra_vpp_plugin_config = []
300 vapi_response_timeout = 5
301 remove_configured_vpp_objects_on_tear_down = True
def packet_infos(self):
    """Packet info objects recorded by this test case, keyed by index."""
    return self._packet_infos
309 def get_packet_count_for_if_idx(cls, dst_if_index):
310 """Get the number of packet info for specified destination if index"""
311 if dst_if_index in cls._packet_count_for_dst_if_idx:
312 return cls._packet_count_for_dst_if_idx[dst_if_index]
317 def has_tag(cls, tag):
318 """if the test case has a given tag - return true"""
320 return tag in cls.test_tags
321 except AttributeError:
def is_tagged_run_solo(cls):
    """Return True when this timing-sensitive test class must run alone."""
    solo = TestCaseTag.RUN_SOLO
    return cls.has_tag(solo)
331 def skip_fixme_asan(cls):
332 """if @tag_fixme_asan & ASan is enabled - mark for skip"""
333 if cls.has_tag(TestCaseTag.FIXME_ASAN):
334 vpp_extra_cmake_args = os.environ.get("VPP_EXTRA_CMAKE_ARGS", "")
335 if "DVPP_ENABLE_SANITIZE_ADDR=ON" in vpp_extra_cmake_args:
336 cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls)
340 """Return the instance of this testcase"""
341 return cls.test_instance
344 def set_debug_flags(cls, d):
345 cls.gdbserver_port = 7777
346 cls.debug_core = False
347 cls.debug_gdb = False
348 cls.debug_gdbserver = False
349 cls.debug_all = False
350 cls.debug_attach = False
355 cls.debug_core = True
356 elif dl == "gdb" or dl == "gdb-all":
358 elif dl == "gdbserver" or dl == "gdbserver-all":
359 cls.debug_gdbserver = True
361 cls.debug_attach = True
363 raise Exception("Unrecognized DEBUG option: '%s'" % d)
364 if dl == "gdb-all" or dl == "gdbserver-all":
368 def get_vpp_worker_count(cls):
369 if not hasattr(cls, "vpp_worker_count"):
370 if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
371 cls.vpp_worker_count = 0
373 cls.vpp_worker_count = config.vpp_worker_count
374 return cls.vpp_worker_count
def get_cpus_required(cls):
    """CPUs this test class needs: one main thread plus the VPP workers."""
    workers = cls.get_vpp_worker_count()
    return workers + 1
381 def setUpConstants(cls):
382 """Set-up the test case class based on environment variables"""
383 cls.step = config.step
384 cls.plugin_path = ":".join(config.vpp_plugin_dir)
385 cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir)
386 cls.extern_plugin_path = ":".join(config.extern_plugin_dir)
388 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
389 debug_cli = "cli-listen localhost:5002"
390 size = re.search(r"\d+[gG]", config.coredump_size)
392 coredump_size = f"coredump-size {config.coredump_size}".lower()
394 coredump_size = "coredump-size unlimited"
395 default_variant = config.variant
396 if default_variant is not None:
397 default_variant = "defaults { %s 100 }" % default_variant
401 api_fuzzing = config.api_fuzz
402 if api_fuzzing is None:
423 cls.get_api_segment_prefix(),
430 if cls.extern_plugin_path not in (None, ""):
431 cls.extra_vpp_plugin_config.append("add-path %s" % cls.extern_plugin_path)
432 if cls.get_vpp_worker_count():
433 cls.vpp_cmdline.extend(
434 ["corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])]
436 cls.vpp_cmdline.extend(
447 cls.get_stats_sock_path(),
448 cls.extra_vpp_statseg_config,
453 cls.get_api_sock_path(),
474 "lisp_unittest_plugin.so",
479 "unittest_plugin.so",
484 + cls.extra_vpp_plugin_config
490 if cls.extra_vpp_punt_config is not None:
491 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
493 if not cls.debug_attach:
494 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
495 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
498 def wait_for_enter(cls):
499 if cls.debug_gdbserver:
500 print(double_line_delim)
501 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
503 print(double_line_delim)
504 print("Spawned VPP with PID: %d" % cls.vpp.pid)
506 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
508 print(single_line_delim)
509 print("You can debug VPP using:")
510 if cls.debug_gdbserver:
512 f"sudo gdb {config.vpp} "
513 f"-ex 'target remote localhost:{cls.gdbserver_port}'"
516 "Now is the time to attach gdb by running the above "
517 "command, set up breakpoints etc., then resume VPP from "
518 "within gdb by issuing the 'continue' command"
520 cls.gdbserver_port += 1
522 print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'")
524 "Now is the time to attach gdb by running the above "
525 "command and set up breakpoints etc., then resume VPP from"
526 " within gdb by issuing the 'continue' command"
528 print(single_line_delim)
529 input("Press ENTER to continue running the testcase...")
537 cls.logger.debug(f"Assigned cpus: {cls.cpus}")
538 cmdline = cls.vpp_cmdline
540 if cls.debug_gdbserver:
541 gdbserver = "/usr/bin/gdbserver"
542 if not os.path.isfile(gdbserver) or not os.access(gdbserver, os.X_OK):
544 "gdbserver binary '%s' does not exist or is "
545 "not executable" % gdbserver
550 "localhost:{port}".format(port=cls.gdbserver_port),
552 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
555 cls.vpp = subprocess.Popen(
556 cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE
558 except subprocess.CalledProcessError as e:
560 "Subprocess returned with non-0 return code: (%s)", e.returncode
565 "Subprocess returned with OS error: (%s) %s", e.errno, e.strerror
568 except Exception as e:
569 cls.logger.exception("Subprocess returned unexpected from %s:", cmdline)
575 def wait_for_coredump(cls):
576 corefile = cls.tempdir + "/core"
577 if os.path.isfile(corefile):
578 cls.logger.error("Waiting for coredump to complete: %s", corefile)
579 curr_size = os.path.getsize(corefile)
580 deadline = time.time() + 60
582 while time.time() < deadline:
585 curr_size = os.path.getsize(corefile)
586 if size == curr_size:
591 "Timed out waiting for coredump to complete: %s", corefile
594 cls.logger.error("Coredump complete: %s, size %d", corefile, curr_size)
def get_stats_sock_path(cls):
    """Path of the VPP stats socket inside the test class tempdir."""
    return f"{cls.tempdir}/stats.sock"
def get_api_sock_path(cls):
    """Path of the VPP API socket inside the test class tempdir."""
    return f"{cls.tempdir}/api.sock"
def get_api_segment_prefix(cls):
    """Basename of the tempdir, used as the VAPI API-segment prefix."""
    # Only used for VAPI
    prefix = os.path.basename(cls.tempdir)
    return prefix
609 def get_tempdir(cls):
611 tmpdir = f"{config.tmp_dir}/unittest-attach-gdb"
613 tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}"
614 if config.wipe_tmp_dir:
615 shutil.rmtree(tmpdir, ignore_errors=True)
620 def create_file_handler(cls):
621 if config.log_dir is None:
622 cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
625 logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}"
626 if config.wipe_tmp_dir:
627 shutil.rmtree(logdir, ignore_errors=True)
629 cls.file_handler = FileHandler(f"{logdir}/log.txt")
634 Perform class setup before running the testcase
635 Remove shared memory files, start vpp and connect the vpp-api
637 super(VppTestCase, cls).setUpClass()
638 cls.logger = get_logger(cls.__name__)
639 random.seed(config.rnd_seed)
640 if hasattr(cls, "parallel_handler"):
641 cls.logger.addHandler(cls.parallel_handler)
642 cls.logger.propagate = False
643 cls.set_debug_flags(config.debug)
644 cls.tempdir = cls.get_tempdir()
645 cls.create_file_handler()
646 cls.file_handler.setFormatter(
647 Formatter(fmt="%(asctime)s,%(msecs)03d %(message)s", datefmt="%H:%M:%S")
649 cls.file_handler.setLevel(DEBUG)
650 cls.logger.addHandler(cls.file_handler)
651 cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
652 os.chdir(cls.tempdir)
654 "Temporary dir is %s, api socket is %s",
656 cls.get_api_sock_path(),
658 cls.logger.debug("Random seed is %s", config.rnd_seed)
660 cls.reset_packet_infos()
665 cls.registry = VppObjectRegistry()
666 cls.vpp_startup_failed = False
667 cls.reporter = KeepAliveReporter()
668 # need to catch exceptions here because if we raise, then the cleanup
669 # doesn't get called and we might end with a zombie vpp
675 cls.reporter.send_keep_alive(cls, "setUpClass")
676 VppTestResult.current_test_case_info = TestCaseInfo(
677 cls.logger, cls.tempdir, cls.vpp.pid, config.vpp
679 cls.vpp_stdout_deque = deque()
680 cls.vpp_stderr_deque = deque()
681 if not cls.debug_attach:
682 cls.pump_thread_stop_flag = Event()
683 cls.pump_thread_wakeup_pipe = os.pipe()
684 cls.pump_thread = Thread(target=pump_output, args=(cls,))
685 cls.pump_thread.daemon = True
686 cls.pump_thread.start()
687 if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
688 cls.vapi_response_timeout = 0
689 cls.vapi = VppPapiProvider(cls.__name__, cls, cls.vapi_response_timeout)
691 hook = hookmodule.StepHook(cls)
693 hook = hookmodule.PollHook(cls)
694 cls.vapi.register_hook(hook)
695 cls.statistics = VPPStats(socketname=cls.get_stats_sock_path())
699 cls.vpp_startup_failed = True
701 "VPP died shortly after startup, check the"
702 " output to standard error for possible cause"
707 except (vpp_papi.VPPIOError, Exception) as e:
708 cls.logger.debug("Exception connecting to vapi: %s" % e)
709 cls.vapi.disconnect()
711 if cls.debug_gdbserver:
714 "You're running VPP inside gdbserver but "
715 "VPP-API connection failed, did you forget "
716 "to 'continue' VPP from within gdb?",
722 last_line = cls.vapi.cli("show thread").split("\n")[-2]
723 cls.vpp_worker_count = int(last_line.split(" ")[0])
724 print("Detected VPP with %s workers." % cls.vpp_worker_count)
725 except vpp_papi.VPPRuntimeError as e:
726 cls.logger.debug("%s" % e)
729 except Exception as e:
730 cls.logger.debug("Exception connecting to VPP: %s" % e)
735 def _debug_quit(cls):
736 if cls.debug_gdbserver or cls.debug_gdb:
740 if cls.vpp.returncode is None:
742 print(double_line_delim)
743 print("VPP or GDB server is still running")
744 print(single_line_delim)
746 "When done debugging, press ENTER to kill the "
747 "process and finish running the testcase..."
749 except AttributeError:
755 Disconnect vpp-api, kill vpp and cleanup shared memory files
759 # first signal that we want to stop the pump thread, then wake it up
760 if hasattr(cls, "pump_thread_stop_flag"):
761 cls.pump_thread_stop_flag.set()
762 if hasattr(cls, "pump_thread_wakeup_pipe"):
763 os.write(cls.pump_thread_wakeup_pipe[1], b"ding dong wake up")
764 if hasattr(cls, "pump_thread"):
765 cls.logger.debug("Waiting for pump thread to stop")
766 cls.pump_thread.join()
767 if hasattr(cls, "vpp_stderr_reader_thread"):
768 cls.logger.debug("Waiting for stderr pump to stop")
769 cls.vpp_stderr_reader_thread.join()
771 if hasattr(cls, "vpp"):
772 if hasattr(cls, "vapi"):
773 cls.logger.debug(cls.vapi.vpp.get_stats())
774 cls.logger.debug("Disconnecting class vapi client on %s", cls.__name__)
775 cls.vapi.disconnect()
776 cls.logger.debug("Deleting class vapi attribute on %s", cls.__name__)
779 if not cls.debug_attach and cls.vpp.returncode is None:
780 cls.wait_for_coredump()
781 cls.logger.debug("Sending TERM to vpp")
783 cls.logger.debug("Waiting for vpp to die")
785 outs, errs = cls.vpp.communicate(timeout=5)
786 except subprocess.TimeoutExpired:
788 outs, errs = cls.vpp.communicate()
789 cls.logger.debug("Deleting class vpp attribute on %s", cls.__name__)
790 if not cls.debug_attach:
791 cls.vpp.stdout.close()
792 cls.vpp.stderr.close()
795 if cls.vpp_startup_failed:
796 stdout_log = cls.logger.info
797 stderr_log = cls.logger.critical
799 stdout_log = cls.logger.info
800 stderr_log = cls.logger.info
802 if hasattr(cls, "vpp_stdout_deque"):
803 stdout_log(single_line_delim)
804 stdout_log("VPP output to stdout while running %s:", cls.__name__)
805 stdout_log(single_line_delim)
806 vpp_output = "".join(cls.vpp_stdout_deque)
807 with open(cls.tempdir + "/vpp_stdout.txt", "w") as f:
809 stdout_log("\n%s", vpp_output)
810 stdout_log(single_line_delim)
812 if hasattr(cls, "vpp_stderr_deque"):
813 stderr_log(single_line_delim)
814 stderr_log("VPP output to stderr while running %s:", cls.__name__)
815 stderr_log(single_line_delim)
816 vpp_output = "".join(cls.vpp_stderr_deque)
817 with open(cls.tempdir + "/vpp_stderr.txt", "w") as f:
819 stderr_log("\n%s", vpp_output)
820 stderr_log(single_line_delim)
823 def tearDownClass(cls):
824 """Perform final cleanup after running all tests in this test-case"""
825 cls.logger.debug("--- tearDownClass() for %s called ---" % cls.__name__)
826 cls.reporter.send_keep_alive(cls, "tearDownClass")
828 cls.file_handler.close()
829 cls.reset_packet_infos()
830 if config.debug_framework:
831 debug_internal.on_tear_down_class(cls)
def show_commands_at_teardown(self):
    """Hook for subclasses to log extra show commands during tearDown.

    The default implementation only records that nothing extra was given.
    """
    self.logger.info("--- No test specific show commands provided. ---")
838 """Show various debug prints after each test"""
840 "--- tearDown() for %s.%s(%s) called ---"
841 % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
845 if not self.vpp_dead:
846 self.logger.debug(self.vapi.cli("show trace max 1000"))
847 self.logger.info(self.vapi.ppcli("show interface"))
848 self.logger.info(self.vapi.ppcli("show hardware"))
849 self.logger.info(self.statistics.set_errors_str())
850 self.logger.info(self.vapi.ppcli("show run"))
851 self.logger.info(self.vapi.ppcli("show log"))
852 self.logger.info(self.vapi.ppcli("show bihash"))
853 self.logger.info("Logging testcase specific show commands.")
854 self.show_commands_at_teardown()
855 if self.remove_configured_vpp_objects_on_tear_down:
856 self.registry.remove_vpp_config(self.logger)
857 # Save/Dump VPP api trace log
858 m = self._testMethodName
859 api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
860 tmp_api_trace = "/tmp/%s" % api_trace
861 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
862 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
863 self.logger.info("Moving %s to %s\n" % (tmp_api_trace, vpp_api_trace_log))
864 os.rename(tmp_api_trace, vpp_api_trace_log)
865 except VppTransportSocketIOError:
867 "VppTransportSocketIOError: Vpp dead. Cannot log show commands."
871 self.registry.unregister_all(self.logger)
874 """Clear trace before running each test"""
875 super(VppTestCase, self).setUp()
876 self.reporter.send_keep_alive(self)
880 testcase=self.__class__.__name__,
881 method_name=self._testMethodName,
883 self.sleep(0.1, "during setUp")
884 self.vpp_stdout_deque.append(
885 "--- test setUp() for %s.%s(%s) starts here ---\n"
886 % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
888 self.vpp_stderr_deque.append(
889 "--- test setUp() for %s.%s(%s) starts here ---\n"
890 % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
892 self.vapi.cli("clear trace")
893 # store the test instance inside the test class - so that objects
894 # holding the class can access instance methods (like assertEqual)
895 type(self).test_instance = self
898 def pg_enable_capture(cls, interfaces=None):
900 Enable capture on packet-generator interfaces
902 :param interfaces: iterable interface indexes (if None,
903 use self.pg_interfaces)
906 if interfaces is None:
907 interfaces = cls.pg_interfaces
def register_pcap(cls, intf, worker):
    """Register a pcap in the testclass.

    Remembers the (interface, worker) pair so the class can clean the
    capture up later.
    """
    cls._pcaps.append((intf, worker))
918 def get_vpp_time(cls):
919 # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
920 # returns float("2.190522")
921 timestr = cls.vapi.cli("show clock")
922 head, sep, tail = timestr.partition(",")
923 head, sep, tail = head.partition("Time now")
927 def sleep_on_vpp_time(cls, sec):
928 """Sleep according to time in VPP world"""
929 # On a busy system with many processes
930 # we might end up with VPP time being slower than real world
931 # So take that into account when waiting for VPP to do something
932 start_time = cls.get_vpp_time()
933 while cls.get_vpp_time() - start_time < sec:
937 def pg_start(cls, trace=True):
938 """Enable the PG, wait till it is done, then clean up"""
939 for (intf, worker) in cls._old_pcaps:
940 intf.handle_old_pcap_file(intf.get_in_path(worker), intf.in_history_counter)
943 cls.vapi.cli("clear trace")
944 cls.vapi.cli("trace add pg-input 1000")
945 cls.vapi.cli("packet-generator enable")
946 # PG, when starts, runs to completion -
947 # so let's avoid a race condition,
948 # and wait a little till it's done.
949 # Then clean it up - and then be gone.
950 deadline = time.time() + 300
951 while cls.vapi.cli("show packet-generator").find("Yes") != -1:
952 cls.sleep(0.01) # yield
953 if time.time() > deadline:
954 cls.logger.error("Timeout waiting for pg to stop")
956 for intf, worker in cls._pcaps:
957 cls.vapi.cli("packet-generator delete %s" % intf.get_cap_name(worker))
958 cls._old_pcaps = cls._pcaps
962 def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0, mode=None):
964 Create packet-generator interfaces.
966 :param interfaces: iterable indexes of the interfaces.
967 :returns: List of created interfaces.
972 intf = VppPGInterface(cls, i, gso, gso_size, mode)
973 setattr(cls, intf.name, intf)
975 cls.pg_interfaces = result
979 def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
980 pgmode = VppEnum.vl_api_pg_interface_mode_t
981 return cls.create_pg_interfaces_internal(
982 interfaces, gso, gso_size, pgmode.PG_API_MODE_IP4
986 def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
987 pgmode = VppEnum.vl_api_pg_interface_mode_t
988 return cls.create_pg_interfaces_internal(
989 interfaces, gso, gso_size, pgmode.PG_API_MODE_IP6
993 def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
994 pgmode = VppEnum.vl_api_pg_interface_mode_t
995 return cls.create_pg_interfaces_internal(
996 interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
1000 def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
1001 pgmode = VppEnum.vl_api_pg_interface_mode_t
1002 return cls.create_pg_interfaces_internal(
1003 interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
1007 def create_loopback_interfaces(cls, count):
1009 Create loopback interfaces.
1011 :param count: number of interfaces created.
1012 :returns: List of created interfaces.
1014 result = [VppLoInterface(cls) for i in range(count)]
1016 setattr(cls, intf.name, intf)
1017 cls.lo_interfaces = result
1021 def create_bvi_interfaces(cls, count):
1023 Create BVI interfaces.
1025 :param count: number of interfaces created.
1026 :returns: List of created interfaces.
1028 result = [VppBviInterface(cls) for i in range(count)]
1030 setattr(cls, intf.name, intf)
1031 cls.bvi_interfaces = result
1035 def extend_packet(packet, size, padding=" "):
1037 Extend packet to given size by padding with spaces or custom padding
1038 NOTE: Currently works only when Raw layer is present.
1040 :param packet: packet
1041 :param size: target size
1042 :param padding: padding used to extend the payload
1045 packet_len = len(packet) + 4
1046 extend = size - packet_len
1048 num = (extend // len(padding)) + 1
1049 packet[Raw].load += (padding * num)[:extend].encode("ascii")
def reset_packet_infos(cls):
    """Drop all recorded packet infos and zero the per-interface counts."""
    cls._packet_count_for_dst_if_idx = {}
    cls._packet_infos = {}
1058 def create_packet_info(cls, src_if, dst_if):
1060 Create packet info object containing the source and destination indexes
1061 and add it to the testcase's packet info list
1063 :param VppInterface src_if: source interface
1064 :param VppInterface dst_if: destination interface
1066 :returns: _PacketInfo object
1069 info = _PacketInfo()
1070 info.index = len(cls._packet_infos)
1071 info.src = src_if.sw_if_index
1072 info.dst = dst_if.sw_if_index
1073 if isinstance(dst_if, VppSubInterface):
1074 dst_idx = dst_if.parent.sw_if_index
1076 dst_idx = dst_if.sw_if_index
1077 if dst_idx in cls._packet_count_for_dst_if_idx:
1078 cls._packet_count_for_dst_if_idx[dst_idx] += 1
1080 cls._packet_count_for_dst_if_idx[dst_idx] = 1
1081 cls._packet_infos[info.index] = info
1085 def info_to_payload(info):
1087 Convert _PacketInfo object to packet payload
1089 :param info: _PacketInfo object
1091 :returns: string containing serialized data from packet info
1094 # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1095 return pack("iiiih", info.index, info.src, info.dst, info.ip, info.proto)
1098 def payload_to_info(payload, payload_field="load"):
1100 Convert packet payload to _PacketInfo object
1102 :param payload: packet payload
1103 :type payload: <class 'scapy.packet.Raw'>
1104 :param payload_field: packet fieldname of payload "load" for
1105 <class 'scapy.packet.Raw'>
1106 :type payload_field: str
1107 :returns: _PacketInfo object containing de-serialized data from payload
1111 # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1112 payload_b = getattr(payload, payload_field)[:18]
1114 info = _PacketInfo()
1115 info.index, info.src, info.dst, info.ip, info.proto = unpack("iiiih", payload_b)
1117 # some SRv6 TCs depend on get an exception if bad values are detected
1118 if info.index > 0x4000:
1119 raise ValueError("Index value is invalid")
1123 def get_next_packet_info(self, info):
1125 Iterate over the packet info list stored in the testcase
1126 Start iteration with first element if info is None
1127 Continue based on index in info if info is specified
1129 :param info: info or None
1130 :returns: next info in list or None if no more infos
1135 next_index = info.index + 1
1136 if next_index == len(self._packet_infos):
1139 return self._packet_infos[next_index]
1141 def get_next_packet_info_for_interface(self, src_index, info):
1143 Search the packet info list for the next packet info with same source
1146 :param src_index: source interface index to search for
1147 :param info: packet info - where to start the search
1148 :returns: packet info or None
1152 info = self.get_next_packet_info(info)
1155 if info.src == src_index:
1158 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
1160 Search the packet info list for the next packet info with same source
1161 and destination interface indexes
1163 :param src_index: source interface index to search for
1164 :param dst_index: destination interface index to search for
1165 :param info: packet info - where to start the search
1166 :returns: packet info or None
1170 info = self.get_next_packet_info_for_interface(src_index, info)
1173 if info.dst == dst_index:
1176 def assert_equal(self, real_value, expected_value, name_or_class=None):
1177 if name_or_class is None:
1178 self.assertEqual(real_value, expected_value)
1181 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
1183 getdoc(name_or_class).strip(),
1185 str(name_or_class(real_value)),
1187 str(name_or_class(expected_value)),
1190 msg = "Invalid %s: %s does not match expected value %s" % (
1196 self.assertEqual(real_value, expected_value, msg)
1198 def assert_in_range(self, real_value, expected_min, expected_max, name=None):
1202 msg = "Invalid %s: %s out of range <%s,%s>" % (
1208 self.assertTrue(expected_min <= real_value <= expected_max, msg)
1210 def assert_packet_checksums_valid(self, packet, ignore_zero_udp_checksums=True):
1211 received = packet.__class__(scapy.compat.raw(packet))
1212 udp_layers = ["UDP", "UDPerror"]
1213 checksum_fields = ["cksum", "chksum"]
1216 temp = received.__class__(scapy.compat.raw(received))
1218 layer = temp.getlayer(counter)
1220 layer = layer.copy()
1221 layer.remove_payload()
1222 for cf in checksum_fields:
1223 if hasattr(layer, cf):
1225 ignore_zero_udp_checksums
1226 and 0 == getattr(layer, cf)
1227 and layer.name in udp_layers
1230 delattr(temp.getlayer(counter), cf)
1231 checksums.append((counter, cf))
1234 counter = counter + 1
1235 if 0 == len(checksums):
1237 temp = temp.__class__(scapy.compat.raw(temp))
1238 for layer, cf in checksums:
1239 calc_sum = getattr(temp[layer], cf)
1241 getattr(received[layer], cf),
1243 "packet checksum on layer #%d: %s" % (layer, temp[layer].name),
1246 "Checksum field `%s` on `%s` layer has correct value `%s`"
1247 % (cf, temp[layer].name, calc_sum)
1250 def assert_checksum_valid(
1251 self, received_packet, layer, field_name="chksum", ignore_zero_checksum=False
1253 """Check checksum of received packet on given layer"""
1254 received_packet_checksum = getattr(received_packet[layer], field_name)
1255 if ignore_zero_checksum and 0 == received_packet_checksum:
1257 recalculated = received_packet.__class__(scapy.compat.raw(received_packet))
1258 delattr(recalculated[layer], field_name)
1259 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1261 received_packet_checksum,
1262 getattr(recalculated[layer], field_name),
1263 "packet checksum on layer: %s" % layer,
1266 def assert_ip_checksum_valid(self, received_packet, ignore_zero_checksum=False):
1267 self.assert_checksum_valid(
1268 received_packet, "IP", ignore_zero_checksum=ignore_zero_checksum
1271 def assert_tcp_checksum_valid(self, received_packet, ignore_zero_checksum=False):
1272 self.assert_checksum_valid(
1273 received_packet, "TCP", ignore_zero_checksum=ignore_zero_checksum
1276 def assert_udp_checksum_valid(self, received_packet, ignore_zero_checksum=True):
1277 self.assert_checksum_valid(
1278 received_packet, "UDP", ignore_zero_checksum=ignore_zero_checksum
1281 def assert_embedded_icmp_checksum_valid(self, received_packet):
1282 if received_packet.haslayer(IPerror):
1283 self.assert_checksum_valid(received_packet, "IPerror")
1284 if received_packet.haslayer(TCPerror):
1285 self.assert_checksum_valid(received_packet, "TCPerror")
1286 if received_packet.haslayer(UDPerror):
1287 self.assert_checksum_valid(
1288 received_packet, "UDPerror", ignore_zero_checksum=True
1290 if received_packet.haslayer(ICMPerror):
1291 self.assert_checksum_valid(received_packet, "ICMPerror")
def assert_icmp_checksum_valid(self, received_packet):
    """Check the ICMP checksum and any checksums of the embedded packet."""
    # Validate the outer ICMP layer first, then whatever the ICMP error
    # payload embeds (IPerror/TCPerror/UDPerror/ICMPerror).
    self.assert_checksum_valid(received_packet, "ICMP")
    self.assert_embedded_icmp_checksum_valid(received_packet)
def assert_icmpv6_checksum_valid(self, pkt):
    """Check checksums of whichever ICMPv6 layers are present in pkt."""
    if pkt.haslayer(ICMPv6DestUnreach):
        # Destination-unreachable embeds the offending packet, so the
        # inner checksums are validated as well.
        self.assert_checksum_valid(pkt, "ICMPv6DestUnreach", "cksum")
        self.assert_embedded_icmp_checksum_valid(pkt)
    if pkt.haslayer(ICMPv6EchoRequest):
        self.assert_checksum_valid(pkt, "ICMPv6EchoRequest", "cksum")
    if pkt.haslayer(ICMPv6EchoReply):
        self.assert_checksum_valid(pkt, "ICMPv6EchoReply", "cksum")
def get_counter(self, counter):
    """Return the value of a counter.

    :param counter: counter name; a leading "/" denotes a stats-segment
        path, anything else is looked up in the "sh errors" CLI output
    :returns: counter value; 0 when a named error counter is not present
        in the CLI output (i.e. it has not been hit yet)
    """
    if counter.startswith("/"):
        return self.statistics.get_counter(counter)
    counters = self.vapi.cli("sh errors").split("\n")
    # fix: default to 0 so a counter that never fired does not raise
    # UnboundLocalError; original fragment left counter_value unset
    counter_value = 0
    # skip the header line and the trailing line of the CLI output
    for line in counters[1:-1]:
        results = line.split()
        # guard against short/blank rows before indexing column 1
        if len(results) >= 2 and results[1] == counter:
            counter_value = int(results[0])
            break
    return counter_value
def assert_counter_equal(self, counter, expected_value, thread=None, index=0):
    """Assert that *counter* has *expected_value*.

    :param thread: when given, compare only that thread's value at *index*;
        otherwise sum the per-thread values at *index*
    """
    value = self.get_counter(counter)
    if thread is None:
        value = sum(per_thread[index] for per_thread in value)
    else:
        value = value[thread][index]
    self.assert_equal(value, expected_value, "counter `%s'" % counter)
def assert_packet_counter_equal(self, counter, expected_value):
    """Assert that a packet counter has the expected value."""
    actual = self.get_counter(counter)
    self.assert_equal(actual, expected_value, "packet counter `%s'" % counter)
def assert_error_counter_equal(self, counter, expected_value):
    """Assert that an error counter, summed over all workers, has the
    expected value."""
    total = self.statistics[counter].sum()
    self.assert_equal(total, expected_value, "error counter `%s'" % counter)
# NOTE(review): this dump is missing interior lines (the embedded numbering
# jumps over 1339, 1342, 1344-45, 1347-51, 1354-55, 1357, 1359-63, 1365-66);
# the fragment is kept byte-identical below.  Presumably a @classmethod that
# sleeps for `timeout` seconds and logs an error on gross oversleep -- TODO
# confirm against the complete source.
1338 def sleep(cls, timeout, remark=None):
# CPython reference explaining sleep(0)/yield semantics:
1340 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1341 # * by Guido, only the main thread can be interrupted.
1343 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
# presumably yields the CPU for a zero timeout when the platform supports it
1346 if hasattr(os, "sched_yield"):
1352 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
# wall-clock timestamps bracket the sleep to measure the actual duration
1353 before = time.time()
# flags sleeps that took more than twice the requested time
1356 if after - before > 2 * timeout:
1358 "unexpected self.sleep() result - slept for %es instead of ~%es!",
1364 "Finished sleep (%s) - slept %es (wanted %es)",
def virtual_sleep(self, timeout, remark=None):
    """Advance VPP's clock by *timeout* seconds instead of really sleeping.

    :param timeout: number of seconds to move the VPP clock forward
    :param remark: optional note included in the debug log
    """
    self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
    self.vapi.cli("set clock adjust %s" % timeout)
def pg_send(self, intf, pkts, worker=None, trace=True):
    """Enqueue *pkts* on *intf*, arm capture on all pg interfaces and
    start packet generation.

    :param worker: optional worker thread index for the stream
    :param trace: whether packet tracing is enabled for this run
    """
    intf.add_stream(pkts, worker=worker)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start(trace=trace)
def snapshot_stats(self, stats_diff):
    """Return a snapshot of the statistics named in *stats_diff*.

    :param stats_diff: dict keyed by sw_if_index, each value a dict whose
        keys are the counter names of interest
    :returns: dict mapping counter name -> current statistics value
    """
    stats_snapshot = {}
    for counters in stats_diff.values():
        for counter in counters:
            stats_snapshot[counter] = self.statistics[counter]
    self.logger.debug(f"Took statistics stats_snapshot: {stats_snapshot}")
    return stats_snapshot
1388 def compare_stats_with_snapshot(self, stats_diff, stats_snapshot):
1389 """Assert appropriate difference between current stats and snapshot."""
# NOTE(review): lines 1393, 1399-1402, 1408-09 and the tail of the method
# are absent from this dump; the fragment is kept byte-identical.  The
# missing lines presumably hold the assert_equal call openers and the
# error-handling for interfaces with no traffic -- TODO confirm.
1390 for sw_if_index in stats_diff:
1391 for cntr, diff in stats_diff[sw_if_index].items():
# special "err" key: error counters compared by total sum over workers
1392 if sw_if_index == "err":
1394 self.statistics[cntr].sum(),
1395 stats_snapshot[cntr].sum() + diff,
1396 f"'{cntr}' counter value (previous value: "
1397 f"{stats_snapshot[cntr].sum()}, "
1398 f"expected diff: {diff})",
# per-interface counters: sum the [:, sw_if_index] slice over workers
1403 self.statistics[cntr][:, sw_if_index].sum(),
1404 stats_snapshot[cntr][:, sw_if_index].sum() + diff,
1405 f"'{cntr}' counter value (previous value: "
1406 f"{stats_snapshot[cntr][:, sw_if_index].sum()}, "
1407 f"expected diff: {diff})",
1410 # if diff is 0, then this most probably a case where
1411 # test declares multiple interfaces but traffic hasn't
1412 # passed through this one yet - which means the counter
1413 # value is 0 and can be ignored
# NOTE(review): interior lines (1419-20, 1422, 1424-27, 1430-33, 1436-37)
# are absent from this dump; kept byte-identical.  Sends packets and
# asserts that no interface captured a reply; stats_diff, when given,
# presumably triggers the snapshot/compare pair -- TODO confirm guards.
1417 def send_and_assert_no_replies(
1418 self, intf, pkts, remark="", timeout=None, stats_diff=None, trace=True, msg=None
# take a baseline before sending so the diff can be asserted afterwards
1421 stats_snapshot = self.snapshot_stats(stats_diff)
1423 self.pg_send(intf, pkts)
# every pg interface must have captured nothing
1428 for i in self.pg_interfaces:
1429 i.assert_nothing_captured(timeout=timeout, remark=remark)
1434 self.logger.debug(f"send_and_assert_no_replies: {msg}")
1435 self.logger.debug(self.vapi.cli("show trace"))
1438 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
# NOTE(review): the parameter list (lines 1441-1451) and several guard/
# return lines are absent from this dump; kept byte-identical.  Sends
# packets and expects them captured on an output interface -- the full
# signature and the returned capture must be confirmed against the
# complete source.
1440 def send_and_expect(
1452 stats_snapshot = self.snapshot_stats(stats_diff)
# a single Packet counts as one expected rx; a list expects len(pkts)
1455 n_rx = 1 if isinstance(pkts, Packet) else len(pkts)
1456 self.pg_send(intf, pkts, worker=worker, trace=trace)
1457 rx = output.get_capture(n_rx)
1460 self.logger.debug(f"send_and_expect: {msg}")
1461 self.logger.debug(self.vapi.cli("show trace"))
1464 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
# NOTE(review): lines 1470, 1472-73, 1476-77, 1479-80 are absent from this
# dump; kept byte-identical.  Presumably loops over `outputs`, requiring at
# least one packet captured on each (load-balancing check), and returns the
# collected captures -- TODO confirm.
1468 def send_and_expect_load_balancing(
1469 self, input, pkts, outputs, worker=None, trace=True
1471 self.pg_send(input, pkts, worker=worker, trace=trace)
# each output leg must have received at least one packet
1474 rx = oo._get_capture(1)
1475 self.assertNotEqual(0, len(rx))
1478 self.logger.debug(self.vapi.cli("show trace"))
# NOTE(review): lines 1484 and 1488+ are absent from this dump; kept
# byte-identical.  Expects a strict subset of the sent packets on `output`
# (more than zero, fewer than all) -- presumably returns the capture.
1481 def send_and_expect_some(self, intf, pkts, output, worker=None, trace=True):
1482 self.pg_send(intf, pkts, worker=worker, trace=trace)
1483 rx = output._get_capture(1)
1485 self.logger.debug(self.vapi.cli("show trace"))
# strict subset: at least one packet, but not all of them
1486 self.assertTrue(len(rx) > 0)
1487 self.assertTrue(len(rx) < len(pkts))
# NOTE(review): lines 1491, 1493, 1496-98, 1502-04 are absent from this
# dump; kept byte-identical.  Expects all packets on `output` and nothing
# on any other pg interface; `outputs` is presumably derived from `output`
# in a missing line -- TODO confirm.
1490 def send_and_expect_only(self, intf, pkts, output, timeout=None, stats_diff=None):
1492 stats_snapshot = self.snapshot_stats(stats_diff)
1494 self.pg_send(intf, pkts)
# every sent packet must arrive on the expected output
1495 rx = output.get_capture(len(pkts))
# and no other interface may have captured anything
1499 for i in self.pg_interfaces:
1500 if i not in outputs:
1501 i.assert_nothing_captured(timeout=timeout)
1505 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
def get_testcase_doc_name(test):
    """Return the first line of the docstring of *test*'s class."""
    class_doc = getdoc(test.__class__)
    return class_doc.splitlines()[0]
def get_test_description(descriptions, test):
    """Return the description to print for *test*.

    :param descriptions: whether short descriptions are enabled
    :param test: the test instance
    :returns: the test's short description when enabled and present,
        otherwise str(test)
    """
    short_description = test.shortDescription()
    if descriptions and short_description:
        return short_description
    # fix: the visible fragment fell through returning None here; fall back
    # to the test's string form, matching unittest's TextTestResult behavior
    return str(test)
class TestCaseInfo(object):
    """Per-test-case context used for result reporting and core-file handling.

    Attributes:
        logger: the test case's logger
        tempdir: the test case's temporary directory
        vpp_pid: PID of the VPP process under test
        vpp_bin_path: path of the VPP binary
        core_crash_test: name of the test during which a core appeared
            (filled in later; None until then)
    """

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        self.logger = logger
        self.tempdir = tempdir
        self.vpp_pid = vpp_pid
        self.vpp_bin_path = vpp_bin_path
        self.core_crash_test = None
# NOTE(review): docstring fences and several lines (1532, 1535, 1539,
# 1543-45, 1549, 1551, 1557, 1564-65) are absent from this dump; kept
# byte-identical.  Custom unittest.TestResult that colorizes output and
# tracks per-test-case info.
1531 class VppTestResult(unittest.TestResult):
1533 @property result_string
1534 String variable to store the test case result string.
1536 List variable containing 2-tuples of TestCase instances and strings
1537 holding formatted tracebacks. Each tuple represents a test which
1538 raised an unexpected exception.
1540 List variable containing 2-tuples of TestCase instances and strings
1541 holding formatted tracebacks. Each tuple represents a test where
1542 a failure was explicitly signalled using the TestCase.assert*()
# class-level registries shared across instances: info objects for failed
# tests and for tests that produced core files
1546 failed_test_cases_info = set()
1547 core_crash_test_cases_info = set()
1548 current_test_case_info = None
1550 def __init__(self, stream=None, descriptions=None, verbosity=None, runner=None):
1552 :param stream File descriptor to store where to report test results.
1553 Set to the standard error stream by default.
1554 :param descriptions Boolean variable to store information if to use
1555 test case descriptions.
1556 :param verbosity Integer variable to store required verbosity level.
1558 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1559 self.stream = stream
1560 self.descriptions = descriptions
1561 self.verbosity = verbosity
# set by add_error/addSuccess/addSkip as the per-test outcome text
1562 self.result_string = None
1563 self.runner = runner
# NOTE(review): docstring fences and some lines (1567, 1569-72, 1577,
# 1580, 1584-90, 1594, 1598-1600, 1603, 1606, 1611, 1614, 1616-17, 1620,
# 1624, 1627, 1629) are absent from this dump; kept byte-identical.
1566 def addSuccess(self, test):
1568 Record a test succeeded result
1573 if self.current_test_case_info:
1574 self.current_test_case_info.logger.debug(
1575 "--- addSuccess() %s.%s(%s) called"
1576 % (test.__class__.__name__, test._testMethodName, test._testMethodDoc)
1578 unittest.TestResult.addSuccess(self, test)
1579 self.result_string = colorize("OK", GREEN)
# forward the result to the parent process (parallel test runner)
1581 self.send_result_through_pipe(test, PASS)
1583 def addSkip(self, test, reason):
1585 Record a test skipped.
1591 if self.current_test_case_info:
1592 self.current_test_case_info.logger.debug(
1593 "--- addSkip() %s.%s(%s) called, reason is %s"
1595 test.__class__.__name__,
1596 test._testMethodName,
1597 test._testMethodDoc,
1601 unittest.TestResult.addSkip(self, test, reason)
1602 self.result_string = colorize("SKIP", YELLOW)
# cpu-shortage skips are reported distinctly so the runner can retry them
1604 if reason == "not enough cpus":
1605 self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE)
1607 self.send_result_through_pipe(test, SKIP)
# creates "<tempdir>-FAILED" symlink pointing at the failed test's tempdir
1609 def symlink_failed(self):
1610 if self.current_test_case_info:
1612 failed_dir = config.failed_dir
1613 link_path = os.path.join(
1615 "%s-FAILED" % os.path.basename(self.current_test_case_info.tempdir),
1618 self.current_test_case_info.logger.debug(
1619 "creating a link to the failed test"
1621 self.current_test_case_info.logger.debug(
1622 "os.symlink(%s, %s)"
1623 % (self.current_test_case_info.tempdir, link_path)
1625 if os.path.exists(link_path):
1626 self.current_test_case_info.logger.debug("symlink already exists")
1628 os.symlink(self.current_test_case_info.tempdir, link_path)
# best-effort: symlink failures are logged, never fatal
1630 except Exception as e:
1631 self.current_test_case_info.logger.error(e)
def send_result_through_pipe(self, test, result):
    """Forward ``(test id, result)`` to the parent process, when a result
    pipe has been attached to the result class; no-op otherwise."""
    pipe = getattr(self, "test_framework_result_pipe", None)
    if pipe:
        pipe.send((test.id(), result))
# NOTE(review): several lines (1643, 1648, 1651, 1654-55, 1663-64,
# 1667-68, 1672, 1674, 1681, 1684, 1687, 1689) are absent from this dump;
# kept byte-identical.
# logs the error/failure details into the test case's own log
1639 def log_error(self, test, err, fn_name):
1640 if self.current_test_case_info:
# _ErrorHolder wraps module/class level failures and has no test method
1641 if isinstance(test, unittest.suite._ErrorHolder):
1642 test_name = test.description
1644 test_name = "%s.%s(%s)" % (
1645 test.__class__.__name__,
1646 test._testMethodName,
1647 test._testMethodDoc,
1649 self.current_test_case_info.logger.debug(
1650 "--- %s() %s called, err is %s" % (fn_name, test_name, err)
1652 self.current_test_case_info.logger.debug(
1653 "formatted exception is:\n%s" % "".join(format_exception(*err))
# common implementation behind addFailure/addError; `unittest_fn` is the
# base-class recorder to invoke, `error_type` is FAIL or ERROR
1656 def add_error(self, test, err, unittest_fn, error_type):
1657 if error_type == FAIL:
1658 self.log_error(test, err, "addFailure")
1659 error_type_str = colorize("FAIL", RED)
1660 elif error_type == ERROR:
1661 self.log_error(test, err, "addError")
1662 error_type_str = colorize("ERROR", RED)
1665 "Error type %s cannot be used to record an "
1666 "error or a failure" % error_type
1669 unittest_fn(self, test, err)
1670 if self.current_test_case_info:
1671 self.result_string = "%s [ temp dir used by test case: %s ]" % (
1673 self.current_test_case_info.tempdir,
1675 self.symlink_failed()
1676 self.failed_test_cases_info.add(self.current_test_case_info)
# a core file in the tempdir is attributed to the first failing test
1677 if is_core_present(self.current_test_case_info.tempdir):
1678 if not self.current_test_case_info.core_crash_test:
1679 if isinstance(test, unittest.suite._ErrorHolder):
1680 test_name = str(test)
1682 test_name = "'{!s}' ({!s})".format(
1683 get_testcase_doc_name(test), test.id()
1685 self.current_test_case_info.core_crash_test = test_name
1686 self.core_crash_test_cases_info.add(self.current_test_case_info)
1688 self.result_string = "%s [no temp dir]" % error_type_str
1690 self.send_result_through_pipe(test, error_type)
def addFailure(self, test, err):
    """Record a test failure.

    :param test: the failing test
    :param err: error message
    """
    self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
def addError(self, test, err):
    """Record a test error.

    :param test: the erroring test
    :param err: error message
    """
    self.add_error(test, err, unittest.TestResult.addError, ERROR)
def getDescription(self, test):
    """Return the printable description of *test*, honoring
    ``self.descriptions``.

    :returns: test description
    """
    return get_test_description(self.descriptions, test)
# NOTE(review): many lines (1723-29, 1732-33, 1735, 1737, 1742, 1747,
# 1751, 1757, 1759, 1765-67, 1769, 1772-73, 1781-86, 1788, 1793, 1795,
# 1797-98, 1801-04) are absent from this dump; kept byte-identical.
1722 def startTest(self, test):
# prints a banner for the test class the first time one of its tests runs
1730 def print_header(test):
1731 if test.__class__ in self.printed:
# a missing class docstring is a hard error: the doc name is required
1734 test_doc = getdoc(test)
1736 raise Exception("No doc string for test '%s'" % test.id())
1738 test_title = test_doc.splitlines()[0].rstrip()
1739 test_title = colorize(test_title, GREEN)
1740 if test.is_tagged_run_solo():
1741 test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)
1743 # This block may overwrite the colorized title above,
1744 # but we want this to stand out and be fixed
1745 if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
1746 test_title = colorize(f"FIXME with VPP workers: {test_title}", RED)
1748 if test.has_tag(TestCaseTag.FIXME_ASAN):
1749 test_title = colorize(f"FIXME with ASAN: {test_title}", RED)
1750 test.skip_fixme_asan()
# annotate the banner with the worker-thread configuration
1752 if hasattr(test, "vpp_worker_count"):
1753 if test.vpp_worker_count == 0:
1754 test_title += " [main thread only]"
1755 elif test.vpp_worker_count == 1:
1756 test_title += " [1 worker thread]"
1758 test_title += f" [{test.vpp_worker_count} worker threads]"
1760 if test.__class__.skipped_due_to_cpu_lack:
1761 test_title = colorize(
1762 f"{test_title} [skipped - not enough cpus, "
1763 f"required={test.__class__.get_cpus_required()}, "
1764 f"available={max_vpp_cpus}]",
1768 print(double_line_delim)
1770 print(double_line_delim)
1771 self.printed.append(test.__class__)
# record the start time so stopTest can report the duration
1774 self.start_test = time.time()
1775 unittest.TestResult.startTest(self, test)
1776 if self.verbosity > 0:
1777 self.stream.writeln("Starting " + self.getDescription(test) + " ...")
1778 self.stream.writeln(single_line_delim)
1780 def stopTest(self, test):
1782 Called when the given test has been run
1787 unittest.TestResult.stopTest(self, test)
1789 if self.verbosity > 0:
1790 self.stream.writeln(single_line_delim)
1791 self.stream.writeln(
1792 "%-73s%s" % (self.getDescription(test), self.result_string)
1794 self.stream.writeln(single_line_delim)
1796 self.stream.writeln(
1799 self.getDescription(test),
# elapsed wall-clock time since startTest
1800 time.time() - self.start_test,
1805 self.send_result_through_pipe(test, TEST_RUN)
def printErrors(self):
    """Print accumulated errors and failures from running the test case.

    When the runner is not printing a summary, output is then redirected
    to /dev/null so unittest's final summary is suppressed.
    """
    if self.errors or self.failures:
        self.stream.writeln()
        self.printErrorList("ERROR", self.errors)
        self.printErrorList("FAIL", self.failures)
    # ^^ that is the last output from unittest before summary
    if not self.runner.print_summary:
        devnull = unittest.runner._WritelnDecorator(open(os.devnull, "w"))
        self.stream = devnull
        self.runner.stream = devnull
def printErrorList(self, flavour, errors):
    """Print one delimited section per recorded error.

    :param flavour: error type label (e.g. "ERROR" or "FAIL")
    :param errors: iterable of (test, formatted traceback) tuples
    """
    for test, err in errors:
        self.stream.writeln(double_line_delim)
        self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
        self.stream.writeln(single_line_delim)
        self.stream.writeln("%s" % err)
# NOTE(review): docstring fences, the @property decorator line (1843) and
# most of the __init__ signature (1847-59) are absent from this dump; kept
# byte-identical.  The missing signature presumably carries result_pipe,
# descriptions/verbosity and print_summary parameters -- TODO confirm.
1838 class VppTestRunner(unittest.TextTestRunner):
1840 A basic test runner implementation which prints results to standard error.
# presumably decorated with @property on the missing line 1843
1844 def resultclass(self):
1845 """Class maintaining the results of the tests"""
1846 return VppTestResult
1850 keep_alive_pipe=None,
1860 # ignore stream setting here, use hard-coded stdout to be in sync
1861 # with prints from VppTestCase methods ...
1862 super(VppTestRunner, self).__init__(
1863 sys.stdout, descriptions, verbosity, failfast, buffer, resultclass, **kwargs
1865 KeepAliveReporter.pipe = keep_alive_pipe
# remember the real stream so run() can restore it after muting
1867 self.orig_stream = self.stream
1868 self.resultclass.test_framework_result_pipe = result_pipe
1870 self.print_summary = print_summary
1872 def _makeResult(self):
1873 return self.resultclass(self.stream, self.descriptions, self.verbosity, self)
# NOTE(review): the docstring (1876-81) and the trailing return (1888) are
# absent from this dump; kept byte-identical.
1875 def run(self, test):
1882 faulthandler.enable() # emit stack trace to stderr if killed by signal
1884 result = super(VppTestRunner, self).run(test)
# restore the real stream that printErrors may have swapped for /dev/null
1885 if not self.print_summary:
1886 self.stream = self.orig_stream
1887 result.stream = self.orig_stream
# NOTE(review): lines 1898, 1901, 1908-09 are absent from this dump; kept
# byte-identical.  Thread that runs an external executable, optionally
# under gdbserver when the owning testcase is in debug mode.
1891 class Worker(Thread):
1892 def __init__(self, executable_args, logger, env=None, *args, **kwargs):
1893 super(Worker, self).__init__(*args, **kwargs)
1894 self.logger = logger
1895 self.args = executable_args
# subclasses may set `testcase`; debug_all enables the gdb wrappers
1896 if hasattr(self, "testcase") and self.testcase.debug_all:
# presumably prepends a gdbserver launcher to self.args -- TODO confirm
1897 if self.testcase.debug_gdbserver:
1899 "/usr/bin/gdbserver",
1900 "localhost:{port}".format(port=self.testcase.gdbserver_port),
1902 elif self.testcase.debug_gdb and hasattr(self, "wait_for_gdb"):
1903 self.args.append(self.wait_for_gdb)
1904 self.app_bin = executable_args[0]
1905 self.app_name = os.path.basename(self.app_bin)
1906 if hasattr(self, "role"):
1907 self.app_name += " {role}".format(role=self.role)
# deep-copy so later env mutations don't leak back to the caller
1910 env = {} if env is None else env
1911 self.env = copy.deepcopy(env)
# NOTE(review): many lines (1915, 1917, 1919, 1922-23, 1925, 1927,
# 1930-33, 1937-39, 1942-44, 1948, 1951-53, 1955-56, 1960) are absent from
# this dump; kept byte-identical.  Pauses the test until the user attaches
# a debugger when gdb/gdbserver debugging is enabled.
1913 def wait_for_enter(self):
1914 if not hasattr(self, "testcase"):
1916 if self.testcase.debug_all and self.testcase.debug_gdbserver:
1918 print(double_line_delim)
1920 "Spawned GDB Server for '{app}' with PID: {pid}".format(
1921 app=self.app_name, pid=self.process.pid
1924 elif self.testcase.debug_all and self.testcase.debug_gdb:
1926 print(double_line_delim)
1928 "Spawned '{app}' with PID: {pid}".format(
1929 app=self.app_name, pid=self.process.pid
1934 print(single_line_delim)
1935 print("You can debug '{app}' using:".format(app=self.app_name))
# gdbserver mode: print the remote-attach command and bump the port
1936 if self.testcase.debug_gdbserver:
1940 + " -ex 'target remote localhost:{port}'".format(
1941 port=self.testcase.gdbserver_port
1945 "Now is the time to attach gdb by running the above "
1946 "command, set up breakpoints etc., then resume from "
1947 "within gdb by issuing the 'continue' command"
# next worker gets a fresh port so parallel gdbservers don't collide
1949 self.testcase.gdbserver_port += 1
1950 elif self.testcase.debug_gdb:
1954 + " -ex 'attach {pid}'".format(pid=self.process.pid)
1957 "Now is the time to attach gdb by running the above "
1958 "command and set up breakpoints etc., then resume from"
1959 " within gdb by issuing the 'continue' command"
1961 print(single_line_delim)
# block until the user is ready to let the worker proceed
1962 input("Press ENTER to continue running the testcase...")
# NOTE(review): the `def run(self):` line itself (1964) and several lines
# (1968, 1974-75, 1978-79, 1985-86, 1990, 1996, 1998, 2002, 2004) are
# absent from this dump; kept byte-identical.  Thread body: validates the
# executable, spawns it, optionally waits for a debugger, then logs its
# output and stores the return code.
1965 executable = self.args[0]
1966 if not os.path.exists(executable) or not os.access(
1967 executable, os.F_OK | os.X_OK
1969 # Exit code that means some system file did not exist,
1970 # could not be opened, or had some other kind of error.
1971 self.result = os.EX_OSFILE
1972 raise EnvironmentError(
1973 "executable '%s' is not found or executable." % executable
1976 "Running executable '{app}': '{cmd}'".format(
1977 app=self.app_name, cmd=" ".join(self.args)
# child environment = current environment overlaid with self.env
1980 env = os.environ.copy()
1981 env.update(self.env)
1982 env["CK_LOG_FILE_NAME"] = "-"
# stdbuf disables stdio buffering so output is captured promptly;
# setpgrp detaches the child from this process group
1983 self.process = subprocess.Popen(
1984 ["stdbuf", "-o0", "-e0"] + self.args,
1987 preexec_fn=os.setpgrp,
1988 stdout=subprocess.PIPE,
1989 stderr=subprocess.PIPE,
# give the user a chance to attach a debugger before the app runs on
1991 self.wait_for_enter()
1992 out, err = self.process.communicate()
1993 self.logger.debug("Finished running `{app}'".format(app=self.app_name))
1994 self.logger.info("Return code is `%s'" % self.process.returncode)
1995 self.logger.info(single_line_delim)
1997 "Executable `{app}' wrote to stdout:".format(app=self.app_name)
1999 self.logger.info(single_line_delim)
2000 self.logger.info(out.decode("utf-8"))
2001 self.logger.info(single_line_delim)
2003 "Executable `{app}' wrote to stderr:".format(app=self.app_name)
2005 self.logger.info(single_line_delim)
2006 self.logger.info(err.decode("utf-8"))
2007 self.logger.info(single_line_delim)
# expose the child's exit status to the owning testcase
2008 self.result = self.process.returncode
2011 if __name__ == "__main__":