3 from __future__ import print_function
18 from collections import deque
19 from threading import Thread, Event
20 from inspect import getdoc, isclass
21 from traceback import format_exception
22 from logging import FileHandler, DEBUG, Formatter
24 from abc import ABC, abstractmethod
25 from struct import pack, unpack
28 from scapy.packet import Raw, Packet
29 from config import config, available_cpus, num_cpus, max_vpp_cpus
30 import hook as hookmodule
31 from vpp_pg_interface import VppPGInterface
32 from vpp_sub_interface import VppSubInterface
33 from vpp_lo_interface import VppLoInterface
34 from vpp_bvi_interface import VppBviInterface
35 from vpp_papi_provider import VppPapiProvider
36 from vpp_papi import VppEnum
38 from vpp_papi.vpp_stats import VPPStats
39 from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
40 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
42 from vpp_object import VppObjectRegistry
43 from util import ppp, is_core_present
44 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
45 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
46 from scapy.layers.inet6 import ICMPv6EchoReply
49 logger = logging.getLogger(__name__)
51 # Set up an empty logger for the testcase that can be overridden as necessary
52 null_logger = logging.getLogger('VppTestCase')
53 null_logger.addHandler(logging.NullHandler())
63 if config.debug_framework:
67 Test framework module.
69 The module provides a set of tools for constructing and running tests and
70 representing the results.
74 class VppDiedError(Exception):
75 """ exception for reporting that the subprocess has died."""
77 signals_by_value = {v: k for k, v in signal.__dict__.items() if
78 k.startswith('SIG') and not k.startswith('SIG_')}
80 def __init__(self, rv=None, testcase=None, method_name=None):
82 self.signal_name = None
83 self.testcase = testcase
84 self.method_name = method_name
87 self.signal_name = VppDiedError.signals_by_value[-rv]
88 except (KeyError, TypeError):
91 if testcase is None and method_name is None:
94 in_msg = ' while running %s.%s' % (testcase, method_name)
97 msg = "VPP subprocess died unexpectedly%s with return code: %d%s."\
98 % (in_msg, self.rv, ' [%s]' %
100 self.signal_name is not None else ''))
102 msg = "VPP subprocess died unexpectedly%s." % in_msg
104 super(VppDiedError, self).__init__(msg)
107 class _PacketInfo(object):
108 """Private class to create packet info object.
110 Help process information about the next packet.
111 Set variables to default values.
113 #: Store the index of the packet.
115 #: Store the index of the source packet generator interface of the packet.
117 #: Store the index of the destination packet generator interface
120 #: Store expected ip version
122 #: Store expected upper protocol
124 #: Store the copy of the former packet.
def __eq__(self, other):
    """Packet infos compare equal when index, endpoints and data match."""
    return (self.index == other.index
            and self.src == other.src
            and self.dst == other.dst
            and self.data == other.data)
135 def pump_output(testclass):
136 """ pump output from vpp stdout/stderr to proper queues """
139 while not testclass.pump_thread_stop_flag.is_set():
140 readable = select.select([testclass.vpp.stdout.fileno(),
141 testclass.vpp.stderr.fileno(),
142 testclass.pump_thread_wakeup_pipe[0]],
144 if testclass.vpp.stdout.fileno() in readable:
145 read = os.read(testclass.vpp.stdout.fileno(), 102400)
147 split = read.decode('ascii',
148 errors='backslashreplace').splitlines(True)
149 if len(stdout_fragment) > 0:
150 split[0] = "%s%s" % (stdout_fragment, split[0])
151 if len(split) > 0 and split[-1].endswith("\n"):
155 stdout_fragment = split[-1]
156 testclass.vpp_stdout_deque.extend(split[:limit])
157 if not config.cache_vpp_output:
158 for line in split[:limit]:
159 testclass.logger.info(
160 "VPP STDOUT: %s" % line.rstrip("\n"))
161 if testclass.vpp.stderr.fileno() in readable:
162 read = os.read(testclass.vpp.stderr.fileno(), 102400)
164 split = read.decode('ascii',
165 errors='backslashreplace').splitlines(True)
166 if len(stderr_fragment) > 0:
167 split[0] = "%s%s" % (stderr_fragment, split[0])
168 if len(split) > 0 and split[-1].endswith("\n"):
172 stderr_fragment = split[-1]
174 testclass.vpp_stderr_deque.extend(split[:limit])
175 if not config.cache_vpp_output:
176 for line in split[:limit]:
177 testclass.logger.error(
178 "VPP STDERR: %s" % line.rstrip("\n"))
179 # ignoring the dummy pipe here intentionally - the
180 # flag will take care of properly terminating the loop
183 def _is_platform_aarch64():
184 return platform.machine() == 'aarch64'
187 is_platform_aarch64 = _is_platform_aarch64()
190 class KeepAliveReporter(object):
192 Singleton object which reports test start to parent process
197 self.__dict__ = self._shared_state
205 def pipe(self, pipe):
206 if self._pipe is not None:
207 raise Exception("Internal error - pipe should only be set once.")
210 def send_keep_alive(self, test, desc=None):
212 Write current test tmpdir & desc to keep-alive pipe to signal liveness
214 if self.pipe is None:
215 # if not running forked..
219 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
223 self.pipe.send((desc, config.vpp, test.tempdir, test.vpp.pid))
226 class TestCaseTag(Enum):
227 # marks the suites that must run at the end
228 # using only a single test runner
230 # marks the suites broken on VPP multi-worker
231 FIXME_VPP_WORKERS = 2
232 # marks the suites broken when ASan is enabled
236 def create_tag_decorator(e):
239 cls.test_tags.append(e)
240 except AttributeError:
246 tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
247 tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
248 tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
262 class CPUInterface(ABC):
264 skipped_due_to_cpu_lack = False
268 def get_cpus_required(cls):
272 def assign_cpus(cls, cpus):
276 class VppTestCase(CPUInterface, unittest.TestCase):
277 """This subclass is a base class for VPP test cases that are implemented as
278 classes. It provides methods to create and run test case.
281 extra_vpp_statseg_config = ""
282 extra_vpp_punt_config = []
283 extra_vpp_plugin_config = []
285 vapi_response_timeout = 5
286 remove_configured_vpp_objects_on_tear_down = True
def packet_infos(self):
    """Mapping of packet info objects created by this testcase."""
    infos = self._packet_infos
    return infos
294 def get_packet_count_for_if_idx(cls, dst_if_index):
295 """Get the number of packet info for specified destination if index"""
296 if dst_if_index in cls._packet_count_for_dst_if_idx:
297 return cls._packet_count_for_dst_if_idx[dst_if_index]
302 def has_tag(cls, tag):
303 """ if the test case has a given tag - return true """
305 return tag in cls.test_tags
306 except AttributeError:
def is_tagged_run_solo(cls):
    """True when this suite is timing-sensitive and must run on its own."""
    solo = TestCaseTag.RUN_SOLO
    return cls.has_tag(solo)
