3 from __future__ import print_function
18 from collections import deque
19 from threading import Thread, Event
20 from inspect import getdoc, isclass
21 from traceback import format_exception
22 from logging import FileHandler, DEBUG, Formatter
24 from abc import ABC, abstractmethod
25 from struct import pack, unpack
28 from scapy.packet import Raw, Packet
29 from config import config, available_cpus, num_cpus, max_vpp_cpus
30 import hook as hookmodule
31 from vpp_pg_interface import VppPGInterface
32 from vpp_sub_interface import VppSubInterface
33 from vpp_lo_interface import VppLoInterface
34 from vpp_bvi_interface import VppBviInterface
35 from vpp_papi_provider import VppPapiProvider
36 from vpp_papi import VppEnum
38 from vpp_papi.vpp_stats import VPPStats
39 from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
40 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
42 from vpp_object import VppObjectRegistry
43 from util import ppp, is_core_present
44 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
45 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
46 from scapy.layers.inet6 import ICMPv6EchoReply
49 logger = logging.getLogger(__name__)
51 # Set up an empty logger for the testcase that can be overridden as necessary
52 null_logger = logging.getLogger('VppTestCase')
53 null_logger.addHandler(logging.NullHandler())
63 if config.debug_framework:
67 Test framework module.
69 The module provides a set of tools for constructing and running tests and
70 representing the results.
74 class VppDiedError(Exception):
75 """ exception for reporting that the subprocess has died."""
77 signals_by_value = {v: k for k, v in signal.__dict__.items() if
78 k.startswith('SIG') and not k.startswith('SIG_')}
80 def __init__(self, rv=None, testcase=None, method_name=None):
82 self.signal_name = None
83 self.testcase = testcase
84 self.method_name = method_name
87 self.signal_name = VppDiedError.signals_by_value[-rv]
88 except (KeyError, TypeError):
91 if testcase is None and method_name is None:
94 in_msg = ' while running %s.%s' % (testcase, method_name)
97 msg = "VPP subprocess died unexpectedly%s with return code: %d%s."\
98 % (in_msg, self.rv, ' [%s]' %
100 self.signal_name is not None else ''))
102 msg = "VPP subprocess died unexpectedly%s." % in_msg
104 super(VppDiedError, self).__init__(msg)
107 class _PacketInfo(object):
108 """Private class to create packet info object.
110 Help process information about the next packet.
111 Set variables to default values.
113 #: Store the index of the packet.
115 #: Store the index of the source packet generator interface of the packet.
117 #: Store the index of the destination packet generator interface
120 #: Store expected ip version
122 #: Store expected upper protocol
124 #: Store the copy of the former packet.
127 def __eq__(self, other):
128 index = self.index == other.index
129 src = self.src == other.src
130 dst = self.dst == other.dst
131 data = self.data == other.data
132 return index and src and dst and data
135 def pump_output(testclass):
136 """ pump output from vpp stdout/stderr to proper queues """
139 while not testclass.pump_thread_stop_flag.is_set():
140 readable = select.select([testclass.vpp.stdout.fileno(),
141 testclass.vpp.stderr.fileno(),
142 testclass.pump_thread_wakeup_pipe[0]],
144 if testclass.vpp.stdout.fileno() in readable:
145 read = os.read(testclass.vpp.stdout.fileno(), 102400)
147 split = read.decode('ascii',
148 errors='backslashreplace').splitlines(True)
149 if len(stdout_fragment) > 0:
150 split[0] = "%s%s" % (stdout_fragment, split[0])
151 if len(split) > 0 and split[-1].endswith("\n"):
155 stdout_fragment = split[-1]
156 testclass.vpp_stdout_deque.extend(split[:limit])
157 if not config.cache_vpp_output:
158 for line in split[:limit]:
159 testclass.logger.info(
160 "VPP STDOUT: %s" % line.rstrip("\n"))
161 if testclass.vpp.stderr.fileno() in readable:
162 read = os.read(testclass.vpp.stderr.fileno(), 102400)
164 split = read.decode('ascii',
165 errors='backslashreplace').splitlines(True)
166 if len(stderr_fragment) > 0:
167 split[0] = "%s%s" % (stderr_fragment, split[0])
168 if len(split) > 0 and split[-1].endswith("\n"):
172 stderr_fragment = split[-1]
174 testclass.vpp_stderr_deque.extend(split[:limit])
175 if not config.cache_vpp_output:
176 for line in split[:limit]:
177 testclass.logger.error(
178 "VPP STDERR: %s" % line.rstrip("\n"))
179 # ignoring the dummy pipe here intentionally - the
180 # flag will take care of properly terminating the loop
183 def _is_platform_aarch64():
184 return platform.machine() == 'aarch64'
187 is_platform_aarch64 = _is_platform_aarch64()
190 class KeepAliveReporter(object):
192 Singleton object which reports test start to parent process
197 self.__dict__ = self._shared_state
205 def pipe(self, pipe):
206 if self._pipe is not None:
207 raise Exception("Internal error - pipe should only be set once.")
210 def send_keep_alive(self, test, desc=None):
212 Write current test tmpdir & desc to keep-alive pipe to signal liveness
214 if self.pipe is None:
215 # if not running forked..
219 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
223 self.pipe.send((desc, config.vpp, test.tempdir, test.vpp.pid))
226 class TestCaseTag(Enum):
227 # marks the suites that must run at the end
228 # using only a single test runner
230 # marks the suites broken on VPP multi-worker
231 FIXME_VPP_WORKERS = 2
232 # marks the suites broken when ASan is enabled
236 def create_tag_decorator(e):
239 cls.test_tags.append(e)
240 except AttributeError:
246 tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
247 tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
248 tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
262 class CPUInterface(ABC):
264 skipped_due_to_cpu_lack = False
268 def get_cpus_required(cls):
272 def assign_cpus(cls, cpus):
276 class VppTestCase(CPUInterface, unittest.TestCase):
277 """This subclass is a base class for VPP test cases that are implemented as
278 classes. It provides methods to create and run test case.
281 extra_vpp_statseg_config = ""
282 extra_vpp_punt_config = []
283 extra_vpp_plugin_config = []
285 vapi_response_timeout = 5
286 remove_configured_vpp_objects_on_tear_down = True
289 def packet_infos(self):
290 """List of packet infos"""
291 return self._packet_infos
294 def get_packet_count_for_if_idx(cls, dst_if_index):
295 """Get the number of packet info for specified destination if index"""
296 if dst_if_index in cls._packet_count_for_dst_if_idx:
297 return cls._packet_count_for_dst_if_idx[dst_if_index]
302 def has_tag(cls, tag):
303 """ if the test case has a given tag - return true """
305 return tag in cls.test_tags
306 except AttributeError:
311 def is_tagged_run_solo(cls):
312 """ if the test case class is timing-sensitive - return true """
313 return cls.has_tag(TestCaseTag.RUN_SOLO)
316 def skip_fixme_asan(cls):
317 """ if @tag_fixme_asan & ASan is enabled - mark for skip """
318 if cls.has_tag(TestCaseTag.FIXME_ASAN):
319 vpp_extra_cmake_args = os.environ.get('VPP_EXTRA_CMAKE_ARGS', '')
320 if 'DVPP_ENABLE_SANITIZE_ADDR=ON' in vpp_extra_cmake_args:
321 cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls)
325 """Return the instance of this testcase"""
326 return cls.test_instance
329 def set_debug_flags(cls, d):
330 cls.gdbserver_port = 7777
331 cls.debug_core = False
332 cls.debug_gdb = False
333 cls.debug_gdbserver = False
334 cls.debug_all = False
335 cls.debug_attach = False
340 cls.debug_core = True
341 elif dl == "gdb" or dl == "gdb-all":
343 elif dl == "gdbserver" or dl == "gdbserver-all":
344 cls.debug_gdbserver = True
346 cls.debug_attach = True
348 raise Exception("Unrecognized DEBUG option: '%s'" % d)
349 if dl == "gdb-all" or dl == "gdbserver-all":
353 def get_vpp_worker_count(cls):
354 if not hasattr(cls, "vpp_worker_count"):
355 if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
356 cls.vpp_worker_count = 0
358 cls.vpp_worker_count = config.vpp_worker_count
359 return cls.vpp_worker_count
362 def get_cpus_required(cls):
363 return 1 + cls.get_vpp_worker_count()
366 def setUpConstants(cls):
367 """ Set-up the test case class based on environment variables """
368 cls.step = config.step
369 cls.plugin_path = ":".join(config.vpp_plugin_dir)
370 cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir)
371 cls.extern_plugin_path = ":".join(config.extern_plugin_dir)
373 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
374 debug_cli = "cli-listen localhost:5002"
375 size = re.search(r"\d+[gG]", config.coredump_size)
377 coredump_size = f"coredump-size {config.coredump_size}".lower()
379 coredump_size = "coredump-size unlimited"
380 default_variant = config.variant
381 if default_variant is not None:
382 default_variant = "defaults { %s 100 }" % default_variant
386 api_fuzzing = config.api_fuzz
387 if api_fuzzing is None:
392 "unix", "{", "nodaemon", debug_cli, "full-coredump",
393 coredump_size, "runtime-dir", cls.tempdir, "}",
394 "api-trace", "{", "on", "}",
395 "api-segment", "{", "prefix", cls.get_api_segment_prefix(), "}",
396 "cpu", "{", "main-core", str(cls.cpus[0]), ]
397 if cls.extern_plugin_path not in (None, ""):
398 cls.extra_vpp_plugin_config.append(
399 "add-path %s" % cls.extern_plugin_path)
400 if cls.get_vpp_worker_count():
401 cls.vpp_cmdline.extend([
402 "corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])])
403 cls.vpp_cmdline.extend([
405 "physmem", "{", "max-size", "32m", "}",
406 "statseg", "{", "socket-name", cls.get_stats_sock_path(),
407 cls.extra_vpp_statseg_config, "}",
408 "socksvr", "{", "socket-name", cls.get_api_sock_path(), "}",
409 "node { ", default_variant, "}",
410 "api-fuzz {", api_fuzzing, "}",
411 "plugins", "{", "plugin", "dpdk_plugin.so", "{", "disable", "}",
412 "plugin", "rdma_plugin.so", "{", "disable", "}",
413 "plugin", "lisp_unittest_plugin.so", "{", "enable", "}",
414 "plugin", "unittest_plugin.so", "{", "enable", "}"
415 ] + cls.extra_vpp_plugin_config + ["}", ])
417 if cls.extra_vpp_punt_config is not None:
418 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
420 if not cls.debug_attach:
421 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
422 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
425 def wait_for_enter(cls):
426 if cls.debug_gdbserver:
427 print(double_line_delim)
428 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
430 print(double_line_delim)
431 print("Spawned VPP with PID: %d" % cls.vpp.pid)
433 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
435 print(single_line_delim)
436 print("You can debug VPP using:")
437 if cls.debug_gdbserver:
438 print(f"sudo gdb {config.vpp} "
439 f"-ex 'target remote localhost:{cls.gdbserver_port}'")
440 print("Now is the time to attach gdb by running the above "
441 "command, set up breakpoints etc., then resume VPP from "
442 "within gdb by issuing the 'continue' command")
443 cls.gdbserver_port += 1
445 print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'")
446 print("Now is the time to attach gdb by running the above "
447 "command and set up breakpoints etc., then resume VPP from"
448 " within gdb by issuing the 'continue' command")
449 print(single_line_delim)
450 input("Press ENTER to continue running the testcase...")
458 cls.logger.debug(f"Assigned cpus: {cls.cpus}")
459 cmdline = cls.vpp_cmdline
461 if cls.debug_gdbserver:
462 gdbserver = '/usr/bin/gdbserver'
463 if not os.path.isfile(gdbserver) or\
464 not os.access(gdbserver, os.X_OK):
465 raise Exception("gdbserver binary '%s' does not exist or is "
466 "not executable" % gdbserver)
468 cmdline = [gdbserver, 'localhost:{port}'
469 .format(port=cls.gdbserver_port)] + cls.vpp_cmdline
470 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
473 cls.vpp = subprocess.Popen(cmdline,
474 stdout=subprocess.PIPE,
475 stderr=subprocess.PIPE)
476 except subprocess.CalledProcessError as e:
477 cls.logger.critical("Subprocess returned with non-0 return code: ("
481 cls.logger.critical("Subprocess returned with OS error: "
482 "(%s) %s", e.errno, e.strerror)
484 except Exception as e:
485 cls.logger.exception("Subprocess returned unexpected from "
492 def wait_for_coredump(cls):
493 corefile = cls.tempdir + "/core"
494 if os.path.isfile(corefile):
495 cls.logger.error("Waiting for coredump to complete: %s", corefile)
496 curr_size = os.path.getsize(corefile)
497 deadline = time.time() + 60
499 while time.time() < deadline:
502 curr_size = os.path.getsize(corefile)
503 if size == curr_size:
507 cls.logger.error("Timed out waiting for coredump to complete:"
510 cls.logger.error("Coredump complete: %s, size %d",
514 def get_stats_sock_path(cls):
515 return "%s/stats.sock" % cls.tempdir
518 def get_api_sock_path(cls):
519 return "%s/api.sock" % cls.tempdir
522 def get_api_segment_prefix(cls):
523 return os.path.basename(cls.tempdir) # Only used for VAPI
526 def get_tempdir(cls):
527 tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}"
528 if config.wipe_tmp_dir:
529 shutil.rmtree(tmpdir, ignore_errors=True)
534 def create_file_handler(cls):
535 if config.log_dir is None:
536 cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
539 logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}"
540 if config.wipe_tmp_dir:
541 shutil.rmtree(logdir, ignore_errors=True)
543 cls.file_handler = FileHandler(f"{logdir}/log.txt")
548 Perform class setup before running the testcase
549 Remove shared memory files, start vpp and connect the vpp-api
551 super(VppTestCase, cls).setUpClass()
552 cls.logger = get_logger(cls.__name__)
553 random.seed(config.rnd_seed)
554 if hasattr(cls, 'parallel_handler'):
555 cls.logger.addHandler(cls.parallel_handler)
556 cls.logger.propagate = False
557 cls.set_debug_flags(config.debug)
558 cls.tempdir = cls.get_tempdir()
559 cls.create_file_handler()
560 cls.file_handler.setFormatter(
561 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
563 cls.file_handler.setLevel(DEBUG)
564 cls.logger.addHandler(cls.file_handler)
565 cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
566 os.chdir(cls.tempdir)
567 cls.logger.info("Temporary dir is %s, api socket is %s",
568 cls.tempdir, cls.get_api_sock_path())
569 cls.logger.debug("Random seed is %s", config.rnd_seed)
571 cls.reset_packet_infos()
576 cls.registry = VppObjectRegistry()
577 cls.vpp_startup_failed = False
578 cls.reporter = KeepAliveReporter()
579 # need to catch exceptions here because if we raise, then the cleanup
580 # doesn't get called and we might end with a zombie vpp
586 cls.reporter.send_keep_alive(cls, 'setUpClass')
587 VppTestResult.current_test_case_info = TestCaseInfo(
588 cls.logger, cls.tempdir, cls.vpp.pid, config.vpp)
589 cls.vpp_stdout_deque = deque()
590 cls.vpp_stderr_deque = deque()
591 if not cls.debug_attach:
592 cls.pump_thread_stop_flag = Event()
593 cls.pump_thread_wakeup_pipe = os.pipe()
594 cls.pump_thread = Thread(target=pump_output, args=(cls,))
595 cls.pump_thread.daemon = True
596 cls.pump_thread.start()
597 if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
598 cls.vapi_response_timeout = 0
599 cls.vapi = VppPapiProvider(cls.__name__, cls,
600 cls.vapi_response_timeout)
602 hook = hookmodule.StepHook(cls)
604 hook = hookmodule.PollHook(cls)
605 cls.vapi.register_hook(hook)
606 cls.statistics = VPPStats(socketname=cls.get_stats_sock_path())
610 cls.vpp_startup_failed = True
612 "VPP died shortly after startup, check the"
613 " output to standard error for possible cause")
617 except (vpp_papi.VPPIOError, Exception) as e:
618 cls.logger.debug("Exception connecting to vapi: %s" % e)
619 cls.vapi.disconnect()
621 if cls.debug_gdbserver:
622 print(colorize("You're running VPP inside gdbserver but "
623 "VPP-API connection failed, did you forget "
624 "to 'continue' VPP from within gdb?", RED))
627 last_line = cls.vapi.cli("show thread").split("\n")[-2]
628 cls.vpp_worker_count = int(last_line.split(" ")[0])
629 print("Detected VPP with %s workers." % cls.vpp_worker_count)
630 except vpp_papi.VPPRuntimeError as e:
631 cls.logger.debug("%s" % e)
634 except Exception as e:
635 cls.logger.debug("Exception connecting to VPP: %s" % e)
640 def _debug_quit(cls):
641 if (cls.debug_gdbserver or cls.debug_gdb):
645 if cls.vpp.returncode is None:
647 print(double_line_delim)
648 print("VPP or GDB server is still running")
649 print(single_line_delim)
650 input("When done debugging, press ENTER to kill the "
651 "process and finish running the testcase...")
652 except AttributeError:
658 Disconnect vpp-api, kill vpp and cleanup shared memory files
662 # first signal that we want to stop the pump thread, then wake it up
663 if hasattr(cls, 'pump_thread_stop_flag'):
664 cls.pump_thread_stop_flag.set()
665 if hasattr(cls, 'pump_thread_wakeup_pipe'):
666 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
667 if hasattr(cls, 'pump_thread'):
668 cls.logger.debug("Waiting for pump thread to stop")
669 cls.pump_thread.join()
670 if hasattr(cls, 'vpp_stderr_reader_thread'):
671 cls.logger.debug("Waiting for stderr pump to stop")
672 cls.vpp_stderr_reader_thread.join()
674 if hasattr(cls, 'vpp'):
675 if hasattr(cls, 'vapi'):
676 cls.logger.debug(cls.vapi.vpp.get_stats())
677 cls.logger.debug("Disconnecting class vapi client on %s",
679 cls.vapi.disconnect()
680 cls.logger.debug("Deleting class vapi attribute on %s",
684 if not cls.debug_attach and cls.vpp.returncode is None:
685 cls.wait_for_coredump()
686 cls.logger.debug("Sending TERM to vpp")
688 cls.logger.debug("Waiting for vpp to die")
690 outs, errs = cls.vpp.communicate(timeout=5)
691 except subprocess.TimeoutExpired:
693 outs, errs = cls.vpp.communicate()
694 cls.logger.debug("Deleting class vpp attribute on %s",
696 if not cls.debug_attach:
697 cls.vpp.stdout.close()
698 cls.vpp.stderr.close()
701 if cls.vpp_startup_failed:
702 stdout_log = cls.logger.info
703 stderr_log = cls.logger.critical
705 stdout_log = cls.logger.info
706 stderr_log = cls.logger.info
708 if hasattr(cls, 'vpp_stdout_deque'):
709 stdout_log(single_line_delim)
710 stdout_log('VPP output to stdout while running %s:', cls.__name__)
711 stdout_log(single_line_delim)
712 vpp_output = "".join(cls.vpp_stdout_deque)
713 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
715 stdout_log('\n%s', vpp_output)
716 stdout_log(single_line_delim)
718 if hasattr(cls, 'vpp_stderr_deque'):
719 stderr_log(single_line_delim)
720 stderr_log('VPP output to stderr while running %s:', cls.__name__)
721 stderr_log(single_line_delim)
722 vpp_output = "".join(cls.vpp_stderr_deque)
723 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
725 stderr_log('\n%s', vpp_output)
726 stderr_log(single_line_delim)
729 def tearDownClass(cls):
730 """ Perform final cleanup after running all tests in this test-case """
731 cls.logger.debug("--- tearDownClass() for %s called ---" %
733 cls.reporter.send_keep_alive(cls, 'tearDownClass')
735 cls.file_handler.close()
736 cls.reset_packet_infos()
737 if config.debug_framework:
738 debug_internal.on_tear_down_class(cls)
740 def show_commands_at_teardown(self):
741 """ Allow subclass specific teardown logging additions."""
742 self.logger.info("--- No test specific show commands provided. ---")
745 """ Show various debug prints after each test """
746 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
747 (self.__class__.__name__, self._testMethodName,
748 self._testMethodDoc))
751 if not self.vpp_dead:
752 self.logger.debug(self.vapi.cli("show trace max 1000"))
753 self.logger.info(self.vapi.ppcli("show interface"))
754 self.logger.info(self.vapi.ppcli("show hardware"))
755 self.logger.info(self.statistics.set_errors_str())
756 self.logger.info(self.vapi.ppcli("show run"))
757 self.logger.info(self.vapi.ppcli("show log"))
758 self.logger.info(self.vapi.ppcli("show bihash"))
759 self.logger.info("Logging testcase specific show commands.")
760 self.show_commands_at_teardown()
761 if self.remove_configured_vpp_objects_on_tear_down:
762 self.registry.remove_vpp_config(self.logger)
763 # Save/Dump VPP api trace log
764 m = self._testMethodName
765 api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
766 tmp_api_trace = "/tmp/%s" % api_trace
767 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
768 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
769 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
771 os.rename(tmp_api_trace, vpp_api_trace_log)
772 except VppTransportSocketIOError:
773 self.logger.debug("VppTransportSocketIOError: Vpp dead. "
774 "Cannot log show commands.")
777 self.registry.unregister_all(self.logger)
780 """ Clear trace before running each test"""
781 super(VppTestCase, self).setUp()
782 self.reporter.send_keep_alive(self)
784 raise VppDiedError(rv=None, testcase=self.__class__.__name__,
785 method_name=self._testMethodName)
786 self.sleep(.1, "during setUp")
787 self.vpp_stdout_deque.append(
788 "--- test setUp() for %s.%s(%s) starts here ---\n" %
789 (self.__class__.__name__, self._testMethodName,
790 self._testMethodDoc))
791 self.vpp_stderr_deque.append(
792 "--- test setUp() for %s.%s(%s) starts here ---\n" %
793 (self.__class__.__name__, self._testMethodName,
794 self._testMethodDoc))
795 self.vapi.cli("clear trace")
796 # store the test instance inside the test class - so that objects
797 # holding the class can access instance methods (like assertEqual)
798 type(self).test_instance = self
801 def pg_enable_capture(cls, interfaces=None):
803 Enable capture on packet-generator interfaces
805 :param interfaces: iterable interface indexes (if None,
806 use self.pg_interfaces)
809 if interfaces is None:
810 interfaces = cls.pg_interfaces
815 def register_pcap(cls, intf, worker):
816 """ Register a pcap in the testclass """
817 # add to the list of captures with current timestamp
818 cls._pcaps.append((intf, worker))
821 def get_vpp_time(cls):
822 # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
823 # returns float("2.190522")
824 timestr = cls.vapi.cli('show clock')
825 head, sep, tail = timestr.partition(',')
826 head, sep, tail = head.partition('Time now')
830 def sleep_on_vpp_time(cls, sec):
831 """ Sleep according to time in VPP world """
832 # On a busy system with many processes
833 # we might end up with VPP time being slower than real world
834 # So take that into account when waiting for VPP to do something
835 start_time = cls.get_vpp_time()
836 while cls.get_vpp_time() - start_time < sec:
840 def pg_start(cls, trace=True):
841 """ Enable the PG, wait till it is done, then clean up """
842 for (intf, worker) in cls._old_pcaps:
843 intf.handle_old_pcap_file(intf.get_in_path(worker),
844 intf.in_history_counter)
847 cls.vapi.cli("clear trace")
848 cls.vapi.cli("trace add pg-input 1000")
849 cls.vapi.cli('packet-generator enable')
850 # PG, when starts, runs to completion -
851 # so let's avoid a race condition,
852 # and wait a little till it's done.
853 # Then clean it up - and then be gone.
854 deadline = time.time() + 300
855 while cls.vapi.cli('show packet-generator').find("Yes") != -1:
856 cls.sleep(0.01) # yield
857 if time.time() > deadline:
858 cls.logger.error("Timeout waiting for pg to stop")
860 for intf, worker in cls._pcaps:
861 cls.vapi.cli('packet-generator delete %s' %
862 intf.get_cap_name(worker))
863 cls._old_pcaps = cls._pcaps
867 def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0,
870 Create packet-generator interfaces.
872 :param interfaces: iterable indexes of the interfaces.
873 :returns: List of created interfaces.
878 intf = VppPGInterface(cls, i, gso, gso_size, mode)
879 setattr(cls, intf.name, intf)
881 cls.pg_interfaces = result
885 def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
886 pgmode = VppEnum.vl_api_pg_interface_mode_t
887 return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
888 pgmode.PG_API_MODE_IP4)
891 def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
892 pgmode = VppEnum.vl_api_pg_interface_mode_t
893 return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
894 pgmode.PG_API_MODE_IP6)
897 def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
898 pgmode = VppEnum.vl_api_pg_interface_mode_t
899 return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
900 pgmode.PG_API_MODE_ETHERNET)
903 def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
904 pgmode = VppEnum.vl_api_pg_interface_mode_t
905 return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
906 pgmode.PG_API_MODE_ETHERNET)
909 def create_loopback_interfaces(cls, count):
911 Create loopback interfaces.
913 :param count: number of interfaces created.
914 :returns: List of created interfaces.
916 result = [VppLoInterface(cls) for i in range(count)]
918 setattr(cls, intf.name, intf)
919 cls.lo_interfaces = result
923 def create_bvi_interfaces(cls, count):
925 Create BVI interfaces.
927 :param count: number of interfaces created.
928 :returns: List of created interfaces.
930 result = [VppBviInterface(cls) for i in range(count)]
932 setattr(cls, intf.name, intf)
933 cls.bvi_interfaces = result
937 def extend_packet(packet, size, padding=' '):
939 Extend packet to given size by padding with spaces or custom padding
940 NOTE: Currently works only when Raw layer is present.
942 :param packet: packet
943 :param size: target size
944 :param padding: padding used to extend the payload
947 packet_len = len(packet) + 4
948 extend = size - packet_len
950 num = (extend // len(padding)) + 1
951 packet[Raw].load += (padding * num)[:extend].encode("ascii")
954 def reset_packet_infos(cls):
955 """ Reset the list of packet info objects and packet counts to zero """
956 cls._packet_infos = {}
957 cls._packet_count_for_dst_if_idx = {}
960 def create_packet_info(cls, src_if, dst_if):
962 Create packet info object containing the source and destination indexes
963 and add it to the testcase's packet info list
965 :param VppInterface src_if: source interface
966 :param VppInterface dst_if: destination interface
968 :returns: _PacketInfo object
972 info.index = len(cls._packet_infos)
973 info.src = src_if.sw_if_index
974 info.dst = dst_if.sw_if_index
975 if isinstance(dst_if, VppSubInterface):
976 dst_idx = dst_if.parent.sw_if_index
978 dst_idx = dst_if.sw_if_index
979 if dst_idx in cls._packet_count_for_dst_if_idx:
980 cls._packet_count_for_dst_if_idx[dst_idx] += 1
982 cls._packet_count_for_dst_if_idx[dst_idx] = 1
983 cls._packet_infos[info.index] = info
987 def info_to_payload(info):
989 Convert _PacketInfo object to packet payload
991 :param info: _PacketInfo object
993 :returns: string containing serialized data from packet info
996 # retrieve payload, currently 18 bytes (4 x ints + 1 short)
997 return pack('iiiih', info.index, info.src,
998 info.dst, info.ip, info.proto)
1001 def payload_to_info(payload, payload_field='load'):
1003 Convert packet payload to _PacketInfo object
1005 :param payload: packet payload
1006 :type payload: <class 'scapy.packet.Raw'>
1007 :param payload_field: packet fieldname of payload "load" for
1008 <class 'scapy.packet.Raw'>
1009 :type payload_field: str
1010 :returns: _PacketInfo object containing de-serialized data from payload
1014 # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1015 payload_b = getattr(payload, payload_field)[:18]
1017 info = _PacketInfo()
1018 info.index, info.src, info.dst, info.ip, info.proto \
1019 = unpack('iiiih', payload_b)
1021 # some SRv6 TCs depend on get an exception if bad values are detected
1022 if info.index > 0x4000:
1023 raise ValueError('Index value is invalid')
1027 def get_next_packet_info(self, info):
1029 Iterate over the packet info list stored in the testcase
1030 Start iteration with first element if info is None
1031 Continue based on index in info if info is specified
1033 :param info: info or None
1034 :returns: next info in list or None if no more infos
1039 next_index = info.index + 1
1040 if next_index == len(self._packet_infos):
1043 return self._packet_infos[next_index]
1045 def get_next_packet_info_for_interface(self, src_index, info):
1047 Search the packet info list for the next packet info with same source
1050 :param src_index: source interface index to search for
1051 :param info: packet info - where to start the search
1052 :returns: packet info or None
1056 info = self.get_next_packet_info(info)
1059 if info.src == src_index:
1062 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
1064 Search the packet info list for the next packet info with same source
1065 and destination interface indexes
1067 :param src_index: source interface index to search for
1068 :param dst_index: destination interface index to search for
1069 :param info: packet info - where to start the search
1070 :returns: packet info or None
1074 info = self.get_next_packet_info_for_interface(src_index, info)
1077 if info.dst == dst_index:
1080 def assert_equal(self, real_value, expected_value, name_or_class=None):
1081 if name_or_class is None:
1082 self.assertEqual(real_value, expected_value)
1085 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
1086 msg = msg % (getdoc(name_or_class).strip(),
1087 real_value, str(name_or_class(real_value)),
1088 expected_value, str(name_or_class(expected_value)))
1090 msg = "Invalid %s: %s does not match expected value %s" % (
1091 name_or_class, real_value, expected_value)
1093 self.assertEqual(real_value, expected_value, msg)
1095 def assert_in_range(self,
1103 msg = "Invalid %s: %s out of range <%s,%s>" % (
1104 name, real_value, expected_min, expected_max)
1105 self.assertTrue(expected_min <= real_value <= expected_max, msg)
1107 def assert_packet_checksums_valid(self, packet,
1108 ignore_zero_udp_checksums=True):
1109 received = packet.__class__(scapy.compat.raw(packet))
1110 udp_layers = ['UDP', 'UDPerror']
1111 checksum_fields = ['cksum', 'chksum']
1114 temp = received.__class__(scapy.compat.raw(received))
1116 layer = temp.getlayer(counter)
1118 layer = layer.copy()
1119 layer.remove_payload()
1120 for cf in checksum_fields:
1121 if hasattr(layer, cf):
1122 if ignore_zero_udp_checksums and \
1123 0 == getattr(layer, cf) and \
1124 layer.name in udp_layers:
1126 delattr(temp.getlayer(counter), cf)
1127 checksums.append((counter, cf))
1130 counter = counter + 1
1131 if 0 == len(checksums):
1133 temp = temp.__class__(scapy.compat.raw(temp))
1134 for layer, cf in checksums:
1135 calc_sum = getattr(temp[layer], cf)
1137 getattr(received[layer], cf), calc_sum,
1138 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
1140 "Checksum field `%s` on `%s` layer has correct value `%s`" %
1141 (cf, temp[layer].name, calc_sum))
1143 def assert_checksum_valid(self, received_packet, layer,
1144 field_name='chksum',
1145 ignore_zero_checksum=False):
1146 """ Check checksum of received packet on given layer """
1147 received_packet_checksum = getattr(received_packet[layer], field_name)
1148 if ignore_zero_checksum and 0 == received_packet_checksum:
1150 recalculated = received_packet.__class__(
1151 scapy.compat.raw(received_packet))
1152 delattr(recalculated[layer], field_name)
1153 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1154 self.assert_equal(received_packet_checksum,
1155 getattr(recalculated[layer], field_name),
1156 "packet checksum on layer: %s" % layer)
1158 def assert_ip_checksum_valid(self, received_packet,
1159 ignore_zero_checksum=False):
1160 self.assert_checksum_valid(received_packet, 'IP',
1161 ignore_zero_checksum=ignore_zero_checksum)
1163 def assert_tcp_checksum_valid(self, received_packet,
1164 ignore_zero_checksum=False):
1165 self.assert_checksum_valid(received_packet, 'TCP',
1166 ignore_zero_checksum=ignore_zero_checksum)
1168 def assert_udp_checksum_valid(self, received_packet,
1169 ignore_zero_checksum=True):
1170 self.assert_checksum_valid(received_packet, 'UDP',
1171 ignore_zero_checksum=ignore_zero_checksum)
1173 def assert_embedded_icmp_checksum_valid(self, received_packet):
1174 if received_packet.haslayer(IPerror):
1175 self.assert_checksum_valid(received_packet, 'IPerror')
1176 if received_packet.haslayer(TCPerror):
1177 self.assert_checksum_valid(received_packet, 'TCPerror')
1178 if received_packet.haslayer(UDPerror):
1179 self.assert_checksum_valid(received_packet, 'UDPerror',
1180 ignore_zero_checksum=True)
1181 if received_packet.haslayer(ICMPerror):
1182 self.assert_checksum_valid(received_packet, 'ICMPerror')
1184 def assert_icmp_checksum_valid(self, received_packet):
1185 self.assert_checksum_valid(received_packet, 'ICMP')
1186 self.assert_embedded_icmp_checksum_valid(received_packet)
1188 def assert_icmpv6_checksum_valid(self, pkt):
1189 if pkt.haslayer(ICMPv6DestUnreach):
1190 self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
1191 self.assert_embedded_icmp_checksum_valid(pkt)
1192 if pkt.haslayer(ICMPv6EchoRequest):
1193 self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
1194 if pkt.haslayer(ICMPv6EchoReply):
1195 self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
1197 def get_counter(self, counter):
1198 if counter.startswith("/"):
1199 counter_value = self.statistics.get_counter(counter)
1201 counters = self.vapi.cli("sh errors").split('\n')
1203 for i in range(1, len(counters) - 1):
1204 results = counters[i].split()
1205 if results[1] == counter:
1206 counter_value = int(results[0])
1208 return counter_value
1210 def assert_counter_equal(self, counter, expected_value,
1211 thread=None, index=0):
1212 c = self.get_counter(counter)
1213 if thread is not None:
1214 c = c[thread][index]
1216 c = sum(x[index] for x in c)
1217 self.assert_equal(c, expected_value, "counter `%s'" % counter)
1219 def assert_packet_counter_equal(self, counter, expected_value):
1220 counter_value = self.get_counter(counter)
1221 self.assert_equal(counter_value, expected_value,
1222 "packet counter `%s'" % counter)
1224 def assert_error_counter_equal(self, counter, expected_value):
1225 counter_value = self.statistics[counter].sum()
1226 self.assert_equal(counter_value, expected_value,
1227 "error counter `%s'" % counter)
1230 def sleep(cls, timeout, remark=None):
1232 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1233 # * by Guido, only the main thread can be interrupted.
1235 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
1238 if hasattr(os, 'sched_yield'):
1244 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1245 before = time.time()
1248 if after - before > 2 * timeout:
1249 cls.logger.error("unexpected self.sleep() result - "
1250 "slept for %es instead of ~%es!",
1251 after - before, timeout)
1254 "Finished sleep (%s) - slept %es (wanted %es)",
1255 remark, after - before, timeout)
1257 def virtual_sleep(self, timeout, remark=None):
1258 self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
1259 self.vapi.cli("set clock adjust %s" % timeout)
1261 def pg_send(self, intf, pkts, worker=None, trace=True):
1262 intf.add_stream(pkts, worker=worker)
1263 self.pg_enable_capture(self.pg_interfaces)
1264 self.pg_start(trace=trace)
1266 def snapshot_stats(self, stats_diff):
1267 """Return snapshot of interesting stats based on diff dictionary."""
1269 for sw_if_index in stats_diff:
1270 for counter in stats_diff[sw_if_index]:
1271 stats_snapshot[counter] = self.statistics[counter]
1272 self.logger.debug(f"Took statistics stats_snapshot: {stats_snapshot}")
1273 return stats_snapshot
1275 def compare_stats_with_snapshot(self, stats_diff, stats_snapshot):
1276 """Assert appropriate difference between current stats and snapshot."""
1277 for sw_if_index in stats_diff:
1278 for cntr, diff in stats_diff[sw_if_index].items():
1279 if sw_if_index == "err":
1281 self.statistics[cntr].sum(),
1282 stats_snapshot[cntr].sum() + diff,
1283 f"'{cntr}' counter value (previous value: "
1284 f"{stats_snapshot[cntr].sum()}, "
1285 f"expected diff: {diff})")
1289 self.statistics[cntr][:, sw_if_index].sum(),
1290 stats_snapshot[cntr][:, sw_if_index].sum() + diff,
1291 f"'{cntr}' counter value (previous value: "
1292 f"{stats_snapshot[cntr][:, sw_if_index].sum()}, "
1293 f"expected diff: {diff})")
1295 # if diff is 0, then this most probably a case where
1296 # test declares multiple interfaces but traffic hasn't
1297 # passed through this one yet - which means the counter
1298 # value is 0 and can be ignored
1302 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None,
1303 stats_diff=None, trace=True, msg=None):
1305 stats_snapshot = self.snapshot_stats(stats_diff)
1307 self.pg_send(intf, pkts)
1312 for i in self.pg_interfaces:
1313 i.get_capture(0, timeout=timeout)
1314 i.assert_nothing_captured(remark=remark)
1319 self.logger.debug(f"send_and_assert_no_replies: {msg}")
1320 self.logger.debug(self.vapi.cli("show trace"))
1323 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1325 def send_and_expect(self, intf, pkts, output, n_rx=None, worker=None,
1326 trace=True, msg=None, stats_diff=None):
1328 stats_snapshot = self.snapshot_stats(stats_diff)
1331 n_rx = 1 if isinstance(pkts, Packet) else len(pkts)
1332 self.pg_send(intf, pkts, worker=worker, trace=trace)
1333 rx = output.get_capture(n_rx)
1336 self.logger.debug(f"send_and_expect: {msg}")
1337 self.logger.debug(self.vapi.cli("show trace"))
1340 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1344 def send_and_expect_load_balancing(self, input, pkts, outputs,
1345 worker=None, trace=True):
1346 self.pg_send(input, pkts, worker=worker, trace=trace)
1349 rx = oo._get_capture(1)
1350 self.assertNotEqual(0, len(rx))
1353 self.logger.debug(self.vapi.cli("show trace"))
1356 def send_and_expect_only(self, intf, pkts, output, timeout=None,
1359 stats_snapshot = self.snapshot_stats(stats_diff)
1361 self.pg_send(intf, pkts)
1362 rx = output.get_capture(len(pkts))
1366 for i in self.pg_interfaces:
1367 if i not in outputs:
1368 i.get_capture(0, timeout=timeout)
1369 i.assert_nothing_captured()
1373 self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
1378 def get_testcase_doc_name(test):
1379 return getdoc(test.__class__).splitlines()[0]
1382 def get_test_description(descriptions, test):
1383 short_description = test.shortDescription()
1384 if descriptions and short_description:
1385 return short_description
1390 class TestCaseInfo(object):
1391 def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
1392 self.logger = logger
1393 self.tempdir = tempdir
1394 self.vpp_pid = vpp_pid
1395 self.vpp_bin_path = vpp_bin_path
1396 self.core_crash_test = None
1399 class VppTestResult(unittest.TestResult):
1401 @property result_string
1402 String variable to store the test case result string.
1404 List variable containing 2-tuples of TestCase instances and strings
1405 holding formatted tracebacks. Each tuple represents a test which
1406 raised an unexpected exception.
1408 List variable containing 2-tuples of TestCase instances and strings
1409 holding formatted tracebacks. Each tuple represents a test where
1410 a failure was explicitly signalled using the TestCase.assert*()
1414 failed_test_cases_info = set()
1415 core_crash_test_cases_info = set()
1416 current_test_case_info = None
1418 def __init__(self, stream=None, descriptions=None, verbosity=None,
1421 :param stream File descriptor to store where to report test results.
1422 Set to the standard error stream by default.
1423 :param descriptions Boolean variable to store information if to use
1424 test case descriptions.
1425 :param verbosity Integer variable to store required verbosity level.
1427 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1428 self.stream = stream
1429 self.descriptions = descriptions
1430 self.verbosity = verbosity
1431 self.result_string = None
1432 self.runner = runner
1435 def addSuccess(self, test):
1437 Record a test succeeded result
1442 if self.current_test_case_info:
1443 self.current_test_case_info.logger.debug(
1444 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1445 test._testMethodName,
1446 test._testMethodDoc))
1447 unittest.TestResult.addSuccess(self, test)
1448 self.result_string = colorize("OK", GREEN)
1450 self.send_result_through_pipe(test, PASS)
1452 def addSkip(self, test, reason):
1454 Record a test skipped.
1460 if self.current_test_case_info:
1461 self.current_test_case_info.logger.debug(
1462 "--- addSkip() %s.%s(%s) called, reason is %s" %
1463 (test.__class__.__name__, test._testMethodName,
1464 test._testMethodDoc, reason))
1465 unittest.TestResult.addSkip(self, test, reason)
1466 self.result_string = colorize("SKIP", YELLOW)
1468 if reason == "not enough cpus":
1469 self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE)
1471 self.send_result_through_pipe(test, SKIP)
1473 def symlink_failed(self):
1474 if self.current_test_case_info:
1476 failed_dir = config.failed_dir
1477 link_path = os.path.join(
1480 os.path.basename(self.current_test_case_info.tempdir))
1482 self.current_test_case_info.logger.debug(
1483 "creating a link to the failed test")
1484 self.current_test_case_info.logger.debug(
1485 "os.symlink(%s, %s)" %
1486 (self.current_test_case_info.tempdir, link_path))
1487 if os.path.exists(link_path):
1488 self.current_test_case_info.logger.debug(
1489 'symlink already exists')
1491 os.symlink(self.current_test_case_info.tempdir, link_path)
1493 except Exception as e:
1494 self.current_test_case_info.logger.error(e)
1496 def send_result_through_pipe(self, test, result):
1497 if hasattr(self, 'test_framework_result_pipe'):
1498 pipe = self.test_framework_result_pipe
1500 pipe.send((test.id(), result))
1502 def log_error(self, test, err, fn_name):
1503 if self.current_test_case_info:
1504 if isinstance(test, unittest.suite._ErrorHolder):
1505 test_name = test.description
1507 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1508 test._testMethodName,
1509 test._testMethodDoc)
1510 self.current_test_case_info.logger.debug(
1511 "--- %s() %s called, err is %s" %
1512 (fn_name, test_name, err))
1513 self.current_test_case_info.logger.debug(
1514 "formatted exception is:\n%s" %
1515 "".join(format_exception(*err)))
1517 def add_error(self, test, err, unittest_fn, error_type):
1518 if error_type == FAIL:
1519 self.log_error(test, err, 'addFailure')
1520 error_type_str = colorize("FAIL", RED)
1521 elif error_type == ERROR:
1522 self.log_error(test, err, 'addError')
1523 error_type_str = colorize("ERROR", RED)
1525 raise Exception('Error type %s cannot be used to record an '
1526 'error or a failure' % error_type)
1528 unittest_fn(self, test, err)
1529 if self.current_test_case_info:
1530 self.result_string = "%s [ temp dir used by test case: %s ]" % \
1532 self.current_test_case_info.tempdir)
1533 self.symlink_failed()
1534 self.failed_test_cases_info.add(self.current_test_case_info)
1535 if is_core_present(self.current_test_case_info.tempdir):
1536 if not self.current_test_case_info.core_crash_test:
1537 if isinstance(test, unittest.suite._ErrorHolder):
1538 test_name = str(test)
1540 test_name = "'{!s}' ({!s})".format(
1541 get_testcase_doc_name(test), test.id())
1542 self.current_test_case_info.core_crash_test = test_name
1543 self.core_crash_test_cases_info.add(
1544 self.current_test_case_info)
1546 self.result_string = '%s [no temp dir]' % error_type_str
1548 self.send_result_through_pipe(test, error_type)
1550 def addFailure(self, test, err):
1552 Record a test failed result
1555 :param err: error message
1558 self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1560 def addError(self, test, err):
1562 Record a test error result
1565 :param err: error message
1568 self.add_error(test, err, unittest.TestResult.addError, ERROR)
1570 def getDescription(self, test):
1572 Get test description
1575 :returns: test description
1578 return get_test_description(self.descriptions, test)
1580 def startTest(self, test):
1588 def print_header(test):
1589 if test.__class__ in self.printed:
1592 test_doc = getdoc(test)
1594 raise Exception("No doc string for test '%s'" % test.id())
1596 test_title = test_doc.splitlines()[0].rstrip()
1597 test_title = colorize(test_title, GREEN)
1598 if test.is_tagged_run_solo():
1599 test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)
1601 # This block may overwrite the colorized title above,
1602 # but we want this to stand out and be fixed
1603 if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
1604 test_title = colorize(
1605 f"FIXME with VPP workers: {test_title}", RED)
1607 if test.has_tag(TestCaseTag.FIXME_ASAN):
1608 test_title = colorize(
1609 f"FIXME with ASAN: {test_title}", RED)
1610 test.skip_fixme_asan()
1612 if hasattr(test, 'vpp_worker_count'):
1613 if test.vpp_worker_count == 0:
1614 test_title += " [main thread only]"
1615 elif test.vpp_worker_count == 1:
1616 test_title += " [1 worker thread]"
1618 test_title += f" [{test.vpp_worker_count} worker threads]"
1620 if test.__class__.skipped_due_to_cpu_lack:
1621 test_title = colorize(
1622 f"{test_title} [skipped - not enough cpus, "
1623 f"required={test.__class__.get_cpus_required()}, "
1624 f"available={max_vpp_cpus}]", YELLOW)
1626 print(double_line_delim)
1628 print(double_line_delim)
1629 self.printed.append(test.__class__)
1632 self.start_test = time.time()
1633 unittest.TestResult.startTest(self, test)
1634 if self.verbosity > 0:
1635 self.stream.writeln(
1636 "Starting " + self.getDescription(test) + " ...")
1637 self.stream.writeln(single_line_delim)
1639 def stopTest(self, test):
1641 Called when the given test has been run
1646 unittest.TestResult.stopTest(self, test)
1648 if self.verbosity > 0:
1649 self.stream.writeln(single_line_delim)
1650 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1651 self.result_string))
1652 self.stream.writeln(single_line_delim)
1654 self.stream.writeln("%-68s %4.2f %s" %
1655 (self.getDescription(test),
1656 time.time() - self.start_test,
1657 self.result_string))
1659 self.send_result_through_pipe(test, TEST_RUN)
1661 def printErrors(self):
1663 Print errors from running the test case
1665 if len(self.errors) > 0 or len(self.failures) > 0:
1666 self.stream.writeln()
1667 self.printErrorList('ERROR', self.errors)
1668 self.printErrorList('FAIL', self.failures)
1670 # ^^ that is the last output from unittest before summary
1671 if not self.runner.print_summary:
1672 devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
1673 self.stream = devnull
1674 self.runner.stream = devnull
1676 def printErrorList(self, flavour, errors):
1678 Print error list to the output stream together with error type
1679 and test case description.
1681 :param flavour: error type
1682 :param errors: iterable errors
1685 for test, err in errors:
1686 self.stream.writeln(double_line_delim)
1687 self.stream.writeln("%s: %s" %
1688 (flavour, self.getDescription(test)))
1689 self.stream.writeln(single_line_delim)
1690 self.stream.writeln("%s" % err)
1693 class VppTestRunner(unittest.TextTestRunner):
1695 A basic test runner implementation which prints results to standard error.
1699 def resultclass(self):
1700 """Class maintaining the results of the tests"""
1701 return VppTestResult
1703 def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1704 result_pipe=None, failfast=False, buffer=False,
1705 resultclass=None, print_summary=True, **kwargs):
1706 # ignore stream setting here, use hard-coded stdout to be in sync
1707 # with prints from VppTestCase methods ...
1708 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1709 verbosity, failfast, buffer,
1710 resultclass, **kwargs)
1711 KeepAliveReporter.pipe = keep_alive_pipe
1713 self.orig_stream = self.stream
1714 self.resultclass.test_framework_result_pipe = result_pipe
1716 self.print_summary = print_summary
1718 def _makeResult(self):
1719 return self.resultclass(self.stream,
1724 def run(self, test):
1731 faulthandler.enable() # emit stack trace to stderr if killed by signal
1733 result = super(VppTestRunner, self).run(test)
1734 if not self.print_summary:
1735 self.stream = self.orig_stream
1736 result.stream = self.orig_stream
1740 class Worker(Thread):
1741 def __init__(self, executable_args, logger, env=None, *args, **kwargs):
1742 super(Worker, self).__init__(*args, **kwargs)
1743 self.logger = logger
1744 self.args = executable_args
1745 if hasattr(self, 'testcase') and self.testcase.debug_all:
1746 if self.testcase.debug_gdbserver:
1747 self.args = ['/usr/bin/gdbserver', 'localhost:{port}'
1748 .format(port=self.testcase.gdbserver_port)] + args
1749 elif self.testcase.debug_gdb and hasattr(self, 'wait_for_gdb'):
1750 self.args.append(self.wait_for_gdb)
1751 self.app_bin = executable_args[0]
1752 self.app_name = os.path.basename(self.app_bin)
1753 if hasattr(self, 'role'):
1754 self.app_name += ' {role}'.format(role=self.role)
1757 env = {} if env is None else env
1758 self.env = copy.deepcopy(env)
1760 def wait_for_enter(self):
1761 if not hasattr(self, 'testcase'):
1763 if self.testcase.debug_all and self.testcase.debug_gdbserver:
1765 print(double_line_delim)
1766 print("Spawned GDB Server for '{app}' with PID: {pid}"
1767 .format(app=self.app_name, pid=self.process.pid))
1768 elif self.testcase.debug_all and self.testcase.debug_gdb:
1770 print(double_line_delim)
1771 print("Spawned '{app}' with PID: {pid}"
1772 .format(app=self.app_name, pid=self.process.pid))
1775 print(single_line_delim)
1776 print("You can debug '{app}' using:".format(app=self.app_name))
1777 if self.testcase.debug_gdbserver:
1778 print("sudo gdb " + self.app_bin +
1779 " -ex 'target remote localhost:{port}'"
1780 .format(port=self.testcase.gdbserver_port))
1781 print("Now is the time to attach gdb by running the above "
1782 "command, set up breakpoints etc., then resume from "
1783 "within gdb by issuing the 'continue' command")
1784 self.testcase.gdbserver_port += 1
1785 elif self.testcase.debug_gdb:
1786 print("sudo gdb " + self.app_bin +
1787 " -ex 'attach {pid}'".format(pid=self.process.pid))
1788 print("Now is the time to attach gdb by running the above "
1789 "command and set up breakpoints etc., then resume from"
1790 " within gdb by issuing the 'continue' command")
1791 print(single_line_delim)
1792 input("Press ENTER to continue running the testcase...")
1795 executable = self.args[0]
1796 if not os.path.exists(executable) or not os.access(
1797 executable, os.F_OK | os.X_OK):
1798 # Exit code that means some system file did not exist,
1799 # could not be opened, or had some other kind of error.
1800 self.result = os.EX_OSFILE
1801 raise EnvironmentError(
1802 "executable '%s' is not found or executable." % executable)
1803 self.logger.debug("Running executable '{app}': '{cmd}'"
1804 .format(app=self.app_name,
1805 cmd=' '.join(self.args)))
1806 env = os.environ.copy()
1807 env.update(self.env)
1808 env["CK_LOG_FILE_NAME"] = "-"
1809 self.process = subprocess.Popen(
1810 ['stdbuf', '-o0', '-e0'] + self.args, shell=False, env=env,
1811 preexec_fn=os.setpgrp, stdout=subprocess.PIPE,
1812 stderr=subprocess.PIPE)
1813 self.wait_for_enter()
1814 out, err = self.process.communicate()
1815 self.logger.debug("Finished running `{app}'".format(app=self.app_name))
1816 self.logger.info("Return code is `%s'" % self.process.returncode)
1817 self.logger.info(single_line_delim)
1818 self.logger.info("Executable `{app}' wrote to stdout:"
1819 .format(app=self.app_name))
1820 self.logger.info(single_line_delim)
1821 self.logger.info(out.decode('utf-8'))
1822 self.logger.info(single_line_delim)
1823 self.logger.info("Executable `{app}' wrote to stderr:"
1824 .format(app=self.app_name))
1825 self.logger.info(single_line_delim)
1826 self.logger.info(err.decode('utf-8'))
1827 self.logger.info(single_line_delim)
1828 self.result = self.process.returncode
1831 if __name__ == '__main__':