3 from __future__ import print_function
18 from collections import deque
19 from threading import Thread, Event
20 from inspect import getdoc, isclass
21 from traceback import format_exception
22 from logging import FileHandler, DEBUG, Formatter
24 from abc import ABC, abstractmethod
25 from struct import pack, unpack
28 from scapy.packet import Raw, Packet
29 from config import config, available_cpus, num_cpus, max_vpp_cpus
30 import hook as hookmodule
31 from vpp_pg_interface import VppPGInterface
32 from vpp_sub_interface import VppSubInterface
33 from vpp_lo_interface import VppLoInterface
34 from vpp_bvi_interface import VppBviInterface
35 from vpp_papi_provider import VppPapiProvider
36 from vpp_papi import VppEnum
38 from vpp_papi.vpp_stats import VPPStats
39 from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
40 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
42 from vpp_object import VppObjectRegistry
43 from util import ppp, is_core_present
44 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
45 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
46 from scapy.layers.inet6 import ICMPv6EchoReply
# Module-level logger for this test-framework module.
logger = logging.getLogger(__name__)

# Set up an empty logger for the testcase that can be overridden as necessary
null_logger = logging.getLogger('VppTestCase')
null_logger.addHandler(logging.NullHandler())
# NOTE(review): this listing is a truncated extraction -- lines are missing
# throughout; the comments below annotate only what is visible.
if config.debug_framework:
    # body missing from this listing -- presumably imports debug_internal,
    # which tearDownClass() calls below; TODO confirm

# Residue of the module docstring (extraction artifact):
Test framework module.

The module provides a set of tools for constructing and running tests and
representing the results.


class VppDiedError(Exception):
    """ exception for reporting that the subprocess has died."""

    # Reverse map: signal number -> signal name (e.g. 9 -> 'SIGKILL');
    # SIG_DFL/SIG_IGN style pseudo-entries are excluded by the SIG_ test.
    signals_by_value = {v: k for k, v in signal.__dict__.items() if
                        k.startswith('SIG') and not k.startswith('SIG_')}

    def __init__(self, rv=None, testcase=None, method_name=None):
        # NOTE(review): the assignment of self.rv and the opening `try:`
        # around the signal lookup are missing from this truncated listing.
        self.signal_name = None
        self.testcase = testcase
        self.method_name = method_name

        # A negative return code is the POSIX "terminated by signal" form.
        self.signal_name = VppDiedError.signals_by_value[-rv]
        except (KeyError, TypeError):

        if testcase is None and method_name is None:
        in_msg = ' while running %s.%s' % (testcase, method_name)

        msg = "VPP subprocess died unexpectedly%s with return code: %d%s."\
            % (in_msg, self.rv, ' [%s]' %
               self.signal_name is not None else ''))
        # Fallback message used when the return code is unknown.
        msg = "VPP subprocess died unexpectedly%s." % in_msg

        super(VppDiedError, self).__init__(msg)


class _PacketInfo(object):
    """Private class to create packet info object.

    Help process information about the next packet.
    Set variables to default values.
    """
    # NOTE(review): the attribute default assignments are missing from this
    # truncated listing; only their descriptive comments remain.
    #: Store the index of the packet.
    #: Store the index of the source packet generator interface of the packet.
    #: Store the index of the destination packet generator interface
    #: Store expected ip version
    #: Store expected upper protocol
    #: Store the copy of the former packet.
def __eq__(self, other):
    """Packet infos are equal when index, interfaces and payload all agree."""
    comparisons = (self.index == other.index,
                   self.src == other.src,
                   self.dst == other.dst,
                   self.data == other.data)
    return all(comparisons)
def pump_output(testclass):
    """ pump output from vpp stdout/stderr to proper queues """
    # NOTE(review): truncated listing -- the initialisation of
    # stdout_fragment/stderr_fragment and the computation of `limit`
    # (the carry-over handling for partial trailing lines) are missing.
    while not testclass.pump_thread_stop_flag.is_set():
        # Block until vpp produces output or the wakeup pipe is written.
        readable = select.select([testclass.vpp.stdout.fileno(),
                                  testclass.vpp.stderr.fileno(),
                                  testclass.pump_thread_wakeup_pipe[0]],
        if testclass.vpp.stdout.fileno() in readable:
            read = os.read(testclass.vpp.stdout.fileno(), 102400)
            # keepends=True so complete lines can be told from fragments
            split = read.decode('ascii',
                                errors='backslashreplace').splitlines(True)
            # Prepend any partial line carried over from the previous read.
            if len(stdout_fragment) > 0:
                split[0] = "%s%s" % (stdout_fragment, split[0])
            if len(split) > 0 and split[-1].endswith("\n"):
                stdout_fragment = split[-1]
            testclass.vpp_stdout_deque.extend(split[:limit])
            if not config.cache_vpp_output:
                for line in split[:limit]:
                    testclass.logger.info(
                        "VPP STDOUT: %s" % line.rstrip("\n"))
        if testclass.vpp.stderr.fileno() in readable:
            read = os.read(testclass.vpp.stderr.fileno(), 102400)
            split = read.decode('ascii',
                                errors='backslashreplace').splitlines(True)
            if len(stderr_fragment) > 0:
                split[0] = "%s%s" % (stderr_fragment, split[0])
            if len(split) > 0 and split[-1].endswith("\n"):
                stderr_fragment = split[-1]

            testclass.vpp_stderr_deque.extend(split[:limit])
            if not config.cache_vpp_output:
                for line in split[:limit]:
                    testclass.logger.error(
                        "VPP STDERR: %s" % line.rstrip("\n"))
    # ignoring the dummy pipe here intentionally - the
    # flag will take care of properly terminating the loop
def _is_platform_aarch64():
    """Return True when the interpreter runs on an aarch64 (ARM64) host."""
    machine = platform.machine()
    return machine == 'aarch64'
187 is_platform_aarch64 = _is_platform_aarch64()
class KeepAliveReporter(object):
    """
    Singleton object which reports test start to parent process
    """
    # NOTE(review): truncated listing -- the shared-state dict, __init__,
    # and the `pipe` property getter/decorators are missing here.
        self.__dict__ = self._shared_state

    def pipe(self, pipe):
        # The pipe is process-wide shared state; setting it twice would be
        # a framework bug, hence the hard failure.
        if self._pipe is not None:
            raise Exception("Internal error - pipe should only be set once.")

    def send_keep_alive(self, test, desc=None):
        """
        Write current test tmpdir & desc to keep-alive pipe to signal liveness
        """
        if self.pipe is None:
            # if not running forked..
        desc = '%s (%s)' % (desc, unittest.util.strclass(test))

        self.pipe.send((desc, config.vpp, test.tempdir, test.vpp.pid))


class TestCaseTag(Enum):
    # marks the suites that must run at the end
    # using only a single test runner
    # NOTE(review): the RUN_SOLO member line is missing from this listing.
    # marks the suites broken on VPP multi-worker
    FIXME_VPP_WORKERS = 2
    # marks the suites broken when ASan is enabled
    # NOTE(review): the FIXME_ASAN member line is missing from this listing.


def create_tag_decorator(e):
    # Builds a class decorator appending tag `e` to cls.test_tags; the
    # decorator body and AttributeError fallback are partially missing here.
        cls.test_tags.append(e)
        except AttributeError:


# Ready-made decorators, one per tag.
tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)


class CPUInterface(ABC):
    # Set to True by the runner when the case is skipped for lack of CPUs.
    skipped_due_to_cpu_lack = False

    # NOTE(review): decorators/bodies of the two methods below are missing.
    def get_cpus_required(cls):

    def assign_cpus(cls, cpus):