316 def skip_fixme_asan(cls):
317 """ if @tag_fixme_asan & ASan is enabled - mark for skip """
318 if cls.has_tag(TestCaseTag.FIXME_ASAN):
319 vpp_extra_cmake_args = os.environ.get('VPP_EXTRA_CMAKE_ARGS', '')
320 if 'DVPP_ENABLE_SANITIZE_ADDR=ON' in vpp_extra_cmake_args:
321 cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls)
325 """Return the instance of this testcase"""
326 return cls.test_instance
329 def set_debug_flags(cls, d):
330 cls.gdbserver_port = 7777
331 cls.debug_core = False
332 cls.debug_gdb = False
333 cls.debug_gdbserver = False
334 cls.debug_all = False
335 cls.debug_attach = False
340 cls.debug_core = True
341 elif dl == "gdb" or dl == "gdb-all":
343 elif dl == "gdbserver" or dl == "gdbserver-all":
344 cls.debug_gdbserver = True
346 cls.debug_attach = True
348 raise Exception("Unrecognized DEBUG option: '%s'" % d)
349 if dl == "gdb-all" or dl == "gdbserver-all":
353 def get_vpp_worker_count(cls):
354 if not hasattr(cls, "vpp_worker_count"):
355 if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
356 cls.vpp_worker_count = 0
358 cls.vpp_worker_count = config.vpp_worker_count
359 return cls.vpp_worker_count
def get_cpus_required(cls):
    """CPUs this suite needs: one main core plus one per VPP worker."""
    worker_cpus = cls.get_vpp_worker_count()
    return worker_cpus + 1