class VppTestCase(CPUInterface, unittest.TestCase):
    """This subclass is a base class for VPP test cases that are implemented as
    classes. It provides methods to create and run test case.
    """

    extra_vpp_statseg_config = ""
    extra_vpp_punt_config = []
    extra_vpp_plugin_config = []
    # seconds to wait for a VPP API response before failing
    vapi_response_timeout = 5
    # when True, tearDown removes objects recorded in the VppObjectRegistry
    remove_configured_vpp_objects_on_tear_down = True
def packet_infos(self):
    """List of packet infos"""
    return self._packet_infos

def get_packet_count_for_if_idx(cls, dst_if_index):
    """Get the number of packet info for specified destination if index"""
    if dst_if_index in cls._packet_count_for_dst_if_idx:
        return cls._packet_count_for_dst_if_idx[dst_if_index]
    # NOTE(review): the else branch (presumably returning 0) is missing
    # from this truncated listing -- TODO confirm.

def has_tag(cls, tag):
    """ if the test case has a given tag - return true """
    # NOTE(review): the enclosing try: and the AttributeError fallback
    # body are missing from this truncated listing.
    return tag in cls.test_tags
    except AttributeError:

def is_tagged_run_solo(cls):
    """ if the test case class is timing-sensitive - return true """
    return cls.has_tag(TestCaseTag.RUN_SOLO)

def skip_fixme_asan(cls):
    """ if @tag_fixme_asan & ASan is enabled - mark for skip """
    # Only skip when the build was configured with the address sanitizer.
    if cls.has_tag(TestCaseTag.FIXME_ASAN):
        vpp_extra_cmake_args = os.environ.get('VPP_EXTRA_CMAKE_ARGS', '')
        if 'DVPP_ENABLE_SANITIZE_ADDR=ON' in vpp_extra_cmake_args:
            cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls)

    # NOTE(review): fragment of instance() -- its `def` line is missing.
    """Return the instance of this testcase"""
    return cls.test_instance

def set_debug_flags(cls, d):
    # Defaults; individual flags are toggled below based on option `d`.
    cls.gdbserver_port = 7777
    cls.debug_core = False
    cls.debug_gdb = False
    cls.debug_gdbserver = False
    cls.debug_all = False
    cls.debug_attach = False
    # NOTE(review): the option-parsing loop header and several branches
    # are missing from this truncated listing.
        cls.debug_core = True
    elif dl == "gdb" or dl == "gdb-all":
    elif dl == "gdbserver" or dl == "gdbserver-all":
        cls.debug_gdbserver = True
        cls.debug_attach = True
    raise Exception("Unrecognized DEBUG option: '%s'" % d)
    if dl == "gdb-all" or dl == "gdbserver-all":

def get_vpp_worker_count(cls):
    # Cache the worker count; FIXME_VPP_WORKERS suites are forced to run
    # without workers, otherwise the configured count is used.
    if not hasattr(cls, "vpp_worker_count"):
        if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
            cls.vpp_worker_count = 0
        # NOTE(review): else line missing from this truncated listing.
            cls.vpp_worker_count = config.vpp_worker_count
    return cls.vpp_worker_count
def get_cpus_required(cls):
    """Total CPUs this test needs: the main core plus one per VPP worker."""
    worker_cores = cls.get_vpp_worker_count()
    return worker_cores + 1
def setUpConstants(cls):
    """ Set-up the test case class based on environment variables """
    cls.step = config.step
    cls.plugin_path = ":".join(config.vpp_plugin_dir)
    cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir)
    cls.extern_plugin_path = ":".join(config.extern_plugin_dir)
    # Interactive debugging needs a CLI listener to drive VPP by hand.
    if cls.step or cls.debug_gdb or cls.debug_gdbserver:
        debug_cli = "cli-listen localhost:5002"
    size = re.search(r"\d+[gG]", config.coredump_size)
        coredump_size = f"coredump-size {config.coredump_size}".lower()
        coredump_size = "coredump-size unlimited"
    default_variant = config.variant
    if default_variant is not None:
        default_variant = "defaults { %s 100 }" % default_variant
    api_fuzzing = config.api_fuzz
    if api_fuzzing is None:
    # Base vpp command line; extend()/append() calls below add plugin,
    # worker and stats-segment configuration.
        "unix", "{", "nodaemon", debug_cli, "full-coredump",
        coredump_size, "runtime-dir", cls.tempdir, "}",
        "api-trace", "{", "on", "}",
        "api-segment", "{", "prefix", cls.get_api_segment_prefix(), "}",
        "cpu", "{", "main-core", str(cls.cpus[0]), ]
    if cls.extern_plugin_path not in (None, ""):
        cls.extra_vpp_plugin_config.append(
            "add-path %s" % cls.extern_plugin_path)
    if cls.get_vpp_worker_count():
        cls.vpp_cmdline.extend([
            "corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])])
    cls.vpp_cmdline.extend([
        "physmem", "{", "max-size", "32m", "}",
        "statseg", "{", "socket-name", cls.get_stats_sock_path(),
        cls.extra_vpp_statseg_config, "}",
        "socksvr", "{", "socket-name", cls.get_api_sock_path(), "}",
        "node { ", default_variant, "}",
        "api-fuzz {", api_fuzzing, "}",
        "plugins", "{", "plugin", "dpdk_plugin.so", "{", "disable", "}",
        "plugin", "rdma_plugin.so", "{", "disable", "}",
        "plugin", "lisp_unittest_plugin.so", "{", "enable", "}",
        "plugin", "unittest_plugin.so", "{", "enable", "}"
    ] + cls.extra_vpp_plugin_config + ["}", ])

    if cls.extra_vpp_punt_config is not None:
        cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)

    if not cls.debug_attach:
        cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
        cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))