366 def setUpConstants(cls):
367 """ Set-up the test case class based on environment variables """
368 cls.step = config.step
369 cls.plugin_path = ":".join(config.vpp_plugin_dir)
370 cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir)
371 cls.extern_plugin_path = ":".join(config.extern_plugin_dir)
373 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
374 debug_cli = "cli-listen localhost:5002"
375 size = re.search(r"\d+[gG]", config.coredump_size)
377 coredump_size = f"coredump-size {config.coredump_size}".lower()
379 coredump_size = "coredump-size unlimited"
380 default_variant = config.variant
381 if default_variant is not None:
382 default_variant = "defaults { %s 100 }" % default_variant
386 api_fuzzing = config.api_fuzz
387 if api_fuzzing is None:
392 "unix", "{", "nodaemon", debug_cli, "full-coredump",
393 coredump_size, "runtime-dir", cls.tempdir, "}",
394 "api-trace", "{", "on", "}",
395 "api-segment", "{", "prefix", cls.get_api_segment_prefix(), "}",
396 "cpu", "{", "main-core", str(cls.cpus[0]), ]
397 if cls.extern_plugin_path not in (None, ""):
398 cls.extra_vpp_plugin_config.append(
399 "add-path %s" % cls.extern_plugin_path)
400 if cls.get_vpp_worker_count():
401 cls.vpp_cmdline.extend([
402 "corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])])
403 cls.vpp_cmdline.extend([
405 "physmem", "{", "max-size", "32m", "}",
406 "statseg", "{", "socket-name", cls.get_stats_sock_path(),
407 cls.extra_vpp_statseg_config, "}",
408 "socksvr", "{", "socket-name", cls.get_api_sock_path(), "}",
409 "node { ", default_variant, "}",
410 "api-fuzz {", api_fuzzing, "}",
411 "plugins", "{", "plugin", "dpdk_plugin.so", "{", "disable", "}",
412 "plugin", "rdma_plugin.so", "{", "disable", "}",
413 "plugin", "lisp_unittest_plugin.so", "{", "enable", "}",
414 "plugin", "unittest_plugin.so", "{", "enable", "}"
415 ] + cls.extra_vpp_plugin_config + ["}", ])
417 if cls.extra_vpp_punt_config is not None:
418 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
420 if not cls.debug_attach:
421 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
422 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
425 def wait_for_enter(cls):
426 if cls.debug_gdbserver:
427 print(double_line_delim)
428 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
430 print(double_line_delim)
431 print("Spawned VPP with PID: %d" % cls.vpp.pid)
433 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
435 print(single_line_delim)
436 print("You can debug VPP using:")
437 if cls.debug_gdbserver:
438 print(f"sudo gdb {config.vpp} "
439 f"-ex 'target remote localhost:{cls.gdbserver_port}'")
440 print("Now is the time to attach gdb by running the above "
441 "command, set up breakpoints etc., then resume VPP from "
442 "within gdb by issuing the 'continue' command")
443 cls.gdbserver_port += 1
445 print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'")
446 print("Now is the time to attach gdb by running the above "
447 "command and set up breakpoints etc., then resume VPP from"
448 " within gdb by issuing the 'continue' command")
449 print(single_line_delim)
450 input("Press ENTER to continue running the testcase...")
458 cls.logger.debug(f"Assigned cpus: {cls.cpus}")
459 cmdline = cls.vpp_cmdline
461 if cls.debug_gdbserver:
462 gdbserver = '/usr/bin/gdbserver'
463 if not os.path.isfile(gdbserver) or\
464 not os.access(gdbserver, os.X_OK):
465 raise Exception("gdbserver binary '%s' does not exist or is "
466 "not executable" % gdbserver)
468 cmdline = [gdbserver, 'localhost:{port}'
469 .format(port=cls.gdbserver_port)] + cls.vpp_cmdline
470 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
473 cls.vpp = subprocess.Popen(cmdline,
474 stdout=subprocess.PIPE,
475 stderr=subprocess.PIPE)
476 except subprocess.CalledProcessError as e:
477 cls.logger.critical("Subprocess returned with non-0 return code: ("
481 cls.logger.critical("Subprocess returned with OS error: "
482 "(%s) %s", e.errno, e.strerror)
484 except Exception as e:
485 cls.logger.exception("Subprocess returned unexpected from "
492 def wait_for_coredump(cls):
493 corefile = cls.tempdir + "/core"
494 if os.path.isfile(corefile):
495 cls.logger.error("Waiting for coredump to complete: %s", corefile)
496 curr_size = os.path.getsize(corefile)
497 deadline = time.time() + 60
499 while time.time() < deadline:
502 curr_size = os.path.getsize(corefile)
503 if size == curr_size:
507 cls.logger.error("Timed out waiting for coredump to complete:"
510 cls.logger.error("Coredump complete: %s, size %d",
def get_stats_sock_path(cls):
    """Path of the VPP stats segment socket inside the testcase tempdir."""
    sock_path = "%s/stats.sock" % cls.tempdir
    return sock_path
def get_api_sock_path(cls):
    """Path of the VPP API socket inside the testcase tempdir."""
    sock_path = "%s/api.sock" % cls.tempdir
    return sock_path
def get_api_segment_prefix(cls):
    """Shared-memory API segment prefix - the tempdir basename.

    Only used for VAPI.
    """
    prefix = os.path.basename(cls.tempdir)
    return prefix
526 def get_tempdir(cls):
528 tmpdir = f"{config.tmp_dir}/unittest-attach-gdb"
530 tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}"
531 if config.wipe_tmp_dir:
532 shutil.rmtree(tmpdir, ignore_errors=True)
537 def create_file_handler(cls):
538 if config.log_dir is None:
539 cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
542 logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}"
543 if config.wipe_tmp_dir:
544 shutil.rmtree(logdir, ignore_errors=True)
546 cls.file_handler = FileHandler(f"{logdir}/log.txt")
551 Perform class setup before running the testcase
552 Remove shared memory files, start vpp and connect the vpp-api
554 super(VppTestCase, cls).setUpClass()
555 cls.logger = get_logger(cls.__name__)
556 random.seed(config.rnd_seed)
557 if hasattr(cls, 'parallel_handler'):
558 cls.logger.addHandler(cls.parallel_handler)
559 cls.logger.propagate = False
560 cls.set_debug_flags(config.debug)
561 cls.tempdir = cls.get_tempdir()
562 cls.create_file_handler()
563 cls.file_handler.setFormatter(
564 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
566 cls.file_handler.setLevel(DEBUG)
567 cls.logger.addHandler(cls.file_handler)
568 cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
569 os.chdir(cls.tempdir)
570 cls.logger.info("Temporary dir is %s, api socket is %s",
571 cls.tempdir, cls.get_api_sock_path())
572 cls.logger.debug("Random seed is %s", config.rnd_seed)
574 cls.reset_packet_infos()
579 cls.registry = VppObjectRegistry()
580 cls.vpp_startup_failed = False
581 cls.reporter = KeepAliveReporter()
582 # need to catch exceptions here because if we raise, then the cleanup
583 # doesn't get called and we might end with a zombie vpp
589 cls.reporter.send_keep_alive(cls, 'setUpClass')
590 VppTestResult.current_test_case_info = TestCaseInfo(
591 cls.logger, cls.tempdir, cls.vpp.pid, config.vpp)
592 cls.vpp_stdout_deque = deque()
593 cls.vpp_stderr_deque = deque()
594 if not cls.debug_attach:
595 cls.pump_thread_stop_flag = Event()
596 cls.pump_thread_wakeup_pipe = os.pipe()
597 cls.pump_thread = Thread(target=pump_output, args=(cls,))
598 cls.pump_thread.daemon = True
599 cls.pump_thread.start()
600 if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
601 cls.vapi_response_timeout = 0
602 cls.vapi = VppPapiProvider(cls.__name__, cls,
603 cls.vapi_response_timeout)
605 hook = hookmodule.StepHook(cls)
607 hook = hookmodule.PollHook(cls)
608 cls.vapi.register_hook(hook)
609 cls.statistics = VPPStats(socketname=cls.get_stats_sock_path())
613 cls.vpp_startup_failed = True
615 "VPP died shortly after startup, check the"
616 " output to standard error for possible cause")
620 except (vpp_papi.VPPIOError, Exception) as e:
621 cls.logger.debug("Exception connecting to vapi: %s" % e)
622 cls.vapi.disconnect()
624 if cls.debug_gdbserver:
625 print(colorize("You're running VPP inside gdbserver but "
626 "VPP-API connection failed, did you forget "
627 "to 'continue' VPP from within gdb?", RED))
630 last_line = cls.vapi.cli("show thread").split("\n")[-2]
631 cls.vpp_worker_count = int(last_line.split(" ")[0])
632 print("Detected VPP with %s workers." % cls.vpp_worker_count)
633 except vpp_papi.VPPRuntimeError as e:
634 cls.logger.debug("%s" % e)
637 except Exception as e:
638 cls.logger.debug("Exception connecting to VPP: %s" % e)
643 def _debug_quit(cls):
644 if (cls.debug_gdbserver or cls.debug_gdb):
648 if cls.vpp.returncode is None:
650 print(double_line_delim)
651 print("VPP or GDB server is still running")
652 print(single_line_delim)
653 input("When done debugging, press ENTER to kill the "
654 "process and finish running the testcase...")
655 except AttributeError:
661 Disconnect vpp-api, kill vpp and cleanup shared memory files
665 # first signal that we want to stop the pump thread, then wake it up
666 if hasattr(cls, 'pump_thread_stop_flag'):
667 cls.pump_thread_stop_flag.set()
668 if hasattr(cls, 'pump_thread_wakeup_pipe'):
669 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
670 if hasattr(cls, 'pump_thread'):
671 cls.logger.debug("Waiting for pump thread to stop")
672 cls.pump_thread.join()
673 if hasattr(cls, 'vpp_stderr_reader_thread'):
674 cls.logger.debug("Waiting for stderr pump to stop")
675 cls.vpp_stderr_reader_thread.join()
677 if hasattr(cls, 'vpp'):
678 if hasattr(cls, 'vapi'):
679 cls.logger.debug(cls.vapi.vpp.get_stats())
680 cls.logger.debug("Disconnecting class vapi client on %s",
682 cls.vapi.disconnect()
683 cls.logger.debug("Deleting class vapi attribute on %s",
687 if not cls.debug_attach and cls.vpp.returncode is None:
688 cls.wait_for_coredump()
689 cls.logger.debug("Sending TERM to vpp")
691 cls.logger.debug("Waiting for vpp to die")
693 outs, errs = cls.vpp.communicate(timeout=5)
694 except subprocess.TimeoutExpired:
696 outs, errs = cls.vpp.communicate()
697 cls.logger.debug("Deleting class vpp attribute on %s",
699 if not cls.debug_attach:
700 cls.vpp.stdout.close()
701 cls.vpp.stderr.close()
704 if cls.vpp_startup_failed:
705 stdout_log = cls.logger.info
706 stderr_log = cls.logger.critical
708 stdout_log = cls.logger.info
709 stderr_log = cls.logger.info
711 if hasattr(cls, 'vpp_stdout_deque'):
712 stdout_log(single_line_delim)
713 stdout_log('VPP output to stdout while running %s:', cls.__name__)
714 stdout_log(single_line_delim)
715 vpp_output = "".join(cls.vpp_stdout_deque)
716 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
718 stdout_log('\n%s', vpp_output)
719 stdout_log(single_line_delim)
721 if hasattr(cls, 'vpp_stderr_deque'):
722 stderr_log(single_line_delim)
723 stderr_log('VPP output to stderr while running %s:', cls.__name__)
724 stderr_log(single_line_delim)
725 vpp_output = "".join(cls.vpp_stderr_deque)
726 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
728 stderr_log('\n%s', vpp_output)
729 stderr_log(single_line_delim)
732 def tearDownClass(cls):
733 """ Perform final cleanup after running all tests in this test-case """
734 cls.logger.debug("--- tearDownClass() for %s called ---" %
736 cls.reporter.send_keep_alive(cls, 'tearDownClass')
738 cls.file_handler.close()
739 cls.reset_packet_infos()
740 if config.debug_framework:
741 debug_internal.on_tear_down_class(cls)
def show_commands_at_teardown(self):
    """Hook for subclasses to log extra show commands during teardown.

    The base implementation only logs that nothing specific was provided.
    """
    self.logger.info("--- No test specific show commands provided. ---")
748 """ Show various debug prints after each test """
749 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
750 (self.__class__.__name__, self._testMethodName,
751 self._testMethodDoc))
754 if not self.vpp_dead:
755 self.logger.debug(self.vapi.cli("show trace max 1000"))
756 self.logger.info(self.vapi.ppcli("show interface"))
757 self.logger.info(self.vapi.ppcli("show hardware"))
758 self.logger.info(self.statistics.set_errors_str())
759 self.logger.info(self.vapi.ppcli("show run"))
760 self.logger.info(self.vapi.ppcli("show log"))
761 self.logger.info(self.vapi.ppcli("show bihash"))
762 self.logger.info("Logging testcase specific show commands.")
763 self.show_commands_at_teardown()
764 if self.remove_configured_vpp_objects_on_tear_down:
765 self.registry.remove_vpp_config(self.logger)
766 # Save/Dump VPP api trace log
767 m = self._testMethodName
768 api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
769 tmp_api_trace = "/tmp/%s" % api_trace
770 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
771 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
772 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
774 os.rename(tmp_api_trace, vpp_api_trace_log)
775 except VppTransportSocketIOError:
776 self.logger.debug("VppTransportSocketIOError: Vpp dead. "
777 "Cannot log show commands.")
780 self.registry.unregister_all(self.logger)
783 """ Clear trace before running each test"""
784 super(VppTestCase, self).setUp()
785 self.reporter.send_keep_alive(self)
787 raise VppDiedError(rv=None, testcase=self.__class__.__name__,
788 method_name=self._testMethodName)
789 self.sleep(.1, "during setUp")
790 self.vpp_stdout_deque.append(
791 "--- test setUp() for %s.%s(%s) starts here ---\n" %
792 (self.__class__.__name__, self._testMethodName,
793 self._testMethodDoc))
794 self.vpp_stderr_deque.append(
795 "--- test setUp() for %s.%s(%s) starts here ---\n" %
796 (self.__class__.__name__, self._testMethodName,
797 self._testMethodDoc))
798 self.vapi.cli("clear trace")
799 # store the test instance inside the test class - so that objects
800 # holding the class can access instance methods (like assertEqual)
801 type(self).test_instance = self
804 def pg_enable_capture(cls, interfaces=None):
806 Enable capture on packet-generator interfaces
808 :param interfaces: iterable interface indexes (if None,
809 use self.pg_interfaces)
812 if interfaces is None:
813 interfaces = cls.pg_interfaces
def register_pcap(cls, intf, worker):
    """Record an (interface, worker) pcap capture in the testclass."""
    # remembered so the capture can be cleaned up later
    capture = (intf, worker)
    cls._pcaps.append(capture)
824 def get_vpp_time(cls):
825 # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
826 # returns float("2.190522")
827 timestr = cls.vapi.cli('show clock')
828 head, sep, tail = timestr.partition(',')
829 head, sep, tail = head.partition('Time now')
833 def sleep_on_vpp_time(cls, sec):
834 """ Sleep according to time in VPP world """
835 # On a busy system with many processes
836 # we might end up with VPP time being slower than real world
837 # So take that into account when waiting for VPP to do something
838 start_time = cls.get_vpp_time()
839 while cls.get_vpp_time() - start_time < sec:
843 def pg_start(cls, trace=True):
844 """ Enable the PG, wait till it is done, then clean up """
845 for (intf, worker) in cls._old_pcaps:
846 intf.handle_old_pcap_file(intf.get_in_path(worker),
847 intf.in_history_counter)
850 cls.vapi.cli("clear trace")
851 cls.vapi.cli("trace add pg-input 1000")
852 cls.vapi.cli('packet-generator enable')
853 # PG, when starts, runs to completion -
854 # so let's avoid a race condition,
855 # and wait a little till it's done.
856 # Then clean it up - and then be gone.
857 deadline = time.time() + 300
858 while cls.vapi.cli('show packet-generator').find("Yes") != -1:
859 cls.sleep(0.01) # yield
860 if time.time() > deadline:
861 cls.logger.error("Timeout waiting for pg to stop")
863 for intf, worker in cls._pcaps:
864 cls.vapi.cli('packet-generator delete %s' %
865 intf.get_cap_name(worker))
866 cls._old_pcaps = cls._pcaps
870 def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0,
873 Create packet-generator interfaces.
875 :param interfaces: iterable indexes of the interfaces.
876 :returns: List of created interfaces.
881 intf = VppPGInterface(cls, i, gso, gso_size, mode)
882 setattr(cls, intf.name, intf)
884 cls.pg_interfaces = result
def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
    """Create packet-generator interfaces operating in IP4 mode."""
    mode = VppEnum.vl_api_pg_interface_mode_t.PG_API_MODE_IP4
    return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
                                             mode)
def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
    """Create packet-generator interfaces operating in IP6 mode."""
    mode = VppEnum.vl_api_pg_interface_mode_t.PG_API_MODE_IP6
    return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
                                             mode)
def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
    """Create packet-generator interfaces in the default Ethernet mode."""
    mode = VppEnum.vl_api_pg_interface_mode_t.PG_API_MODE_ETHERNET
    return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
                                             mode)
def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
    """Create packet-generator interfaces explicitly in Ethernet mode."""
    mode = VppEnum.vl_api_pg_interface_mode_t.PG_API_MODE_ETHERNET
    return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
                                             mode)
912 def create_loopback_interfaces(cls, count):
914 Create loopback interfaces.
916 :param count: number of interfaces created.
917 :returns: List of created interfaces.
919 result = [VppLoInterface(cls) for i in range(count)]
921 setattr(cls, intf.name, intf)
922 cls.lo_interfaces = result
926 def create_bvi_interfaces(cls, count):
928 Create BVI interfaces.
930 :param count: number of interfaces created.
931 :returns: List of created interfaces.
933 result = [VppBviInterface(cls) for i in range(count)]
935 setattr(cls, intf.name, intf)
936 cls.bvi_interfaces = result
940 def extend_packet(packet, size, padding=' '):
942 Extend packet to given size by padding with spaces or custom padding
943 NOTE: Currently works only when Raw layer is present.
945 :param packet: packet
946 :param size: target size
947 :param padding: padding used to extend the payload
950 packet_len = len(packet) + 4
951 extend = size - packet_len
953 num = (extend // len(padding)) + 1
954 packet[Raw].load += (padding * num)[:extend].encode("ascii")
def reset_packet_infos(cls):
    """ Reset the list of packet info objects and packet counts to zero """
    # bind fresh containers rather than clearing in place
    cls._packet_count_for_dst_if_idx = {}
    cls._packet_infos = {}
963 def create_packet_info(cls, src_if, dst_if):
965 Create packet info object containing the source and destination indexes
966 and add it to the testcase's packet info list
968 :param VppInterface src_if: source interface
969 :param VppInterface dst_if: destination interface
971 :returns: _PacketInfo object
975 info.index = len(cls._packet_infos)
976 info.src = src_if.sw_if_index
977 info.dst = dst_if.sw_if_index
978 if isinstance(dst_if, VppSubInterface):
979 dst_idx = dst_if.parent.sw_if_index
981 dst_idx = dst_if.sw_if_index
982 if dst_idx in cls._packet_count_for_dst_if_idx:
983 cls._packet_count_for_dst_if_idx[dst_idx] += 1
985 cls._packet_count_for_dst_if_idx[dst_idx] = 1
986 cls._packet_infos[info.index] = info
990 def info_to_payload(info):
992 Convert _PacketInfo object to packet payload
994 :param info: _PacketInfo object
996 :returns: string containing serialized data from packet info
999 # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1000 return pack('iiiih', info.index, info.src,
1001 info.dst, info.ip, info.proto)
1004 def payload_to_info(payload, payload_field='load'):
1006 Convert packet payload to _PacketInfo object
1008 :param payload: packet payload
1009 :type payload: <class 'scapy.packet.Raw'>
1010 :param payload_field: packet fieldname of payload "load" for
1011 <class 'scapy.packet.Raw'>
1012 :type payload_field: str
1013 :returns: _PacketInfo object containing de-serialized data from payload
1017 # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1018 payload_b = getattr(payload, payload_field)[:18]
1020 info = _PacketInfo()
1021 info.index, info.src, info.dst, info.ip, info.proto \
1022 = unpack('iiiih', payload_b)
1024 # some SRv6 TCs depend on get an exception if bad values are detected
1025 if info.index > 0x4000:
1026 raise ValueError('Index value is invalid')
1030 def get_next_packet_info(self, info):
1032 Iterate over the packet info list stored in the testcase
1033 Start iteration with first element if info is None
1034 Continue based on index in info if info is specified
1036 :param info: info or None
1037 :returns: next info in list or None if no more infos
1042 next_index = info.index + 1
1043 if next_index == len(self._packet_infos):
1046 return self._packet_infos[next_index]
1048 def get_next_packet_info_for_interface(self, src_index, info):
1050 Search the packet info list for the next packet info with same source
1053 :param src_index: source interface index to search for
1054 :param info: packet info - where to start the search
1055 :returns: packet info or None
1059 info = self.get_next_packet_info(info)
1062 if info.src == src_index:
1065 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
1067 Search the packet info list for the next packet info with same source
1068 and destination interface indexes
1070 :param src_index: source interface index to search for
1071 :param dst_index: destination interface index to search for
1072 :param info: packet info - where to start the search
1073 :returns: packet info or None
1077 info = self.get_next_packet_info_for_interface(src_index, info)
1080 if info.dst == dst_index:
1083 def assert_equal(self, real_value, expected_value, name_or_class=None):
1084 if name_or_class is None:
1085 self.assertEqual(real_value, expected_value)
1088 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
1089 msg = msg % (getdoc(name_or_class).strip(),
1090 real_value, str(name_or_class(real_value)),
1091 expected_value, str(name_or_class(expected_value)))
1093 msg = "Invalid %s: %s does not match expected value %s" % (
1094 name_or_class, real_value, expected_value)
1096 self.assertEqual(real_value, expected_value, msg)
1098 def assert_in_range(self,
1106 msg = "Invalid %s: %s out of range <%s,%s>" % (
1107 name, real_value, expected_min, expected_max)
1108 self.assertTrue(expected_min <= real_value <= expected_max, msg)
1110 def assert_packet_checksums_valid(self, packet,
1111 ignore_zero_udp_checksums=True):
1112 received = packet.__class__(scapy.compat.raw(packet))
1113 udp_layers = ['UDP', 'UDPerror']
1114 checksum_fields = ['cksum', 'chksum']
1117 temp = received.__class__(scapy.compat.raw(received))
1119 layer = temp.getlayer(counter)
1121 layer = layer.copy()
1122 layer.remove_payload()
1123 for cf in checksum_fields:
1124 if hasattr(layer, cf):
1125 if ignore_zero_udp_checksums and \
1126 0 == getattr(layer, cf) and \
1127 layer.name in udp_layers:
1129 delattr(temp.getlayer(counter), cf)
1130 checksums.append((counter, cf))
1133 counter = counter + 1
1134 if 0 == len(checksums):
1136 temp = temp.__class__(scapy.compat.raw(temp))
1137 for layer, cf in checksums:
1138 calc_sum = getattr(temp[layer], cf)
1140 getattr(received[layer], cf), calc_sum,
1141 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
1143 "Checksum field `%s` on `%s` layer has correct value `%s`" %
1144 (cf, temp[layer].name, calc_sum))
1146 def assert_checksum_valid(self, received_packet, layer,
1147 field_name='chksum',
1148 ignore_zero_checksum=False):
1149 """ Check checksum of received packet on given layer """
1150 received_packet_checksum = getattr(received_packet[layer], field_name)
1151 if ignore_zero_checksum and 0 == received_packet_checksum:
1153 recalculated = received_packet.__class__(
1154 scapy.compat.raw(received_packet))
1155 delattr(recalculated[layer], field_name)
1156 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1157 self.assert_equal(received_packet_checksum,
1158 getattr(recalculated[layer], field_name),
1159 "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """Verify the IPv4 header checksum of the received packet."""
    self.assert_checksum_valid(received_packet, 'IP',
                               ignore_zero_checksum=ignore_zero_checksum)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """Verify the TCP checksum of the received packet."""
    self.assert_checksum_valid(received_packet, 'TCP',
                               ignore_zero_checksum=ignore_zero_checksum)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """Verify the UDP checksum; a zero checksum is accepted by default."""
    self.assert_checksum_valid(received_packet, 'UDP',
                               ignore_zero_checksum=ignore_zero_checksum)
1176 def assert_embedded_icmp_checksum_valid(self, received_packet):
1177 if received_packet.haslayer(IPerror):
1178 self.assert_checksum_valid(received_packet, 'IPerror')
1179 if received_packet.haslayer(TCPerror):
1180 self.assert_checksum_valid(received_packet, 'TCPerror')
1181 if received_packet.haslayer(UDPerror):
1182 self.assert_checksum_valid(received_packet, 'UDPerror',
1183 ignore_zero_checksum=True)
1184 if received_packet.haslayer(ICMPerror):
1185 self.assert_checksum_valid(received_packet, 'ICMPerror')
1187 def assert_icmp_checksum_valid(self, received_packet):
1188 self.assert_checksum_valid(received_packet, 'ICMP')
1189 self.assert_embedded_icmp_checksum_valid(received_packet)
1191 def assert_icmpv6_checksum_valid(self, pkt):
1192 if pkt.haslayer(ICMPv6DestUnreach):
1193 self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
1194 self.assert_embedded_icmp_checksum_valid(pkt)
1195 if pkt.haslayer(ICMPv6EchoRequest):
1196 self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
1197 if pkt.haslayer(ICMPv6EchoReply):
1198 self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
1200 def get_counter(self, counter):
1201 if counter.startswith("/"):
1202 counter_value = self.statistics.get_counter(counter)
1204 counters = self.vapi.cli("sh errors").split('\n')
1206 for i in range(1, len(counters) - 1):
1207 results = counters[i].split()
1208 if results[1] == counter:
1209 counter_value = int(results[0])
1211 return counter_value
1213 def assert_counter_equal(self, counter, expected_value,
1214 thread=None, index=0):
1215 c = self.get_counter(counter)
1216 if thread is not None:
1217 c = c[thread][index]
1219 c = sum(x[index] for x in c)
1220 self.assert_equal(c, expected_value, "counter `%s'" % counter)
1222 def assert_packet_counter_equal(self, counter, expected_value):
1223 counter_value = self.get_counter(counter)
1224 self.assert_equal(counter_value, expected_value,
1225 "packet counter `%s'" % counter)
1227 def assert_error_counter_equal(self, counter, expected_value):
1228 counter_value = self.statistics[counter].sum()
1229 self.assert_equal(counter_value, expected_value,
1230 "error counter `%s'" % counter)
    def sleep(cls, timeout, remark=None):
        """Sleep for *timeout* seconds, logging start/end and complaining
        when the real elapsed time exceeds twice the request.

        NOTE(review): several lines of this method appear to be missing
        from this copy (likely a @classmethod decorator, the zero-timeout
        yield path under the hasattr() check, the actual time.sleep()
        call with the 'after = time.time()' sample, and the opener of the
        final cls.logger.debug( call) -- restore from upstream before
        relying on it; as-is it is not valid syntax.
        """
        # /* Allow sleep(0) to maintain win32 semantics, and as decreed
        # * by Guido, only the main thread can be interrupted.
        # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
        if hasattr(os, 'sched_yield'):
        cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
        before = time.time()
        # sanity check: warn if we overslept by more than 2x
        if after - before > 2 * timeout:
            cls.logger.error("unexpected self.sleep() result - "
                             "slept for %es instead of ~%es!",
                             after - before, timeout)
            "Finished sleep (%s) - slept %es (wanted %es)",
            remark, after - before, timeout)
1260 def virtual_sleep(self, timeout, remark=None):
1261 self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
1262 self.vapi.cli("set clock adjust %s" % timeout)
1264 def pg_send(self, intf, pkts, worker=None, trace=True):
1265 intf.add_stream(pkts, worker=worker)
1266 self.pg_enable_capture(self.pg_interfaces)
1267 self.pg_start(trace=trace)
1269 def snapshot_stats(self, stats_diff):
1270 """Return snapshot of interesting stats based on diff dictionary."""
1272 for sw_if_index in stats_diff:
1273 for counter in stats_diff[sw_if_index]:
1274 stats_snapshot[counter] = self.statistics[counter]
1275 self.logger.debug(f"Took statistics stats_snapshot: {stats_snapshot}")
1276 return stats_snapshot
    def compare_stats_with_snapshot(self, stats_diff, stats_snapshot):
        """Assert appropriate difference between current stats and snapshot."""
        # NOTE(review): this copy has lost several structural lines: the
        # self.assert_equal( opener of both comparisons, the else: for the
        # per-interface branch, and the except IndexError handler that the
        # trailing comment below belongs to -- as-is this is not valid
        # syntax; restore from upstream before use.
        for sw_if_index in stats_diff:
            for cntr, diff in stats_diff[sw_if_index].items():
                if sw_if_index == "err":
                    # "err" pseudo-index: compare plain sums
                    self.statistics[cntr].sum(),
                    stats_snapshot[cntr].sum() + diff,
                    f"'{cntr}' counter value (previous value: "
                    f"{stats_snapshot[cntr].sum()}, "
                    f"expected diff: {diff})")
                    # per-interface counters: sum over threads for this
                    # sw_if_index column
                    self.statistics[cntr][:, sw_if_index].sum(),
                    stats_snapshot[cntr][:, sw_if_index].sum() + diff,
                    f"'{cntr}' counter value (previous value: "
                    f"{stats_snapshot[cntr][:, sw_if_index].sum()}, "
                    f"expected diff: {diff})")
                    # if diff is 0, then this most probably a case where
                    # test declares multiple interfaces but traffic hasn't
                    # passed through this one yet - which means the counter
                    # value is 0 and can be ignored
1305 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None,
1306 stats_diff=None, trace=True, msg=None):
1308 stats_snapshot = self.snapshot_stats(stats_diff)
1310 self.pg_send(intf, pkts)
1315 for i in self.pg_interfaces:
1316 i.assert_nothing_captured(timeout=timeout, remark=remark)
1321 self.logger.debug(f"send_and_assert_no_replies: {msg}")
1322 self.logger.debug(self.vapi.cli("show trace"))
1325 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1327 def send_and_expect(self, intf, pkts, output, n_rx=None, worker=None,
1328 trace=True, msg=None, stats_diff=None):
1330 stats_snapshot = self.snapshot_stats(stats_diff)
1333 n_rx = 1 if isinstance(pkts, Packet) else len(pkts)
1334 self.pg_send(intf, pkts, worker=worker, trace=trace)
1335 rx = output.get_capture(n_rx)
1338 self.logger.debug(f"send_and_expect: {msg}")
1339 self.logger.debug(self.vapi.cli("show trace"))
1342 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1346 def send_and_expect_load_balancing(self, input, pkts, outputs,
1347 worker=None, trace=True):
1348 self.pg_send(input, pkts, worker=worker, trace=trace)
1351 rx = oo._get_capture(1)
1352 self.assertNotEqual(0, len(rx))
1355 self.logger.debug(self.vapi.cli("show trace"))
1358 def send_and_expect_some(self, intf, pkts, output,
1361 self.pg_send(intf, pkts, worker=worker, trace=trace)
1362 rx = output._get_capture(1)
1364 self.logger.debug(self.vapi.cli("show trace"))
1365 self.assertTrue(len(rx) > 0)
1366 self.assertTrue(len(rx) < len(pkts))
1369 def send_and_expect_only(self, intf, pkts, output, timeout=None,
1372 stats_snapshot = self.snapshot_stats(stats_diff)
1374 self.pg_send(intf, pkts)
1375 rx = output.get_capture(len(pkts))
1379 for i in self.pg_interfaces:
1380 if i not in outputs:
1381 i.assert_nothing_captured(timeout=timeout)
1385 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
def get_testcase_doc_name(test):
    """Return the first line of the docstring of *test*'s class."""
    doc = getdoc(test.__class__)
    return doc.splitlines()[0]
def get_test_description(descriptions, test):
    """Return a one-line description of *test*.

    Uses shortDescription() when *descriptions* is enabled and the test
    has one; otherwise falls back to str(test).  The fallback branch was
    missing in this copy, so the function implicitly returned None.
    """
    short_description = test.shortDescription()
    if descriptions and short_description:
        return short_description
    else:
        return str(test)
class TestCaseInfo(object):
    """Per-test-case context used for failure reporting: the test logger,
    its temporary directory, and the VPP process that ran it."""

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        self.logger, self.tempdir = logger, tempdir
        self.vpp_pid, self.vpp_bin_path = vpp_pid, vpp_bin_path
        # set later to the name of the test that produced a core file
        self.core_crash_test = None
# NOTE(review): the indented text between the class statement and the class
# attributes below reads like a class docstring whose surrounding triple
# quotes were lost in this copy -- as-is it is not valid syntax; confirm
# against upstream before use.
class VppTestResult(unittest.TestResult):
    @property result_string
    String variable to store the test case result string.
    List variable containing 2-tuples of TestCase instances and strings
    holding formatted tracebacks. Each tuple represents a test which
    raised an unexpected exception.
    List variable containing 2-tuples of TestCase instances and strings
    holding formatted tracebacks. Each tuple represents a test where
    a failure was explicitly signalled using the TestCase.assert*()
    # class-level registries shared across result instances
    failed_test_cases_info = set()
    core_crash_test_cases_info = set()
    current_test_case_info = None
1430 def __init__(self, stream=None, descriptions=None, verbosity=None,
1433 :param stream File descriptor to store where to report test results.
1434 Set to the standard error stream by default.
1435 :param descriptions Boolean variable to store information if to use
1436 test case descriptions.
1437 :param verbosity Integer variable to store required verbosity level.
1439 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1440 self.stream = stream
1441 self.descriptions = descriptions
1442 self.verbosity = verbosity
1443 self.result_string = None
1444 self.runner = runner
1447 def addSuccess(self, test):
1449 Record a test succeeded result
1454 if self.current_test_case_info:
1455 self.current_test_case_info.logger.debug(
1456 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1457 test._testMethodName,
1458 test._testMethodDoc))
1459 unittest.TestResult.addSuccess(self, test)
1460 self.result_string = colorize("OK", GREEN)
1462 self.send_result_through_pipe(test, PASS)
1464 def addSkip(self, test, reason):
1466 Record a test skipped.
1472 if self.current_test_case_info:
1473 self.current_test_case_info.logger.debug(
1474 "--- addSkip() %s.%s(%s) called, reason is %s" %
1475 (test.__class__.__name__, test._testMethodName,
1476 test._testMethodDoc, reason))
1477 unittest.TestResult.addSkip(self, test, reason)
1478 self.result_string = colorize("SKIP", YELLOW)
1480 if reason == "not enough cpus":
1481 self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE)
1483 self.send_result_through_pipe(test, SKIP)
1485 def symlink_failed(self):
1486 if self.current_test_case_info:
1488 failed_dir = config.failed_dir
1489 link_path = os.path.join(
1492 os.path.basename(self.current_test_case_info.tempdir))
1494 self.current_test_case_info.logger.debug(
1495 "creating a link to the failed test")
1496 self.current_test_case_info.logger.debug(
1497 "os.symlink(%s, %s)" %
1498 (self.current_test_case_info.tempdir, link_path))
1499 if os.path.exists(link_path):
1500 self.current_test_case_info.logger.debug(
1501 'symlink already exists')
1503 os.symlink(self.current_test_case_info.tempdir, link_path)
1505 except Exception as e:
1506 self.current_test_case_info.logger.error(e)
1508 def send_result_through_pipe(self, test, result):
1509 if hasattr(self, 'test_framework_result_pipe'):
1510 pipe = self.test_framework_result_pipe
1512 pipe.send((test.id(), result))
1514 def log_error(self, test, err, fn_name):
1515 if self.current_test_case_info:
1516 if isinstance(test, unittest.suite._ErrorHolder):
1517 test_name = test.description
1519 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1520 test._testMethodName,
1521 test._testMethodDoc)
1522 self.current_test_case_info.logger.debug(
1523 "--- %s() %s called, err is %s" %
1524 (fn_name, test_name, err))
1525 self.current_test_case_info.logger.debug(
1526 "formatted exception is:\n%s" %
1527 "".join(format_exception(*err)))
1529 def add_error(self, test, err, unittest_fn, error_type):
1530 if error_type == FAIL:
1531 self.log_error(test, err, 'addFailure')
1532 error_type_str = colorize("FAIL", RED)
1533 elif error_type == ERROR:
1534 self.log_error(test, err, 'addError')
1535 error_type_str = colorize("ERROR", RED)
1537 raise Exception('Error type %s cannot be used to record an '
1538 'error or a failure' % error_type)
1540 unittest_fn(self, test, err)
1541 if self.current_test_case_info:
1542 self.result_string = "%s [ temp dir used by test case: %s ]" % \
1544 self.current_test_case_info.tempdir)
1545 self.symlink_failed()
1546 self.failed_test_cases_info.add(self.current_test_case_info)
1547 if is_core_present(self.current_test_case_info.tempdir):
1548 if not self.current_test_case_info.core_crash_test:
1549 if isinstance(test, unittest.suite._ErrorHolder):
1550 test_name = str(test)
1552 test_name = "'{!s}' ({!s})".format(
1553 get_testcase_doc_name(test), test.id())
1554 self.current_test_case_info.core_crash_test = test_name
1555 self.core_crash_test_cases_info.add(
1556 self.current_test_case_info)
1558 self.result_string = '%s [no temp dir]' % error_type_str
1560 self.send_result_through_pipe(test, error_type)
1562 def addFailure(self, test, err):
1564 Record a test failed result
1567 :param err: error message
1570 self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1572 def addError(self, test, err):
1574 Record a test error result
1577 :param err: error message
1580 self.add_error(test, err, unittest.TestResult.addError, ERROR)
1582 def getDescription(self, test):
1584 Get test description
1587 :returns: test description
1590 return get_test_description(self.descriptions, test)
    def startTest(self, test):
        # NOTE(review): several lines of this method are missing from this
        # copy: the docstring, the early 'return' under the already-printed
        # check, an 'if not test_doc:' guard above the raise, an 'else:'
        # before the generic worker-thread suffix, the print(test_title)
        # between the two delimiter prints, and the print_header(test) call
        # itself -- as-is it is not valid syntax; restore from upstream.
        def print_header(test):
            # print the per-class banner only once per test class
            if test.__class__ in self.printed:
            test_doc = getdoc(test)
            raise Exception("No doc string for test '%s'" % test.id())
            test_title = test_doc.splitlines()[0].rstrip()
            test_title = colorize(test_title, GREEN)
            if test.is_tagged_run_solo():
                test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)

            # This block may overwrite the colorized title above,
            # but we want this to stand out and be fixed
            if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
                test_title = colorize(
                    f"FIXME with VPP workers: {test_title}", RED)

            if test.has_tag(TestCaseTag.FIXME_ASAN):
                test_title = colorize(
                    f"FIXME with ASAN: {test_title}", RED)
                test.skip_fixme_asan()

            if hasattr(test, 'vpp_worker_count'):
                if test.vpp_worker_count == 0:
                    test_title += " [main thread only]"
                elif test.vpp_worker_count == 1:
                    test_title += " [1 worker thread]"
                test_title += f" [{test.vpp_worker_count} worker threads]"

            if test.__class__.skipped_due_to_cpu_lack:
                test_title = colorize(
                    f"{test_title} [skipped - not enough cpus, "
                    f"required={test.__class__.get_cpus_required()}, "
                    f"available={max_vpp_cpus}]", YELLOW)

            print(double_line_delim)
            print(double_line_delim)
            self.printed.append(test.__class__)

        # remember wall-clock start so stopTest can report the duration
        self.start_test = time.time()
        unittest.TestResult.startTest(self, test)
        if self.verbosity > 0:
            self.stream.writeln(
                "Starting " + self.getDescription(test) + " ...")
            self.stream.writeln(single_line_delim)
1651 def stopTest(self, test):
1653 Called when the given test has been run
1658 unittest.TestResult.stopTest(self, test)
1660 if self.verbosity > 0:
1661 self.stream.writeln(single_line_delim)
1662 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1663 self.result_string))
1664 self.stream.writeln(single_line_delim)
1666 self.stream.writeln("%-68s %4.2f %s" %
1667 (self.getDescription(test),
1668 time.time() - self.start_test,
1669 self.result_string))
1671 self.send_result_through_pipe(test, TEST_RUN)
1673 def printErrors(self):
1675 Print errors from running the test case
1677 if len(self.errors) > 0 or len(self.failures) > 0:
1678 self.stream.writeln()
1679 self.printErrorList('ERROR', self.errors)
1680 self.printErrorList('FAIL', self.failures)
1682 # ^^ that is the last output from unittest before summary
1683 if not self.runner.print_summary:
1684 devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
1685 self.stream = devnull
1686 self.runner.stream = devnull
1688 def printErrorList(self, flavour, errors):
1690 Print error list to the output stream together with error type
1691 and test case description.
1693 :param flavour: error type
1694 :param errors: iterable errors
1697 for test, err in errors:
1698 self.stream.writeln(double_line_delim)
1699 self.stream.writeln("%s: %s" %
1700 (flavour, self.getDescription(test)))
1701 self.stream.writeln(single_line_delim)
1702 self.stream.writeln("%s" % err)
1705 class VppTestRunner(unittest.TextTestRunner):
1707 A basic test runner implementation which prints results to standard error.
1711 def resultclass(self):
1712 """Class maintaining the results of the tests"""
1713 return VppTestResult
    def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
                 result_pipe=None, failfast=False, buffer=False,
                 resultclass=None, print_summary=True, **kwargs):
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        super(VppTestRunner, self).__init__(sys.stdout, descriptions,
                                            verbosity, failfast, buffer,
                                            resultclass, **kwargs)
        # pipe used by the keep-alive reporter to signal the parent process
        KeepAliveReporter.pipe = keep_alive_pipe
        # remember the real stream so it can be restored after the summary
        # is (optionally) silenced by VppTestResult.printErrors()
        self.orig_stream = self.stream
        # hand the result pipe to the result class (class attribute)
        self.resultclass.test_framework_result_pipe = result_pipe
        self.print_summary = print_summary
1730 def _makeResult(self):
1731 return self.resultclass(self.stream,
1736 def run(self, test):
1743 faulthandler.enable() # emit stack trace to stderr if killed by signal
1745 result = super(VppTestRunner, self).run(test)
1746 if not self.print_summary:
1747 self.stream = self.orig_stream
1748 result.stream = self.orig_stream
class Worker(Thread):
    def __init__(self, executable_args, logger, env=None, *args, **kwargs):
        """Thread that runs an external executable and captures its output.

        :param executable_args: argv list; argv[0] is the binary to run
        :param logger: logger receiving the subprocess' stdout/stderr
        :param env: extra environment variables for the subprocess
        """
        super(Worker, self).__init__(*args, **kwargs)
        self.logger = logger
        self.args = executable_args
        # only subclasses that set self.testcase before calling up get the
        # gdb/gdbserver wrapping below
        if hasattr(self, 'testcase') and self.testcase.debug_all:
            if self.testcase.debug_gdbserver:
                self.args = ['/usr/bin/gdbserver', 'localhost:{port}'
                             .format(port=self.testcase.gdbserver_port)] + args
                # NOTE(review): '+ args' concatenates the leftover *args
                # tuple onto a list (TypeError when non-empty) -- should
                # this be '+ executable_args'?  Unreachable for a plain
                # Worker; confirm against upstream.
            elif self.testcase.debug_gdb and hasattr(self, 'wait_for_gdb'):
                self.args.append(self.wait_for_gdb)
        self.app_bin = executable_args[0]
        self.app_name = os.path.basename(self.app_bin)
        if hasattr(self, 'role'):
            self.app_name += ' {role}'.format(role=self.role)
        # NOTE(review): a couple of lines appear to be missing here
        # (presumably initializing self.process / self.result, which run()
        # assigns later) -- confirm against upstream.
        env = {} if env is None else env
        self.env = copy.deepcopy(env)
    def wait_for_enter(self):
        """Interactive debugging helper: print gdb/gdbserver attach
        instructions and block on ENTER before the subprocess proceeds.

        NOTE(review): this copy is missing the statements under the first
        two guards (presumably early 'return's when no testcase is
        attached or no debug mode is active) -- as-is it is not valid
        syntax; restore from upstream.
        """
        if not hasattr(self, 'testcase'):
        if self.testcase.debug_all and self.testcase.debug_gdbserver:
            print(double_line_delim)
            print("Spawned GDB Server for '{app}' with PID: {pid}"
                  .format(app=self.app_name, pid=self.process.pid))
        elif self.testcase.debug_all and self.testcase.debug_gdb:
            print(double_line_delim)
            print("Spawned '{app}' with PID: {pid}"
                  .format(app=self.app_name, pid=self.process.pid))
        print(single_line_delim)
        print("You can debug '{app}' using:".format(app=self.app_name))
        if self.testcase.debug_gdbserver:
            print("sudo gdb " + self.app_bin +
                  " -ex 'target remote localhost:{port}'"
                  .format(port=self.testcase.gdbserver_port))
            print("Now is the time to attach gdb by running the above "
                  "command, set up breakpoints etc., then resume from "
                  "within gdb by issuing the 'continue' command")
            # bump the port so a second worker gets its own gdbserver port
            self.testcase.gdbserver_port += 1
        elif self.testcase.debug_gdb:
            print("sudo gdb " + self.app_bin +
                  " -ex 'attach {pid}'".format(pid=self.process.pid))
            print("Now is the time to attach gdb by running the above "
                  "command and set up breakpoints etc., then resume from"
                  " within gdb by issuing the 'continue' command")
        print(single_line_delim)
        input("Press ENTER to continue running the testcase...")
        # NOTE(review): the 'def run(self):' line for this method appears to
        # have been lost in this copy -- the statements below are the
        # Thread.run override that spawns and supervises the subprocess.
        executable = self.args[0]
        if not os.path.exists(executable) or not os.access(
                executable, os.F_OK | os.X_OK):
            # Exit code that means some system file did not exist,
            # could not be opened, or had some other kind of error.
            self.result = os.EX_OSFILE
            raise EnvironmentError(
                "executable '%s' is not found or executable." % executable)
        self.logger.debug("Running executable '{app}': '{cmd}'"
                          .format(app=self.app_name,
                                  cmd=' '.join(self.args)))
        env = os.environ.copy()
        env.update(self.env)
        env["CK_LOG_FILE_NAME"] = "-"
        # stdbuf disables the child's stdio buffering so output is captured
        # promptly; setpgrp detaches it from our process group
        self.process = subprocess.Popen(
            ['stdbuf', '-o0', '-e0'] + self.args, shell=False, env=env,
            preexec_fn=os.setpgrp, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        # give the user a chance to attach gdb before the child runs on
        self.wait_for_enter()
        out, err = self.process.communicate()
        self.logger.debug("Finished running `{app}'".format(app=self.app_name))
        self.logger.info("Return code is `%s'" % self.process.returncode)
        self.logger.info(single_line_delim)
        self.logger.info("Executable `{app}' wrote to stdout:"
                         .format(app=self.app_name))
        self.logger.info(single_line_delim)
        self.logger.info(out.decode('utf-8'))
        self.logger.info(single_line_delim)
        self.logger.info("Executable `{app}' wrote to stderr:"
                         .format(app=self.app_name))
        self.logger.info(single_line_delim)
        self.logger.info(err.decode('utf-8'))
        self.logger.info(single_line_delim)
        # expose the exit status to the owner of this Worker
        self.result = self.process.returncode
1843 if __name__ == '__main__':