def wait_for_enter(cls):
    # Pause the run so the developer can attach gdb to the spawned VPP
    # (or gdbserver) before the test continues.
    if cls.debug_gdbserver:
        print(double_line_delim)
        print("Spawned GDB server with PID: %d" % cls.vpp.pid)
        print(double_line_delim)
        print("Spawned VPP with PID: %d" % cls.vpp.pid)
        cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
    print(single_line_delim)
    print("You can debug VPP using:")
    if cls.debug_gdbserver:
        print(f"sudo gdb {config.vpp} "
              f"-ex 'target remote localhost:{cls.gdbserver_port}'")
        print("Now is the time to attach gdb by running the above "
              "command, set up breakpoints etc., then resume VPP from "
              "within gdb by issuing the 'continue' command")
        # bump the port so a later gdbserver does not collide with this one
        cls.gdbserver_port += 1
        print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'")
        print("Now is the time to attach gdb by running the above "
              "command and set up breakpoints etc., then resume VPP from"
              " within gdb by issuing the 'continue' command")
    print(single_line_delim)
    input("Press ENTER to continue running the testcase...")

    # NOTE(review): fragment of run_vpp() -- its `def` line is missing
    # from this truncated listing.
    cls.logger.debug(f"Assigned cpus: {cls.cpus}")
    cmdline = cls.vpp_cmdline

    if cls.debug_gdbserver:
        gdbserver = '/usr/bin/gdbserver'
        if not os.path.isfile(gdbserver) or\
                not os.access(gdbserver, os.X_OK):
            raise Exception("gdbserver binary '%s' does not exist or is "
                            "not executable" % gdbserver)

        cmdline = [gdbserver, 'localhost:{port}'
                   .format(port=cls.gdbserver_port)] + cls.vpp_cmdline
        cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))

        cls.vpp = subprocess.Popen(cmdline,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        cls.logger.critical("Subprocess returned with non-0 return code: ("
        cls.logger.critical("Subprocess returned with OS error: "
                            "(%s) %s", e.errno, e.strerror)
    except Exception as e:
        cls.logger.exception("Subprocess returned unexpected from "

def wait_for_coredump(cls):
    # If a core file exists, poll its size until it stops growing (or a
    # 60 s deadline passes) so vpp is not killed mid-dump.
    corefile = cls.tempdir + "/core"
    if os.path.isfile(corefile):
        cls.logger.error("Waiting for coredump to complete: %s", corefile)
        curr_size = os.path.getsize(corefile)
        deadline = time.time() + 60
        while time.time() < deadline:
            curr_size = os.path.getsize(corefile)
            if size == curr_size:
        cls.logger.error("Timed out waiting for coredump to complete:"
        cls.logger.error("Coredump complete: %s, size %d",
def get_stats_sock_path(cls):
    """Path of the VPP statistics socket inside the test's tempdir."""
    return "{}/stats.sock".format(cls.tempdir)
def get_api_sock_path(cls):
    """Path of the VPP API (socksvr) socket inside the test's tempdir."""
    return "{}/api.sock".format(cls.tempdir)
def get_api_segment_prefix(cls):
    """API segment prefix: the tempdir's basename. Only used for VAPI."""
    prefix = os.path.basename(cls.tempdir)
    return prefix
def get_tempdir(cls):
    # Per-class temp dir; wiped first when so configured to avoid stale
    # state from a previous run.
    tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}"
    if config.wipe_tmp_dir:
        shutil.rmtree(tmpdir, ignore_errors=True)
    # NOTE(review): directory creation / return statement missing from
    # this truncated listing.

def create_file_handler(cls):
    # Log into the tempdir, or into a per-class directory under
    # config.log_dir when one is configured.
    if config.log_dir is None:
        cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
    # NOTE(review): the return/else separating the two branches is
    # missing from this truncated listing.
    logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}"
    if config.wipe_tmp_dir:
        shutil.rmtree(logdir, ignore_errors=True)
    cls.file_handler = FileHandler(f"{logdir}/log.txt")
# NOTE(review): fragment of setUpClass() -- its `def` line and docstring
# delimiters are missing from this truncated listing.
Perform class setup before running the testcase
Remove shared memory files, start vpp and connect the vpp-api
super(VppTestCase, cls).setUpClass()
cls.logger = get_logger(cls.__name__)
random.seed(config.rnd_seed)
if hasattr(cls, 'parallel_handler'):
    cls.logger.addHandler(cls.parallel_handler)
    cls.logger.propagate = False
cls.set_debug_flags(config.debug)
cls.tempdir = cls.get_tempdir()
cls.create_file_handler()
cls.file_handler.setFormatter(
    Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
cls.file_handler.setLevel(DEBUG)
cls.logger.addHandler(cls.file_handler)
cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
os.chdir(cls.tempdir)
cls.logger.info("Temporary dir is %s, api socket is %s",
                cls.tempdir, cls.get_api_sock_path())
cls.logger.debug("Random seed is %s", config.rnd_seed)
cls.reset_packet_infos()
cls.registry = VppObjectRegistry()
cls.vpp_startup_failed = False
cls.reporter = KeepAliveReporter()
# need to catch exceptions here because if we raise, then the cleanup
# doesn't get called and we might end with a zombie vpp
cls.reporter.send_keep_alive(cls, 'setUpClass')
VppTestResult.current_test_case_info = TestCaseInfo(
    cls.logger, cls.tempdir, cls.vpp.pid, config.vpp)
cls.vpp_stdout_deque = deque()
cls.vpp_stderr_deque = deque()
# When attaching to an already-running vpp there is no child process
# whose stdout/stderr need pumping.
if not cls.debug_attach:
    cls.pump_thread_stop_flag = Event()
    cls.pump_thread_wakeup_pipe = os.pipe()
    cls.pump_thread = Thread(target=pump_output, args=(cls,))
    cls.pump_thread.daemon = True
    cls.pump_thread.start()
# No API response timeout while a human is driving a debugger.
if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
    cls.vapi_response_timeout = 0
cls.vapi = VppPapiProvider(cls.__name__, cls,
                           cls.vapi_response_timeout)
    hook = hookmodule.StepHook(cls)
    hook = hookmodule.PollHook(cls)
cls.vapi.register_hook(hook)
cls.statistics = VPPStats(socketname=cls.get_stats_sock_path())
    cls.vpp_startup_failed = True
        "VPP died shortly after startup, check the"
        " output to standard error for possible cause")
except (vpp_papi.VPPIOError, Exception) as e:
    cls.logger.debug("Exception connecting to vapi: %s" % e)
    cls.vapi.disconnect()
    if cls.debug_gdbserver:
        print(colorize("You're running VPP inside gdbserver but "
                       "VPP-API connection failed, did you forget "
                       "to 'continue' VPP from within gdb?", RED))
    # Worker count as reported by the running VPP itself.
    last_line = cls.vapi.cli("show thread").split("\n")[-2]
    cls.vpp_worker_count = int(last_line.split(" ")[0])
    print("Detected VPP with %s workers." % cls.vpp_worker_count)
except vpp_papi.VPPRuntimeError as e:
    cls.logger.debug("%s" % e)
except Exception as e:
    cls.logger.debug("Exception connecting to VPP: %s" % e)
def _debug_quit(cls):
    # When a debugger session may be active, give the user a chance to
    # finish before the framework kills vpp/gdbserver.
    if (cls.debug_gdbserver or cls.debug_gdb):
        if cls.vpp.returncode is None:
            print(double_line_delim)
            print("VPP or GDB server is still running")
            print(single_line_delim)
            input("When done debugging, press ENTER to kill the "
                  "process and finish running the testcase...")
    except AttributeError:

    # NOTE(review): fragment of quit() -- its `def` line is missing from
    # this truncated listing.
    Disconnect vpp-api, kill vpp and cleanup shared memory files
    # first signal that we want to stop the pump thread, then wake it up
    if hasattr(cls, 'pump_thread_stop_flag'):
        cls.pump_thread_stop_flag.set()
    if hasattr(cls, 'pump_thread_wakeup_pipe'):
        os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
    if hasattr(cls, 'pump_thread'):
        cls.logger.debug("Waiting for pump thread to stop")
        cls.pump_thread.join()
    if hasattr(cls, 'vpp_stderr_reader_thread'):
        cls.logger.debug("Waiting for stderr pump to stop")
        cls.vpp_stderr_reader_thread.join()

    if hasattr(cls, 'vpp'):
        if hasattr(cls, 'vapi'):
            cls.logger.debug(cls.vapi.vpp.get_stats())
            cls.logger.debug("Disconnecting class vapi client on %s",
            cls.vapi.disconnect()
            cls.logger.debug("Deleting class vapi attribute on %s",
        # Let any in-flight coredump finish before terminating vpp.
        if not cls.debug_attach and cls.vpp.returncode is None:
            cls.wait_for_coredump()
            cls.logger.debug("Sending TERM to vpp")
            cls.logger.debug("Waiting for vpp to die")
                outs, errs = cls.vpp.communicate(timeout=5)
            except subprocess.TimeoutExpired:
                outs, errs = cls.vpp.communicate()
        cls.logger.debug("Deleting class vpp attribute on %s",
        if not cls.debug_attach:
            cls.vpp.stdout.close()
            cls.vpp.stderr.close()

    # Startup failures promote stderr logging to critical level.
    if cls.vpp_startup_failed:
        stdout_log = cls.logger.info
        stderr_log = cls.logger.critical
        stdout_log = cls.logger.info
        stderr_log = cls.logger.info

    if hasattr(cls, 'vpp_stdout_deque'):
        stdout_log(single_line_delim)
        stdout_log('VPP output to stdout while running %s:', cls.__name__)
        stdout_log(single_line_delim)
        vpp_output = "".join(cls.vpp_stdout_deque)
        with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
        stdout_log('\n%s', vpp_output)
        stdout_log(single_line_delim)

    if hasattr(cls, 'vpp_stderr_deque'):
        stderr_log(single_line_delim)
        stderr_log('VPP output to stderr while running %s:', cls.__name__)
        stderr_log(single_line_delim)
        vpp_output = "".join(cls.vpp_stderr_deque)
        with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
        stderr_log('\n%s', vpp_output)
        stderr_log(single_line_delim)
def tearDownClass(cls):
    """ Perform final cleanup after running all tests in this test-case """
    cls.logger.debug("--- tearDownClass() for %s called ---" %
    # NOTE(review): intermediate lines are missing from this truncated
    # listing (presumably including the cls.quit() call -- TODO confirm).
    cls.reporter.send_keep_alive(cls, 'tearDownClass')
    cls.file_handler.close()
    cls.reset_packet_infos()
    if config.debug_framework:
        debug_internal.on_tear_down_class(cls)
def show_commands_at_teardown(self):
    """Hook for subclasses to log extra show-command output at teardown."""
    message = "--- No test specific show commands provided. ---"
    self.logger.info(message)
# NOTE(review): fragment of tearDown() -- its `def` line is missing from
# this truncated listing.
""" Show various debug prints after each test """
self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
                  (self.__class__.__name__, self._testMethodName,
                   self._testMethodDoc))
# Everything below talks to vpp over the API and is only attempted while
# vpp is alive; the surrounding try is missing from this listing.
if not self.vpp_dead:
    self.logger.debug(self.vapi.cli("show trace max 1000"))
    self.logger.info(self.vapi.ppcli("show interface"))
    self.logger.info(self.vapi.ppcli("show hardware"))
    self.logger.info(self.statistics.set_errors_str())
    self.logger.info(self.vapi.ppcli("show run"))
    self.logger.info(self.vapi.ppcli("show log"))
    self.logger.info(self.vapi.ppcli("show bihash"))
    self.logger.info("Logging testcase specific show commands.")
    self.show_commands_at_teardown()
    if self.remove_configured_vpp_objects_on_tear_down:
        self.registry.remove_vpp_config(self.logger)
    # Save/Dump VPP api trace log
    m = self._testMethodName
    api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
    tmp_api_trace = "/tmp/%s" % api_trace
    vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
    self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
    self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
    os.rename(tmp_api_trace, vpp_api_trace_log)
except VppTransportSocketIOError:
    self.logger.debug("VppTransportSocketIOError: Vpp dead. "
                      "Cannot log show commands.")
    self.registry.unregister_all(self.logger)
# NOTE(review): fragment of setUp() -- its `def` line is missing from
# this truncated listing.
""" Clear trace before running each test"""
super(VppTestCase, self).setUp()
self.reporter.send_keep_alive(self)
# NOTE(review): the guard condition before this raise (presumably
# checking self.vpp_dead) is missing -- TODO confirm.
    raise VppDiedError(rv=None, testcase=self.__class__.__name__,
                       method_name=self._testMethodName)
self.sleep(.1, "during setUp")
# Markers in the output queues help correlate vpp output with tests.
self.vpp_stdout_deque.append(
    "--- test setUp() for %s.%s(%s) starts here ---\n" %
    (self.__class__.__name__, self._testMethodName,
     self._testMethodDoc))
self.vpp_stderr_deque.append(
    "--- test setUp() for %s.%s(%s) starts here ---\n" %
    (self.__class__.__name__, self._testMethodName,
     self._testMethodDoc))
self.vapi.cli("clear trace")
# store the test instance inside the test class - so that objects
# holding the class can access instance methods (like assertEqual)
type(self).test_instance = self
def pg_enable_capture(cls, interfaces=None):
    """
    Enable capture on packet-generator interfaces

    :param interfaces: iterable interface indexes (if None,
                       use self.pg_interfaces)
    """
    if interfaces is None:
        interfaces = cls.pg_interfaces
    # NOTE(review): the loop enabling capture on each interface is
    # missing from this truncated listing.
def register_pcap(cls, intf, worker):
    """ Register a pcap in the testclass """
    # Record the (interface, worker) pair so pg_start() can later create
    # and clean up the corresponding capture.  (The previous comment
    # claimed a timestamp was recorded; none is.)
    cls._pcaps.append((intf, worker))
def get_vpp_time(cls):
    # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
    # returns float("2.190522")
    timestr = cls.vapi.cli('show clock')
    head, sep, tail = timestr.partition(',')
    head, sep, tail = head.partition('Time now')
    # NOTE(review): the conversion/return of the parsed value is missing
    # from this truncated listing.

def sleep_on_vpp_time(cls, sec):
    """ Sleep according to time in VPP world """
    # On a busy system with many processes
    # we might end up with VPP time being slower than real world
    # So take that into account when waiting for VPP to do something
    start_time = cls.get_vpp_time()
    while cls.get_vpp_time() - start_time < sec:
        # NOTE(review): loop body (a short sleep) is missing from this
        # truncated listing.
def pg_start(cls, trace=True):
    """ Enable the PG, wait till it is done, then clean up """
    # Rotate pcap files left over from a previous pg run.
    for (intf, worker) in cls._old_pcaps:
        intf.handle_old_pcap_file(intf.get_in_path(worker),
                                  intf.in_history_counter)
    # NOTE(review): lines missing here (presumably guarding the trace
    # setup with the `trace` parameter -- TODO confirm).
    cls.vapi.cli("clear trace")
    cls.vapi.cli("trace add pg-input 1000")
    cls.vapi.cli('packet-generator enable')
    # PG, when starts, runs to completion -
    # so let's avoid a race condition,
    # and wait a little till it's done.
    # Then clean it up - and then be gone.
    deadline = time.time() + 300
    while cls.vapi.cli('show packet-generator').find("Yes") != -1:
        cls.sleep(0.01)  # yield
        if time.time() > deadline:
            cls.logger.error("Timeout waiting for pg to stop")
            # NOTE(review): loop exit missing from this listing.
    for intf, worker in cls._pcaps:
        cls.vapi.cli('packet-generator delete %s' %
                     intf.get_cap_name(worker))
    cls._old_pcaps = cls._pcaps
def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0,
    """
    Create packet-generator interfaces.

    :param interfaces: iterable indexes of the interfaces.
    :returns: List of created interfaces.
    """
    # NOTE(review): the signature continuation (mode parameter), result
    # list setup, loop header and return are missing from this listing.
        intf = VppPGInterface(cls, i, gso, gso_size, mode)
        setattr(cls, intf.name, intf)
    cls.pg_interfaces = result
def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
    """Create pg interfaces operating in raw IPv4 mode (no L2 framing)."""
    mode = VppEnum.vl_api_pg_interface_mode_t.PG_API_MODE_IP4
    return cls.create_pg_interfaces_internal(interfaces, gso, gso_size, mode)
def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
    """Create pg interfaces operating in raw IPv6 mode (no L2 framing)."""
    mode = VppEnum.vl_api_pg_interface_mode_t.PG_API_MODE_IP6
    return cls.create_pg_interfaces_internal(interfaces, gso, gso_size, mode)
def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
    """Create pg interfaces in the default (ethernet) mode."""
    mode = VppEnum.vl_api_pg_interface_mode_t.PG_API_MODE_ETHERNET
    return cls.create_pg_interfaces_internal(interfaces, gso, gso_size, mode)
def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
    """Create pg interfaces explicitly in ethernet mode."""
    mode = VppEnum.vl_api_pg_interface_mode_t.PG_API_MODE_ETHERNET
    return cls.create_pg_interfaces_internal(interfaces, gso, gso_size, mode)
def create_loopback_interfaces(cls, count):
    """
    Create loopback interfaces.

    :param count: number of interfaces created.
    :returns: List of created interfaces.
    """
    result = [VppLoInterface(cls) for i in range(count)]
    # NOTE(review): loop header and return are missing from this listing.
        setattr(cls, intf.name, intf)
    cls.lo_interfaces = result

def create_bvi_interfaces(cls, count):
    """
    Create BVI interfaces.

    :param count: number of interfaces created.
    :returns: List of created interfaces.
    """
    result = [VppBviInterface(cls) for i in range(count)]
    # NOTE(review): loop header and return are missing from this listing.
        setattr(cls, intf.name, intf)
    cls.bvi_interfaces = result

def extend_packet(packet, size, padding=' '):
    """
    Extend packet to given size by padding with spaces or custom padding
    NOTE: Currently works only when Raw layer is present.

    :param packet: packet
    :param size: target size
    :param padding: padding used to extend the payload
    """
    # the +4 presumably accounts for the Ethernet FCS -- TODO confirm
    packet_len = len(packet) + 4
    extend = size - packet_len
    # NOTE(review): guard line missing from this truncated listing
    # (presumably only pads when extend > 0 -- TODO confirm).
        num = (extend // len(padding)) + 1
        packet[Raw].load += (padding * num)[:extend].encode("ascii")
def reset_packet_infos(cls):
    """Drop all recorded packet infos and zero the per-interface counts."""
    cls._packet_infos = dict()
    cls._packet_count_for_dst_if_idx = dict()
def create_packet_info(cls, src_if, dst_if):
    """
    Create packet info object containing the source and destination indexes
    and add it to the testcase's packet info list

    :param VppInterface src_if: source interface
    :param VppInterface dst_if: destination interface

    :returns: _PacketInfo object
    """
    # NOTE(review): the creation of `info` (presumably _PacketInfo()) and
    # the return statement are missing from this truncated listing.
    info.index = len(cls._packet_infos)
    info.src = src_if.sw_if_index
    info.dst = dst_if.sw_if_index
    # Count packets per physical destination: sub-interfaces are
    # accounted against their parent interface.
    if isinstance(dst_if, VppSubInterface):
        dst_idx = dst_if.parent.sw_if_index
        dst_idx = dst_if.sw_if_index
    if dst_idx in cls._packet_count_for_dst_if_idx:
        cls._packet_count_for_dst_if_idx[dst_idx] += 1
        cls._packet_count_for_dst_if_idx[dst_idx] = 1
    cls._packet_infos[info.index] = info
def info_to_payload(info):
    """Serialize a _PacketInfo into the 18-byte test payload.

    :param info: _PacketInfo object
    :returns: serialized data from packet info
    """
    # payload layout: 4 x 32-bit ints + 1 x 16-bit short = 18 bytes
    values = (info.index, info.src, info.dst, info.ip, info.proto)
    return pack('iiiih', *values)
def payload_to_info(payload, payload_field='load'):
    """
    Convert packet payload to _PacketInfo object

    :param payload: packet payload
    :type payload:  <class 'scapy.packet.Raw'>
    :param payload_field: packet fieldname of payload "load" for
                          <class 'scapy.packet.Raw'>
    :type payload_field: str
    :returns: _PacketInfo object containing de-serialized data from payload
    """
    # retrieve payload, currently 18 bytes (4 x ints + 1 short)
    payload_b = getattr(payload, payload_field)[:18]

    info = _PacketInfo()
    info.index, info.src, info.dst, info.ip, info.proto \
        = unpack('iiiih', payload_b)

    # some SRv6 TCs depend on get an exception if bad values are detected
    if info.index > 0x4000:
        raise ValueError('Index value is invalid')
    # NOTE(review): the `return info` line is missing from this
    # truncated listing.
def get_next_packet_info(self, info):
    """
    Iterate over the packet info list stored in the testcase
    Start iteration with first element if info is None
    Continue based on index in info if info is specified

    :param info: info or None
    :returns: next info in list or None if no more infos
    """
    # NOTE(review): the None-handling branch for `info` is missing from
    # this truncated listing.
    next_index = info.index + 1
    if next_index == len(self._packet_infos):
        # NOTE(review): end-of-list return missing from this listing.
    return self._packet_infos[next_index]

def get_next_packet_info_for_interface(self, src_index, info):
    """
    Search the packet info list for the next packet info with same source

    :param src_index: source interface index to search for
    :param info: packet info - where to start the search
    :returns: packet info or None
    """
    # NOTE(review): loop header, None check and return are missing from
    # this truncated listing.
        info = self.get_next_packet_info(info)
        if info.src == src_index:

def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
    """
    Search the packet info list for the next packet info with same source
    and destination interface indexes

    :param src_index: source interface index to search for
    :param dst_index: destination interface index to search for
    :param info: packet info - where to start the search
    :returns: packet info or None
    """
    # NOTE(review): loop header, None check and return are missing from
    # this truncated listing.
        info = self.get_next_packet_info_for_interface(src_index, info)
        if info.dst == dst_index:
def assert_equal(self, real_value, expected_value, name_or_class=None):
    # Without a name/class, behave exactly like unittest's assertEqual.
    if name_or_class is None:
        self.assertEqual(real_value, expected_value)
    # NOTE(review): the return and the try separating the two message
    # formats are missing from this truncated listing.  When
    # name_or_class is callable (e.g. an enum), both values are rendered
    # through it for a readable failure message.
    msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
    msg = msg % (getdoc(name_or_class).strip(),
                 real_value, str(name_or_class(real_value)),
                 expected_value, str(name_or_class(expected_value)))
    msg = "Invalid %s: %s does not match expected value %s" % (
        name_or_class, real_value, expected_value)

    self.assertEqual(real_value, expected_value, msg)

def assert_in_range(self,
    # NOTE(review): the remaining parameters (apparently real_value,
    # expected_min, expected_max and a name) are missing from this
    # truncated listing.
    msg = "Invalid %s: %s out of range <%s,%s>" % (
        name, real_value, expected_min, expected_max)
    self.assertTrue(expected_min <= real_value <= expected_max, msg)
def assert_packet_checksums_valid(self, packet,
                                  ignore_zero_udp_checksums=True):
    # Rebuild the packet from raw bytes so scapy recomputes checksums,
    # then compare each checksum field with the received values.
    received = packet.__class__(scapy.compat.raw(packet))
    udp_layers = ['UDP', 'UDPerror']
    checksum_fields = ['cksum', 'chksum']
    # NOTE(review): initialisation of `checksums` and `counter` is
    # missing from this truncated listing.
    temp = received.__class__(scapy.compat.raw(received))
        layer = temp.getlayer(counter)
        # NOTE(review): loop/termination lines missing from this listing.
            layer = layer.copy()
            layer.remove_payload()
            for cf in checksum_fields:
                if hasattr(layer, cf):
                    if ignore_zero_udp_checksums and \
                            0 == getattr(layer, cf) and \
                            layer.name in udp_layers:
                        # NOTE(review): continue missing from listing.
                    # delete the field so scapy recalculates it on rebuild
                    delattr(temp.getlayer(counter), cf)
                    checksums.append((counter, cf))
        counter = counter + 1
    if 0 == len(checksums):
        # NOTE(review): early return missing from this listing.
    temp = temp.__class__(scapy.compat.raw(temp))
    for layer, cf in checksums:
        calc_sum = getattr(temp[layer], cf)
        # NOTE(review): the assert_equal and logger call openings are
        # missing from this truncated listing.
            getattr(received[layer], cf), calc_sum,
            "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
            "Checksum field `%s` on `%s` layer has correct value `%s`" %
            (cf, temp[layer].name, calc_sum))
def assert_checksum_valid(self, received_packet, layer,
                          field_name='chksum',
                          ignore_zero_checksum=False):
    """ Check checksum of received packet on given layer """
    received_packet_checksum = getattr(received_packet[layer], field_name)
    if ignore_zero_checksum and 0 == received_packet_checksum:
        # NOTE(review): early return missing from this truncated listing.
    # Rebuild the packet without the checksum field so scapy recomputes
    # it, then compare with the received value.
    recalculated = received_packet.__class__(
        scapy.compat.raw(received_packet))
    delattr(recalculated[layer], field_name)
    recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
    self.assert_equal(received_packet_checksum,
                      getattr(recalculated[layer], field_name),
                      "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """Verify the IPv4 header checksum of ``received_packet``.

    :param ignore_zero_checksum: accept a zero (unset) checksum as valid
    """
    self.assert_checksum_valid(
        received_packet, 'IP', ignore_zero_checksum=ignore_zero_checksum)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """Verify the TCP checksum of ``received_packet``.

    :param ignore_zero_checksum: accept a zero (unset) checksum as valid
    """
    self.assert_checksum_valid(
        received_packet, 'TCP', ignore_zero_checksum=ignore_zero_checksum)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """Verify the UDP checksum of ``received_packet``.

    :param ignore_zero_checksum: accept a zero checksum as valid
                                 (default True -- zero means "not set")
    """
    self.assert_checksum_valid(
        received_packet, 'UDP', ignore_zero_checksum=ignore_zero_checksum)
1173 def assert_embedded_icmp_checksum_valid(self, received_packet):
1174 if received_packet.haslayer(IPerror):
1175 self.assert_checksum_valid(received_packet, 'IPerror')
1176 if received_packet.haslayer(TCPerror):
1177 self.assert_checksum_valid(received_packet, 'TCPerror')
1178 if received_packet.haslayer(UDPerror):
1179 self.assert_checksum_valid(received_packet, 'UDPerror',
1180 ignore_zero_checksum=True)
1181 if received_packet.haslayer(ICMPerror):
1182 self.assert_checksum_valid(received_packet, 'ICMPerror')
    def assert_icmp_checksum_valid(self, received_packet):
        """Validate the ICMP layer checksum, then the checksums of any
        error packet embedded within it."""
        self.assert_checksum_valid(received_packet, 'ICMP')
        self.assert_embedded_icmp_checksum_valid(received_packet)
1188 def assert_icmpv6_checksum_valid(self, pkt):
1189 if pkt.haslayer(ICMPv6DestUnreach):
1190 self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
1191 self.assert_embedded_icmp_checksum_valid(pkt)
1192 if pkt.haslayer(ICMPv6EchoRequest):
1193 self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
1194 if pkt.haslayer(ICMPv6EchoReply):
1195 self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
1197 def get_counter(self, counter):
1198 if counter.startswith("/"):
1199 counter_value = self.statistics.get_counter(counter)
1201 counters = self.vapi.cli("sh errors").split('\n')
1203 for i in range(1, len(counters) - 1):
1204 results = counters[i].split()
1205 if results[1] == counter:
1206 counter_value = int(results[0])
1208 return counter_value
1210 def assert_counter_equal(self, counter, expected_value,
1211 thread=None, index=0):
1212 c = self.get_counter(counter)
1213 if thread is not None:
1214 c = c[thread][index]
1216 c = sum(x[index] for x in c)
1217 self.assert_equal(c, expected_value, "counter `%s'" % counter)
1219 def assert_packet_counter_equal(self, counter, expected_value):
1220 counter_value = self.get_counter(counter)
1221 self.assert_equal(counter_value, expected_value,
1222 "packet counter `%s'" % counter)
1224 def assert_error_counter_equal(self, counter, expected_value):
1225 counter_value = self.statistics[counter].sum()
1226 self.assert_equal(counter_value, expected_value,
1227 "error counter `%s'" % counter)
1230 def sleep(cls, timeout, remark=None):
1232 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1233 # * by Guido, only the main thread can be interrupted.
1235 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
1238 if hasattr(os, 'sched_yield'):
1244 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1245 before = time.time()
1248 if after - before > 2 * timeout:
1249 cls.logger.error("unexpected self.sleep() result - "
1250 "slept for %es instead of ~%es!",
1251 after - before, timeout)
1254 "Finished sleep (%s) - slept %es (wanted %es)",
1255 remark, after - before, timeout)
1257 def virtual_sleep(self, timeout, remark=None):
1258 self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
1259 self.vapi.cli("set clock adjust %s" % timeout)
    def pg_send(self, intf, pkts, worker=None, trace=True):
        """Enqueue packets on a pg interface and start packet generation.

        :param intf: input packet-generator interface
        :param pkts: packet or list of packets to send
        :param worker: optional worker thread index to send from
        :param trace: enable packet tracing for this run
        """
        intf.add_stream(pkts, worker=worker)
        # arm capture on all pg interfaces so any replies are collected
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start(trace=trace)
1266 def snapshot_stats(self, stats_diff):
1267 """Return snapshot of interesting stats based on diff dictionary."""
1269 for sw_if_index in stats_diff:
1270 for counter in stats_diff[sw_if_index]:
1271 stats_snapshot[counter] = self.statistics[counter]
1272 self.logger.debug(f"Took statistics stats_snapshot: {stats_snapshot}")
1273 return stats_snapshot
1275 def compare_stats_with_snapshot(self, stats_diff, stats_snapshot):
1276 """Assert appropriate difference between current stats and snapshot."""
1277 for sw_if_index in stats_diff:
1278 for cntr, diff in stats_diff[sw_if_index].items():
1279 if sw_if_index == "err":
1281 self.statistics[cntr].sum(),
1282 stats_snapshot[cntr].sum() + diff,
1283 f"'{cntr}' counter value (previous value: "
1284 f"{stats_snapshot[cntr].sum()}, "
1285 f"expected diff: {diff})")
1289 self.statistics[cntr][:, sw_if_index].sum(),
1290 stats_snapshot[cntr][:, sw_if_index].sum() + diff,
1291 f"'{cntr}' counter value (previous value: "
1292 f"{stats_snapshot[cntr][:, sw_if_index].sum()}, "
1293 f"expected diff: {diff})")
1295 # if diff is 0, then this most probably a case where
1296 # test declares multiple interfaces but traffic hasn't
1297 # passed through this one yet - which means the counter
1298 # value is 0 and can be ignored
1302 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None,
1303 stats_diff=None, trace=True, msg=None):
1305 stats_snapshot = self.snapshot_stats(stats_diff)
1307 self.pg_send(intf, pkts)
1312 for i in self.pg_interfaces:
1313 i.assert_nothing_captured(timeout=timeout, remark=remark)
1318 self.logger.debug(f"send_and_assert_no_replies: {msg}")
1319 self.logger.debug(self.vapi.cli("show trace"))
1322 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1324 def send_and_expect(self, intf, pkts, output, n_rx=None, worker=None,
1325 trace=True, msg=None, stats_diff=None):
1327 stats_snapshot = self.snapshot_stats(stats_diff)
1330 n_rx = 1 if isinstance(pkts, Packet) else len(pkts)
1331 self.pg_send(intf, pkts, worker=worker, trace=trace)
1332 rx = output.get_capture(n_rx)
1335 self.logger.debug(f"send_and_expect: {msg}")
1336 self.logger.debug(self.vapi.cli("show trace"))
1339 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1343 def send_and_expect_load_balancing(self, input, pkts, outputs,
1344 worker=None, trace=True):
1345 self.pg_send(input, pkts, worker=worker, trace=trace)
1348 rx = oo._get_capture(1)
1349 self.assertNotEqual(0, len(rx))
1352 self.logger.debug(self.vapi.cli("show trace"))
1355 def send_and_expect_some(self, intf, pkts, output,
1358 self.pg_send(intf, pkts, worker=worker, trace=trace)
1359 rx = output._get_capture(1)
1361 self.logger.debug(self.vapi.cli("show trace"))
1362 self.assertTrue(len(rx) > 0)
1363 self.assertTrue(len(rx) < len(pkts))
1366 def send_and_expect_only(self, intf, pkts, output, timeout=None,
1369 stats_snapshot = self.snapshot_stats(stats_diff)
1371 self.pg_send(intf, pkts)
1372 rx = output.get_capture(len(pkts))
1376 for i in self.pg_interfaces:
1377 if i not in outputs:
1378 i.assert_nothing_captured(timeout=timeout)
1382 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
def get_testcase_doc_name(test):
    """Return the first line of the docstring of the test's class."""
    class_doc = getdoc(test.__class__)
    return class_doc.splitlines()[0]
def get_test_description(descriptions, test):
    """Return the description to print for a test.

    :param descriptions: whether docstring descriptions are enabled
    :param test: the test instance
    :returns: the test's short description when enabled and available,
        otherwise str(test) - without the else branch the function would
        silently return None
    """
    short_description = test.shortDescription()
    if descriptions and short_description:
        return short_description
    else:
        return str(test)
class TestCaseInfo(object):
    """Bookkeeping for one test case run: its logger, temp directory and
    the PID/binary path of the VPP instance under test."""

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        self.logger = logger
        self.tempdir = tempdir
        self.vpp_pid = vpp_pid
        self.vpp_bin_path = vpp_bin_path
        # set later to the name of the test during which a core appeared
        self.core_crash_test = None

    def __repr__(self):
        # aid debugging of test-result bookkeeping
        return ("%s(tempdir=%r, vpp_pid=%r, vpp_bin_path=%r)"
                % (type(self).__name__, self.tempdir, self.vpp_pid,
                   self.vpp_bin_path))
class VppTestResult(unittest.TestResult):
    """
    @property result_string
     String variable to store the test case result string.
    @property errors
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test which
     raised an unexpected exception.
    @property failures
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test where
     a failure was explicitly signalled using the TestCase.assert*()
     methods.
    """

    # shared across instances: TestCaseInfo objects of failed tests ...
    failed_test_cases_info = set()
    # ... and of tests after which a core file was found
    core_crash_test_cases_info = set()
    # TestCaseInfo of the test currently executing
    current_test_case_info = None
1427 def __init__(self, stream=None, descriptions=None, verbosity=None,
1430 :param stream File descriptor to store where to report test results.
1431 Set to the standard error stream by default.
1432 :param descriptions Boolean variable to store information if to use
1433 test case descriptions.
1434 :param verbosity Integer variable to store required verbosity level.
1436 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1437 self.stream = stream
1438 self.descriptions = descriptions
1439 self.verbosity = verbosity
1440 self.result_string = None
1441 self.runner = runner
    def addSuccess(self, test):
        """
        Record a test succeeded result

        :param test:

        """
        if self.current_test_case_info:
            self.current_test_case_info.logger.debug(
                "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
                                                       test._testMethodName,
                                                       test._testMethodDoc))
        unittest.TestResult.addSuccess(self, test)
        self.result_string = colorize("OK", GREEN)

        # report the outcome to the parent process, if any
        self.send_result_through_pipe(test, PASS)
1461 def addSkip(self, test, reason):
1463 Record a test skipped.
1469 if self.current_test_case_info:
1470 self.current_test_case_info.logger.debug(
1471 "--- addSkip() %s.%s(%s) called, reason is %s" %
1472 (test.__class__.__name__, test._testMethodName,
1473 test._testMethodDoc, reason))
1474 unittest.TestResult.addSkip(self, test, reason)
1475 self.result_string = colorize("SKIP", YELLOW)
1477 if reason == "not enough cpus":
1478 self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE)
1480 self.send_result_through_pipe(test, SKIP)
1482 def symlink_failed(self):
1483 if self.current_test_case_info:
1485 failed_dir = config.failed_dir
1486 link_path = os.path.join(
1489 os.path.basename(self.current_test_case_info.tempdir))
1491 self.current_test_case_info.logger.debug(
1492 "creating a link to the failed test")
1493 self.current_test_case_info.logger.debug(
1494 "os.symlink(%s, %s)" %
1495 (self.current_test_case_info.tempdir, link_path))
1496 if os.path.exists(link_path):
1497 self.current_test_case_info.logger.debug(
1498 'symlink already exists')
1500 os.symlink(self.current_test_case_info.tempdir, link_path)
1502 except Exception as e:
1503 self.current_test_case_info.logger.error(e)
1505 def send_result_through_pipe(self, test, result):
1506 if hasattr(self, 'test_framework_result_pipe'):
1507 pipe = self.test_framework_result_pipe
1509 pipe.send((test.id(), result))
1511 def log_error(self, test, err, fn_name):
1512 if self.current_test_case_info:
1513 if isinstance(test, unittest.suite._ErrorHolder):
1514 test_name = test.description
1516 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1517 test._testMethodName,
1518 test._testMethodDoc)
1519 self.current_test_case_info.logger.debug(
1520 "--- %s() %s called, err is %s" %
1521 (fn_name, test_name, err))
1522 self.current_test_case_info.logger.debug(
1523 "formatted exception is:\n%s" %
1524 "".join(format_exception(*err)))
1526 def add_error(self, test, err, unittest_fn, error_type):
1527 if error_type == FAIL:
1528 self.log_error(test, err, 'addFailure')
1529 error_type_str = colorize("FAIL", RED)
1530 elif error_type == ERROR:
1531 self.log_error(test, err, 'addError')
1532 error_type_str = colorize("ERROR", RED)
1534 raise Exception('Error type %s cannot be used to record an '
1535 'error or a failure' % error_type)
1537 unittest_fn(self, test, err)
1538 if self.current_test_case_info:
1539 self.result_string = "%s [ temp dir used by test case: %s ]" % \
1541 self.current_test_case_info.tempdir)
1542 self.symlink_failed()
1543 self.failed_test_cases_info.add(self.current_test_case_info)
1544 if is_core_present(self.current_test_case_info.tempdir):
1545 if not self.current_test_case_info.core_crash_test:
1546 if isinstance(test, unittest.suite._ErrorHolder):
1547 test_name = str(test)
1549 test_name = "'{!s}' ({!s})".format(
1550 get_testcase_doc_name(test), test.id())
1551 self.current_test_case_info.core_crash_test = test_name
1552 self.core_crash_test_cases_info.add(
1553 self.current_test_case_info)
1555 self.result_string = '%s [no temp dir]' % error_type_str
1557 self.send_result_through_pipe(test, error_type)
    def addFailure(self, test, err):
        """
        Record a test failed result

        :param test:
        :param err: error message

        """
        self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
    def addError(self, test, err):
        """
        Record a test error result

        :param test:
        :param err: error message

        """
        self.add_error(test, err, unittest.TestResult.addError, ERROR)
    def getDescription(self, test):
        """
        Get test description

        :param test:
        :returns: test description

        """
        return get_test_description(self.descriptions, test)
1589 def startTest(self, test):
1597 def print_header(test):
1598 if test.__class__ in self.printed:
1601 test_doc = getdoc(test)
1603 raise Exception("No doc string for test '%s'" % test.id())
1605 test_title = test_doc.splitlines()[0].rstrip()
1606 test_title = colorize(test_title, GREEN)
1607 if test.is_tagged_run_solo():
1608 test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)
1610 # This block may overwrite the colorized title above,
1611 # but we want this to stand out and be fixed
1612 if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
1613 test_title = colorize(
1614 f"FIXME with VPP workers: {test_title}", RED)
1616 if test.has_tag(TestCaseTag.FIXME_ASAN):
1617 test_title = colorize(
1618 f"FIXME with ASAN: {test_title}", RED)
1619 test.skip_fixme_asan()
1621 if hasattr(test, 'vpp_worker_count'):
1622 if test.vpp_worker_count == 0:
1623 test_title += " [main thread only]"
1624 elif test.vpp_worker_count == 1:
1625 test_title += " [1 worker thread]"
1627 test_title += f" [{test.vpp_worker_count} worker threads]"
1629 if test.__class__.skipped_due_to_cpu_lack:
1630 test_title = colorize(
1631 f"{test_title} [skipped - not enough cpus, "
1632 f"required={test.__class__.get_cpus_required()}, "
1633 f"available={max_vpp_cpus}]", YELLOW)
1635 print(double_line_delim)
1637 print(double_line_delim)
1638 self.printed.append(test.__class__)
1641 self.start_test = time.time()
1642 unittest.TestResult.startTest(self, test)
1643 if self.verbosity > 0:
1644 self.stream.writeln(
1645 "Starting " + self.getDescription(test) + " ...")
1646 self.stream.writeln(single_line_delim)
1648 def stopTest(self, test):
1650 Called when the given test has been run
1655 unittest.TestResult.stopTest(self, test)
1657 if self.verbosity > 0:
1658 self.stream.writeln(single_line_delim)
1659 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1660 self.result_string))
1661 self.stream.writeln(single_line_delim)
1663 self.stream.writeln("%-68s %4.2f %s" %
1664 (self.getDescription(test),
1665 time.time() - self.start_test,
1666 self.result_string))
1668 self.send_result_through_pipe(test, TEST_RUN)
    def printErrors(self):
        """
        Print errors from running the test case
        """
        if len(self.errors) > 0 or len(self.failures) > 0:
            self.stream.writeln()
            self.printErrorList('ERROR', self.errors)
            self.printErrorList('FAIL', self.failures)

        # ^^ that is the last output from unittest before summary
        if not self.runner.print_summary:
            # summary suppressed - swallow any further unittest output
            devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
            self.stream = devnull
            self.runner.stream = devnull
    def printErrorList(self, flavour, errors):
        """
        Print error list to the output stream together with error type
        and test case description.

        :param flavour: error type
        :param errors: iterable errors

        """
        for test, err in errors:
            self.stream.writeln(double_line_delim)
            self.stream.writeln("%s: %s" %
                                (flavour, self.getDescription(test)))
            self.stream.writeln(single_line_delim)
            self.stream.writeln("%s" % err)
class VppTestRunner(unittest.TextTestRunner):
    """
    A basic test runner implementation which prints results to standard error.
    """
1708 def resultclass(self):
1709 """Class maintaining the results of the tests"""
1710 return VppTestResult
1712 def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1713 result_pipe=None, failfast=False, buffer=False,
1714 resultclass=None, print_summary=True, **kwargs):
1715 # ignore stream setting here, use hard-coded stdout to be in sync
1716 # with prints from VppTestCase methods ...
1717 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1718 verbosity, failfast, buffer,
1719 resultclass, **kwargs)
1720 KeepAliveReporter.pipe = keep_alive_pipe
1722 self.orig_stream = self.stream
1723 self.resultclass.test_framework_result_pipe = result_pipe
1725 self.print_summary = print_summary
1727 def _makeResult(self):
1728 return self.resultclass(self.stream,
1733 def run(self, test):
1740 faulthandler.enable() # emit stack trace to stderr if killed by signal
1742 result = super(VppTestRunner, self).run(test)
1743 if not self.print_summary:
1744 self.stream = self.orig_stream
1745 result.stream = self.orig_stream
class Worker(Thread):
    """Run an external executable on a background thread, capturing its
    stdout/stderr into the test log and its exit code in self.result."""

    def __init__(self, executable_args, logger, env=None, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self.logger = logger
        self.args = executable_args
        if hasattr(self, 'testcase') and self.testcase.debug_all:
            if self.testcase.debug_gdbserver:
                # prepend gdbserver to the executable's own arguments;
                # concatenating the Thread *args tuple here would raise
                # TypeError (list + tuple) and be the wrong content anyway
                self.args = ['/usr/bin/gdbserver', 'localhost:{port}'
                             .format(port=self.testcase.gdbserver_port)] + \
                    executable_args
            elif self.testcase.debug_gdb and hasattr(self, 'wait_for_gdb'):
                self.args.append(self.wait_for_gdb)
        self.app_bin = executable_args[0]
        self.app_name = os.path.basename(self.app_bin)
        if hasattr(self, 'role'):
            self.app_name += ' {role}'.format(role=self.role)
        # filled in by run(); readable by callers after join()
        self.process = None
        self.result = None
        env = {} if env is None else env
        self.env = copy.deepcopy(env)
1769 def wait_for_enter(self):
1770 if not hasattr(self, 'testcase'):
1772 if self.testcase.debug_all and self.testcase.debug_gdbserver:
1774 print(double_line_delim)
1775 print("Spawned GDB Server for '{app}' with PID: {pid}"
1776 .format(app=self.app_name, pid=self.process.pid))
1777 elif self.testcase.debug_all and self.testcase.debug_gdb:
1779 print(double_line_delim)
1780 print("Spawned '{app}' with PID: {pid}"
1781 .format(app=self.app_name, pid=self.process.pid))
1784 print(single_line_delim)
1785 print("You can debug '{app}' using:".format(app=self.app_name))
1786 if self.testcase.debug_gdbserver:
1787 print("sudo gdb " + self.app_bin +
1788 " -ex 'target remote localhost:{port}'"
1789 .format(port=self.testcase.gdbserver_port))
1790 print("Now is the time to attach gdb by running the above "
1791 "command, set up breakpoints etc., then resume from "
1792 "within gdb by issuing the 'continue' command")
1793 self.testcase.gdbserver_port += 1
1794 elif self.testcase.debug_gdb:
1795 print("sudo gdb " + self.app_bin +
1796 " -ex 'attach {pid}'".format(pid=self.process.pid))
1797 print("Now is the time to attach gdb by running the above "
1798 "command and set up breakpoints etc., then resume from"
1799 " within gdb by issuing the 'continue' command")
1800 print(single_line_delim)
1801 input("Press ENTER to continue running the testcase...")
1804 executable = self.args[0]
1805 if not os.path.exists(executable) or not os.access(
1806 executable, os.F_OK | os.X_OK):
1807 # Exit code that means some system file did not exist,
1808 # could not be opened, or had some other kind of error.
1809 self.result = os.EX_OSFILE
1810 raise EnvironmentError(
1811 "executable '%s' is not found or executable." % executable)
1812 self.logger.debug("Running executable '{app}': '{cmd}'"
1813 .format(app=self.app_name,
1814 cmd=' '.join(self.args)))
1815 env = os.environ.copy()
1816 env.update(self.env)
1817 env["CK_LOG_FILE_NAME"] = "-"
1818 self.process = subprocess.Popen(
1819 ['stdbuf', '-o0', '-e0'] + self.args, shell=False, env=env,
1820 preexec_fn=os.setpgrp, stdout=subprocess.PIPE,
1821 stderr=subprocess.PIPE)
1822 self.wait_for_enter()
1823 out, err = self.process.communicate()
1824 self.logger.debug("Finished running `{app}'".format(app=self.app_name))
1825 self.logger.info("Return code is `%s'" % self.process.returncode)
1826 self.logger.info(single_line_delim)
1827 self.logger.info("Executable `{app}' wrote to stdout:"
1828 .format(app=self.app_name))
1829 self.logger.info(single_line_delim)
1830 self.logger.info(out.decode('utf-8'))
1831 self.logger.info(single_line_delim)
1832 self.logger.info("Executable `{app}' wrote to stderr:"
1833 .format(app=self.app_name))
1834 self.logger.info(single_line_delim)
1835 self.logger.info(err.decode('utf-8'))
1836 self.logger.info(single_line_delim)
1837 self.result = self.process.returncode
1840 if __name__ == '__main__':