3 from __future__ import print_function
19 from collections import deque
20 from threading import Thread, Event
21 from inspect import getdoc, isclass
22 from traceback import format_exception
23 from logging import FileHandler, DEBUG, Formatter
25 from abc import ABC, abstractmethod
26 from struct import pack, unpack
29 from scapy.packet import Raw, Packet
30 from config import config, available_cpus, num_cpus, max_vpp_cpus
31 import hook as hookmodule
32 from vpp_pg_interface import VppPGInterface
33 from vpp_sub_interface import VppSubInterface
34 from vpp_lo_interface import VppLoInterface
35 from vpp_bvi_interface import VppBviInterface
36 from vpp_papi_provider import VppPapiProvider
37 from vpp_papi import VppEnum
39 from vpp_papi.vpp_stats import VPPStats
40 from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
41 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
43 from vpp_object import VppObjectRegistry
44 from util import ppp, is_core_present
45 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
46 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
47 from scapy.layers.inet6 import ICMPv6EchoReply
50 logger = logging.getLogger(__name__)
52 # Set up an empty logger for the testcase that can be overridden as necessary
53 null_logger = logging.getLogger('VppTestCase')
54 null_logger.addHandler(logging.NullHandler())
64 if config.debug_framework:
68 Test framework module.
70 The module provides a set of tools for constructing and running tests and
71 representing the results.
75 class VppDiedError(Exception):
76 """ exception for reporting that the subprocess has died."""
78 signals_by_value = {v: k for k, v in signal.__dict__.items() if
79 k.startswith('SIG') and not k.startswith('SIG_')}
81 def __init__(self, rv=None, testcase=None, method_name=None):
83 self.signal_name = None
84 self.testcase = testcase
85 self.method_name = method_name
88 self.signal_name = VppDiedError.signals_by_value[-rv]
89 except (KeyError, TypeError):
92 if testcase is None and method_name is None:
95 in_msg = ' while running %s.%s' % (testcase, method_name)
98 msg = "VPP subprocess died unexpectedly%s with return code: %d%s."\
99 % (in_msg, self.rv, ' [%s]' %
101 self.signal_name is not None else ''))
103 msg = "VPP subprocess died unexpectedly%s." % in_msg
105 super(VppDiedError, self).__init__(msg)
108 class _PacketInfo(object):
109 """Private class to create packet info object.
111 Help process information about the next packet.
112 Set variables to default values.
114 #: Store the index of the packet.
116 #: Store the index of the source packet generator interface of the packet.
118 #: Store the index of the destination packet generator interface
121 #: Store expected ip version
123 #: Store expected upper protocol
125 #: Store the copy of the former packet.
128 def __eq__(self, other):
129 index = self.index == other.index
130 src = self.src == other.src
131 dst = self.dst == other.dst
132 data = self.data == other.data
133 return index and src and dst and data
136 def pump_output(testclass):
137 """ pump output from vpp stdout/stderr to proper queues """
140 while not testclass.pump_thread_stop_flag.is_set():
141 readable = select.select([testclass.vpp.stdout.fileno(),
142 testclass.vpp.stderr.fileno(),
143 testclass.pump_thread_wakeup_pipe[0]],
145 if testclass.vpp.stdout.fileno() in readable:
146 read = os.read(testclass.vpp.stdout.fileno(), 102400)
148 split = read.decode('ascii',
149 errors='backslashreplace').splitlines(True)
150 if len(stdout_fragment) > 0:
151 split[0] = "%s%s" % (stdout_fragment, split[0])
152 if len(split) > 0 and split[-1].endswith("\n"):
156 stdout_fragment = split[-1]
157 testclass.vpp_stdout_deque.extend(split[:limit])
158 if not config.cache_vpp_output:
159 for line in split[:limit]:
160 testclass.logger.info(
161 "VPP STDOUT: %s" % line.rstrip("\n"))
162 if testclass.vpp.stderr.fileno() in readable:
163 read = os.read(testclass.vpp.stderr.fileno(), 102400)
165 split = read.decode('ascii',
166 errors='backslashreplace').splitlines(True)
167 if len(stderr_fragment) > 0:
168 split[0] = "%s%s" % (stderr_fragment, split[0])
169 if len(split) > 0 and split[-1].endswith("\n"):
173 stderr_fragment = split[-1]
175 testclass.vpp_stderr_deque.extend(split[:limit])
176 if not config.cache_vpp_output:
177 for line in split[:limit]:
178 testclass.logger.error(
179 "VPP STDERR: %s" % line.rstrip("\n"))
180 # ignoring the dummy pipe here intentionally - the
181 # flag will take care of properly terminating the loop
184 def _is_platform_aarch64():
185 return platform.machine() == 'aarch64'
188 is_platform_aarch64 = _is_platform_aarch64()
191 class KeepAliveReporter(object):
193 Singleton object which reports test start to parent process
198 self.__dict__ = self._shared_state
206 def pipe(self, pipe):
207 if self._pipe is not None:
208 raise Exception("Internal error - pipe should only be set once.")
211 def send_keep_alive(self, test, desc=None):
213 Write current test tmpdir & desc to keep-alive pipe to signal liveness
215 if self.pipe is None:
216 # if not running forked..
220 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
224 self.pipe.send((desc, config.vpp, test.tempdir, test.vpp.pid))
227 class TestCaseTag(Enum):
228 # marks the suites that must run at the end
229 # using only a single test runner
231 # marks the suites broken on VPP multi-worker
232 FIXME_VPP_WORKERS = 2
233 # marks the suites broken when ASan is enabled
237 def create_tag_decorator(e):
240 cls.test_tags.append(e)
241 except AttributeError:
247 tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
248 tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
249 tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
263 class CPUInterface(ABC):
265 skipped_due_to_cpu_lack = False
269 def get_cpus_required(cls):
273 def assign_cpus(cls, cpus):
277 class VppTestCase(CPUInterface, unittest.TestCase):
278 """This subclass is a base class for VPP test cases that are implemented as
279 classes. It provides methods to create and run test case.
282 extra_vpp_statseg_config = ""
283 extra_vpp_punt_config = []
284 extra_vpp_plugin_config = []
286 vapi_response_timeout = 5
289 def packet_infos(self):
290 """List of packet infos"""
291 return self._packet_infos
294 def get_packet_count_for_if_idx(cls, dst_if_index):
295 """Get the number of packet info for specified destination if index"""
296 if dst_if_index in cls._packet_count_for_dst_if_idx:
297 return cls._packet_count_for_dst_if_idx[dst_if_index]
302 def has_tag(cls, tag):
303 """ if the test case has a given tag - return true """
305 return tag in cls.test_tags
306 except AttributeError:
311 def is_tagged_run_solo(cls):
312 """ if the test case class is timing-sensitive - return true """
313 return cls.has_tag(TestCaseTag.RUN_SOLO)
316 def skip_fixme_asan(cls):
317 """ if @tag_fixme_asan & ASan is enabled - mark for skip """
318 if cls.has_tag(TestCaseTag.FIXME_ASAN):
319 vpp_extra_cmake_args = os.environ.get('VPP_EXTRA_CMAKE_ARGS', '')
320 if 'DVPP_ENABLE_SANITIZE_ADDR=ON' in vpp_extra_cmake_args:
321 cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls)
325 """Return the instance of this testcase"""
326 return cls.test_instance
329 def set_debug_flags(cls, d):
330 cls.gdbserver_port = 7777
331 cls.debug_core = False
332 cls.debug_gdb = False
333 cls.debug_gdbserver = False
334 cls.debug_all = False
335 cls.debug_attach = False
340 cls.debug_core = True
341 elif dl == "gdb" or dl == "gdb-all":
343 elif dl == "gdbserver" or dl == "gdbserver-all":
344 cls.debug_gdbserver = True
346 cls.debug_attach = True
348 raise Exception("Unrecognized DEBUG option: '%s'" % d)
349 if dl == "gdb-all" or dl == "gdbserver-all":
353 def get_vpp_worker_count(cls):
354 if not hasattr(cls, "vpp_worker_count"):
355 if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
356 cls.vpp_worker_count = 0
358 cls.vpp_worker_count = config.vpp_worker_count
359 return cls.vpp_worker_count
362 def get_cpus_required(cls):
363 return 1 + cls.get_vpp_worker_count()
366 def setUpConstants(cls):
367 """ Set-up the test case class based on environment variables """
368 cls.step = config.step
369 cls.plugin_path = ":".join(config.vpp_plugin_dir)
370 cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir)
371 cls.extern_plugin_path = ":".join(config.extern_plugin_dir)
373 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
374 debug_cli = "cli-listen localhost:5002"
375 size = re.search(r"\d+[gG]", config.coredump_size)
377 coredump_size = f"coredump-size {config.coredump_size}".lower()
379 coredump_size = "coredump-size unlimited"
380 default_variant = config.variant
381 if default_variant is not None:
382 default_variant = "defaults { %s 100 }" % default_variant
386 api_fuzzing = config.api_fuzz
387 if api_fuzzing is None:
392 "unix", "{", "nodaemon", debug_cli, "full-coredump",
393 coredump_size, "runtime-dir", cls.tempdir, "}",
394 "api-trace", "{", "on", "}",
395 "api-segment", "{", "prefix", cls.get_api_segment_prefix(), "}",
396 "cpu", "{", "main-core", str(cls.cpus[0]), ]
397 if cls.extern_plugin_path not in (None, ""):
398 cls.extra_vpp_plugin_config.append(
399 "add-path %s" % cls.extern_plugin_path)
400 if cls.get_vpp_worker_count():
401 cls.vpp_cmdline.extend([
402 "corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])])
403 cls.vpp_cmdline.extend([
405 "physmem", "{", "max-size", "32m", "}",
406 "statseg", "{", "socket-name", cls.get_stats_sock_path(),
407 cls.extra_vpp_statseg_config, "}",
408 "socksvr", "{", "socket-name", cls.get_api_sock_path(), "}",
409 "node { ", default_variant, "}",
410 "api-fuzz {", api_fuzzing, "}",
411 "plugins", "{", "plugin", "dpdk_plugin.so", "{", "disable", "}",
412 "plugin", "rdma_plugin.so", "{", "disable", "}",
413 "plugin", "lisp_unittest_plugin.so", "{", "enable", "}",
414 "plugin", "unittest_plugin.so", "{", "enable", "}"
415 ] + cls.extra_vpp_plugin_config + ["}", ])
417 if cls.extra_vpp_punt_config is not None:
418 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
420 if not cls.debug_attach:
421 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
422 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
425 def wait_for_enter(cls):
426 if cls.debug_gdbserver:
427 print(double_line_delim)
428 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
430 print(double_line_delim)
431 print("Spawned VPP with PID: %d" % cls.vpp.pid)
433 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
435 print(single_line_delim)
436 print("You can debug VPP using:")
437 if cls.debug_gdbserver:
438 print(f"sudo gdb {config.vpp} "
439 f"-ex 'target remote localhost:{cls.gdbserver_port}'")
440 print("Now is the time to attach gdb by running the above "
441 "command, set up breakpoints etc., then resume VPP from "
442 "within gdb by issuing the 'continue' command")
443 cls.gdbserver_port += 1
445 print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'")
446 print("Now is the time to attach gdb by running the above "
447 "command and set up breakpoints etc., then resume VPP from"
448 " within gdb by issuing the 'continue' command")
449 print(single_line_delim)
450 input("Press ENTER to continue running the testcase...")
458 cls.logger.debug(f"Assigned cpus: {cls.cpus}")
459 cmdline = cls.vpp_cmdline
461 if cls.debug_gdbserver:
462 gdbserver = '/usr/bin/gdbserver'
463 if not os.path.isfile(gdbserver) or\
464 not os.access(gdbserver, os.X_OK):
465 raise Exception("gdbserver binary '%s' does not exist or is "
466 "not executable" % gdbserver)
468 cmdline = [gdbserver, 'localhost:{port}'
469 .format(port=cls.gdbserver_port)] + cls.vpp_cmdline
470 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
473 cls.vpp = subprocess.Popen(cmdline,
474 stdout=subprocess.PIPE,
475 stderr=subprocess.PIPE)
476 except subprocess.CalledProcessError as e:
477 cls.logger.critical("Subprocess returned with non-0 return code: ("
481 cls.logger.critical("Subprocess returned with OS error: "
482 "(%s) %s", e.errno, e.strerror)
484 except Exception as e:
485 cls.logger.exception("Subprocess returned unexpected from "
492 def wait_for_coredump(cls):
493 corefile = cls.tempdir + "/core"
494 if os.path.isfile(corefile):
495 cls.logger.error("Waiting for coredump to complete: %s", corefile)
496 curr_size = os.path.getsize(corefile)
497 deadline = time.time() + 60
499 while time.time() < deadline:
502 curr_size = os.path.getsize(corefile)
503 if size == curr_size:
507 cls.logger.error("Timed out waiting for coredump to complete:"
510 cls.logger.error("Coredump complete: %s, size %d",
514 def get_stats_sock_path(cls):
515 return "%s/stats.sock" % cls.tempdir
518 def get_api_sock_path(cls):
519 return "%s/api.sock" % cls.tempdir
522 def get_api_segment_prefix(cls):
523 return os.path.basename(cls.tempdir) # Only used for VAPI
526 def get_tempdir(cls):
527 tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}"
528 if config.wipe_tmp_dir:
529 shutil.rmtree(tmpdir, ignore_errors=True)
534 def create_file_handler(cls):
535 if config.log_dir is None:
536 cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
539 logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}"
540 if config.wipe_tmp_dir:
541 shutil.rmtree(logdir, ignore_errors=True)
543 cls.file_handler = FileHandler(f"{logdir}/log.txt")
548 Perform class setup before running the testcase
549 Remove shared memory files, start vpp and connect the vpp-api
551 super(VppTestCase, cls).setUpClass()
552 cls.logger = get_logger(cls.__name__)
553 random.seed(config.rnd_seed)
554 if hasattr(cls, 'parallel_handler'):
555 cls.logger.addHandler(cls.parallel_handler)
556 cls.logger.propagate = False
557 cls.set_debug_flags(config.debug)
558 cls.tempdir = cls.get_tempdir()
559 cls.create_file_handler()
560 cls.file_handler.setFormatter(
561 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
563 cls.file_handler.setLevel(DEBUG)
564 cls.logger.addHandler(cls.file_handler)
565 cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
566 os.chdir(cls.tempdir)
567 cls.logger.info("Temporary dir is %s, api socket is %s",
568 cls.tempdir, cls.get_api_sock_path())
569 cls.logger.debug("Random seed is %s", config.rnd_seed)
571 cls.reset_packet_infos()
576 cls.registry = VppObjectRegistry()
577 cls.vpp_startup_failed = False
578 cls.reporter = KeepAliveReporter()
579 # need to catch exceptions here because if we raise, then the cleanup
580 # doesn't get called and we might end with a zombie vpp
586 cls.reporter.send_keep_alive(cls, 'setUpClass')
587 VppTestResult.current_test_case_info = TestCaseInfo(
588 cls.logger, cls.tempdir, cls.vpp.pid, config.vpp)
589 cls.vpp_stdout_deque = deque()
590 cls.vpp_stderr_deque = deque()
591 if not cls.debug_attach:
592 cls.pump_thread_stop_flag = Event()
593 cls.pump_thread_wakeup_pipe = os.pipe()
594 cls.pump_thread = Thread(target=pump_output, args=(cls,))
595 cls.pump_thread.daemon = True
596 cls.pump_thread.start()
597 if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
598 cls.vapi_response_timeout = 0
599 cls.vapi = VppPapiProvider(cls.__name__, cls,
600 cls.vapi_response_timeout)
602 hook = hookmodule.StepHook(cls)
604 hook = hookmodule.PollHook(cls)
605 cls.vapi.register_hook(hook)
606 cls.statistics = VPPStats(socketname=cls.get_stats_sock_path())
610 cls.vpp_startup_failed = True
612 "VPP died shortly after startup, check the"
613 " output to standard error for possible cause")
617 except (vpp_papi.VPPIOError, Exception) as e:
618 cls.logger.debug("Exception connecting to vapi: %s" % e)
619 cls.vapi.disconnect()
621 if cls.debug_gdbserver:
622 print(colorize("You're running VPP inside gdbserver but "
623 "VPP-API connection failed, did you forget "
624 "to 'continue' VPP from within gdb?", RED))
627 last_line = cls.vapi.cli("show thread").split("\n")[-2]
628 cls.vpp_worker_count = int(last_line.split(" ")[0])
629 print("Detected VPP with %s workers." % cls.vpp_worker_count)
630 except vpp_papi.VPPRuntimeError as e:
631 cls.logger.debug("%s" % e)
634 except Exception as e:
635 cls.logger.debug("Exception connecting to VPP: %s" % e)
640 def _debug_quit(cls):
641 if (cls.debug_gdbserver or cls.debug_gdb):
645 if cls.vpp.returncode is None:
647 print(double_line_delim)
648 print("VPP or GDB server is still running")
649 print(single_line_delim)
650 input("When done debugging, press ENTER to kill the "
651 "process and finish running the testcase...")
652 except AttributeError:
658 Disconnect vpp-api, kill vpp and cleanup shared memory files
662 # first signal that we want to stop the pump thread, then wake it up
663 if hasattr(cls, 'pump_thread_stop_flag'):
664 cls.pump_thread_stop_flag.set()
665 if hasattr(cls, 'pump_thread_wakeup_pipe'):
666 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
667 if hasattr(cls, 'pump_thread'):
668 cls.logger.debug("Waiting for pump thread to stop")
669 cls.pump_thread.join()
670 if hasattr(cls, 'vpp_stderr_reader_thread'):
671 cls.logger.debug("Waiting for stderr pump to stop")
672 cls.vpp_stderr_reader_thread.join()
674 if hasattr(cls, 'vpp'):
675 if hasattr(cls, 'vapi'):
676 cls.logger.debug(cls.vapi.vpp.get_stats())
677 cls.logger.debug("Disconnecting class vapi client on %s",
679 cls.vapi.disconnect()
680 cls.logger.debug("Deleting class vapi attribute on %s",
684 if not cls.debug_attach and cls.vpp.returncode is None:
685 cls.wait_for_coredump()
686 cls.logger.debug("Sending TERM to vpp")
688 cls.logger.debug("Waiting for vpp to die")
690 outs, errs = cls.vpp.communicate(timeout=5)
691 except subprocess.TimeoutExpired:
693 outs, errs = cls.vpp.communicate()
694 cls.logger.debug("Deleting class vpp attribute on %s",
696 if not cls.debug_attach:
697 cls.vpp.stdout.close()
698 cls.vpp.stderr.close()
701 if cls.vpp_startup_failed:
702 stdout_log = cls.logger.info
703 stderr_log = cls.logger.critical
705 stdout_log = cls.logger.info
706 stderr_log = cls.logger.info
708 if hasattr(cls, 'vpp_stdout_deque'):
709 stdout_log(single_line_delim)
710 stdout_log('VPP output to stdout while running %s:', cls.__name__)
711 stdout_log(single_line_delim)
712 vpp_output = "".join(cls.vpp_stdout_deque)
713 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
715 stdout_log('\n%s', vpp_output)
716 stdout_log(single_line_delim)
718 if hasattr(cls, 'vpp_stderr_deque'):
719 stderr_log(single_line_delim)
720 stderr_log('VPP output to stderr while running %s:', cls.__name__)
721 stderr_log(single_line_delim)
722 vpp_output = "".join(cls.vpp_stderr_deque)
723 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
725 stderr_log('\n%s', vpp_output)
726 stderr_log(single_line_delim)
729 def tearDownClass(cls):
730 """ Perform final cleanup after running all tests in this test-case """
731 cls.logger.debug("--- tearDownClass() for %s called ---" %
733 cls.reporter.send_keep_alive(cls, 'tearDownClass')
735 cls.file_handler.close()
736 cls.reset_packet_infos()
737 if config.debug_framework:
738 debug_internal.on_tear_down_class(cls)
740 def show_commands_at_teardown(self):
741 """ Allow subclass specific teardown logging additions."""
742 self.logger.info("--- No test specific show commands provided. ---")
745 """ Show various debug prints after each test """
746 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
747 (self.__class__.__name__, self._testMethodName,
748 self._testMethodDoc))
751 if not self.vpp_dead:
752 self.logger.debug(self.vapi.cli("show trace max 1000"))
753 self.logger.info(self.vapi.ppcli("show interface"))
754 self.logger.info(self.vapi.ppcli("show hardware"))
755 self.logger.info(self.statistics.set_errors_str())
756 self.logger.info(self.vapi.ppcli("show run"))
757 self.logger.info(self.vapi.ppcli("show log"))
758 self.logger.info(self.vapi.ppcli("show bihash"))
759 self.logger.info("Logging testcase specific show commands.")
760 self.show_commands_at_teardown()
761 self.registry.remove_vpp_config(self.logger)
762 # Save/Dump VPP api trace log
763 m = self._testMethodName
764 api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
765 tmp_api_trace = "/tmp/%s" % api_trace
766 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
767 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
768 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
770 os.rename(tmp_api_trace, vpp_api_trace_log)
771 except VppTransportSocketIOError:
772 self.logger.debug("VppTransportSocketIOError: Vpp dead. "
773 "Cannot log show commands.")
776 self.registry.unregister_all(self.logger)
779 """ Clear trace before running each test"""
780 super(VppTestCase, self).setUp()
781 self.reporter.send_keep_alive(self)
783 raise VppDiedError(rv=None, testcase=self.__class__.__name__,
784 method_name=self._testMethodName)
785 self.sleep(.1, "during setUp")
786 self.vpp_stdout_deque.append(
787 "--- test setUp() for %s.%s(%s) starts here ---\n" %
788 (self.__class__.__name__, self._testMethodName,
789 self._testMethodDoc))
790 self.vpp_stderr_deque.append(
791 "--- test setUp() for %s.%s(%s) starts here ---\n" %
792 (self.__class__.__name__, self._testMethodName,
793 self._testMethodDoc))
794 self.vapi.cli("clear trace")
795 # store the test instance inside the test class - so that objects
796 # holding the class can access instance methods (like assertEqual)
797 type(self).test_instance = self
800 def pg_enable_capture(cls, interfaces=None):
802 Enable capture on packet-generator interfaces
804 :param interfaces: iterable interface indexes (if None,
805 use self.pg_interfaces)
808 if interfaces is None:
809 interfaces = cls.pg_interfaces
814 def register_pcap(cls, intf, worker):
815 """ Register a pcap in the testclass """
816 # add to the list of captures with current timestamp
817 cls._pcaps.append((intf, worker))
820 def get_vpp_time(cls):
821 # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
822 # returns float("2.190522")
823 timestr = cls.vapi.cli('show clock')
824 head, sep, tail = timestr.partition(',')
825 head, sep, tail = head.partition('Time now')
829 def sleep_on_vpp_time(cls, sec):
830 """ Sleep according to time in VPP world """
831 # On a busy system with many processes
832 # we might end up with VPP time being slower than real world
833 # So take that into account when waiting for VPP to do something
834 start_time = cls.get_vpp_time()
835 while cls.get_vpp_time() - start_time < sec:
839 def pg_start(cls, trace=True):
840 """ Enable the PG, wait till it is done, then clean up """
841 for (intf, worker) in cls._old_pcaps:
842 intf.handle_old_pcap_file(intf.get_in_path(worker),
843 intf.in_history_counter)
846 cls.vapi.cli("clear trace")
847 cls.vapi.cli("trace add pg-input 1000")
848 cls.vapi.cli('packet-generator enable')
849 # PG, when starts, runs to completion -
850 # so let's avoid a race condition,
851 # and wait a little till it's done.
852 # Then clean it up - and then be gone.
853 deadline = time.time() + 300
854 while cls.vapi.cli('show packet-generator').find("Yes") != -1:
855 cls.sleep(0.01) # yield
856 if time.time() > deadline:
857 cls.logger.error("Timeout waiting for pg to stop")
859 for intf, worker in cls._pcaps:
860 cls.vapi.cli('packet-generator delete %s' %
861 intf.get_cap_name(worker))
862 cls._old_pcaps = cls._pcaps
866 def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0,
869 Create packet-generator interfaces.
871 :param interfaces: iterable indexes of the interfaces.
872 :returns: List of created interfaces.
877 intf = VppPGInterface(cls, i, gso, gso_size, mode)
878 setattr(cls, intf.name, intf)
880 cls.pg_interfaces = result
884 def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
885 pgmode = VppEnum.vl_api_pg_interface_mode_t
886 return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
887 pgmode.PG_API_MODE_IP4)
890 def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
891 pgmode = VppEnum.vl_api_pg_interface_mode_t
892 return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
893 pgmode.PG_API_MODE_IP6)
896 def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
897 pgmode = VppEnum.vl_api_pg_interface_mode_t
898 return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
899 pgmode.PG_API_MODE_ETHERNET)
902 def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
903 pgmode = VppEnum.vl_api_pg_interface_mode_t
904 return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
905 pgmode.PG_API_MODE_ETHERNET)
908 def create_loopback_interfaces(cls, count):
910 Create loopback interfaces.
912 :param count: number of interfaces created.
913 :returns: List of created interfaces.
915 result = [VppLoInterface(cls) for i in range(count)]
917 setattr(cls, intf.name, intf)
918 cls.lo_interfaces = result
922 def create_bvi_interfaces(cls, count):
924 Create BVI interfaces.
926 :param count: number of interfaces created.
927 :returns: List of created interfaces.
929 result = [VppBviInterface(cls) for i in range(count)]
931 setattr(cls, intf.name, intf)
932 cls.bvi_interfaces = result
936 def extend_packet(packet, size, padding=' '):
938 Extend packet to given size by padding with spaces or custom padding
939 NOTE: Currently works only when Raw layer is present.
941 :param packet: packet
942 :param size: target size
943 :param padding: padding used to extend the payload
946 packet_len = len(packet) + 4
947 extend = size - packet_len
949 num = (extend // len(padding)) + 1
950 packet[Raw].load += (padding * num)[:extend].encode("ascii")
953 def reset_packet_infos(cls):
954 """ Reset the list of packet info objects and packet counts to zero """
955 cls._packet_infos = {}
956 cls._packet_count_for_dst_if_idx = {}
959 def create_packet_info(cls, src_if, dst_if):
961 Create packet info object containing the source and destination indexes
962 and add it to the testcase's packet info list
964 :param VppInterface src_if: source interface
965 :param VppInterface dst_if: destination interface
967 :returns: _PacketInfo object
971 info.index = len(cls._packet_infos)
972 info.src = src_if.sw_if_index
973 info.dst = dst_if.sw_if_index
974 if isinstance(dst_if, VppSubInterface):
975 dst_idx = dst_if.parent.sw_if_index
977 dst_idx = dst_if.sw_if_index
978 if dst_idx in cls._packet_count_for_dst_if_idx:
979 cls._packet_count_for_dst_if_idx[dst_idx] += 1
981 cls._packet_count_for_dst_if_idx[dst_idx] = 1
982 cls._packet_infos[info.index] = info
986 def info_to_payload(info):
988 Convert _PacketInfo object to packet payload
990 :param info: _PacketInfo object
992 :returns: string containing serialized data from packet info
995 # retrieve payload, currently 18 bytes (4 x ints + 1 short)
996 return pack('iiiih', info.index, info.src,
997 info.dst, info.ip, info.proto)
1000 def payload_to_info(payload, payload_field='load'):
1002 Convert packet payload to _PacketInfo object
1004 :param payload: packet payload
1005 :type payload: <class 'scapy.packet.Raw'>
1006 :param payload_field: packet fieldname of payload "load" for
1007 <class 'scapy.packet.Raw'>
1008 :type payload_field: str
1009 :returns: _PacketInfo object containing de-serialized data from payload
1013 # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1014 payload_b = getattr(payload, payload_field)[:18]
1016 info = _PacketInfo()
1017 info.index, info.src, info.dst, info.ip, info.proto \
1018 = unpack('iiiih', payload_b)
1020 # some SRv6 TCs depend on get an exception if bad values are detected
1021 if info.index > 0x4000:
1022 raise ValueError('Index value is invalid')
1026 def get_next_packet_info(self, info):
1028 Iterate over the packet info list stored in the testcase
1029 Start iteration with first element if info is None
1030 Continue based on index in info if info is specified
1032 :param info: info or None
1033 :returns: next info in list or None if no more infos
1038 next_index = info.index + 1
1039 if next_index == len(self._packet_infos):
1042 return self._packet_infos[next_index]
1044 def get_next_packet_info_for_interface(self, src_index, info):
1046 Search the packet info list for the next packet info with same source
1049 :param src_index: source interface index to search for
1050 :param info: packet info - where to start the search
1051 :returns: packet info or None
1055 info = self.get_next_packet_info(info)
1058 if info.src == src_index:
1061 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
1063 Search the packet info list for the next packet info with same source
1064 and destination interface indexes
1066 :param src_index: source interface index to search for
1067 :param dst_index: destination interface index to search for
1068 :param info: packet info - where to start the search
1069 :returns: packet info or None
1073 info = self.get_next_packet_info_for_interface(src_index, info)
1076 if info.dst == dst_index:
1079 def assert_equal(self, real_value, expected_value, name_or_class=None):
1080 if name_or_class is None:
1081 self.assertEqual(real_value, expected_value)
1084 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
1085 msg = msg % (getdoc(name_or_class).strip(),
1086 real_value, str(name_or_class(real_value)),
1087 expected_value, str(name_or_class(expected_value)))
1089 msg = "Invalid %s: %s does not match expected value %s" % (
1090 name_or_class, real_value, expected_value)
1092 self.assertEqual(real_value, expected_value, msg)
1094 def assert_in_range(self,
1102 msg = "Invalid %s: %s out of range <%s,%s>" % (
1103 name, real_value, expected_min, expected_max)
1104 self.assertTrue(expected_min <= real_value <= expected_max, msg)
1106 def assert_packet_checksums_valid(self, packet,
1107 ignore_zero_udp_checksums=True):
1108 received = packet.__class__(scapy.compat.raw(packet))
1109 udp_layers = ['UDP', 'UDPerror']
1110 checksum_fields = ['cksum', 'chksum']
1113 temp = received.__class__(scapy.compat.raw(received))
1115 layer = temp.getlayer(counter)
1117 layer = layer.copy()
1118 layer.remove_payload()
1119 for cf in checksum_fields:
1120 if hasattr(layer, cf):
1121 if ignore_zero_udp_checksums and \
1122 0 == getattr(layer, cf) and \
1123 layer.name in udp_layers:
1125 delattr(temp.getlayer(counter), cf)
1126 checksums.append((counter, cf))
1129 counter = counter + 1
1130 if 0 == len(checksums):
1132 temp = temp.__class__(scapy.compat.raw(temp))
1133 for layer, cf in checksums:
1134 calc_sum = getattr(temp[layer], cf)
1136 getattr(received[layer], cf), calc_sum,
1137 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
1139 "Checksum field `%s` on `%s` layer has correct value `%s`" %
1140 (cf, temp[layer].name, calc_sum))
1142 def assert_checksum_valid(self, received_packet, layer,
1143 field_name='chksum',
1144 ignore_zero_checksum=False):
1145 """ Check checksum of received packet on given layer """
1146 received_packet_checksum = getattr(received_packet[layer], field_name)
1147 if ignore_zero_checksum and 0 == received_packet_checksum:
1149 recalculated = received_packet.__class__(
1150 scapy.compat.raw(received_packet))
1151 delattr(recalculated[layer], field_name)
1152 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1153 self.assert_equal(received_packet_checksum,
1154 getattr(recalculated[layer], field_name),
1155 "packet checksum on layer: %s" % layer)
1157 def assert_ip_checksum_valid(self, received_packet,
1158 ignore_zero_checksum=False):
1159 self.assert_checksum_valid(received_packet, 'IP',
1160 ignore_zero_checksum=ignore_zero_checksum)
1162 def assert_tcp_checksum_valid(self, received_packet,
1163 ignore_zero_checksum=False):
1164 self.assert_checksum_valid(received_packet, 'TCP',
1165 ignore_zero_checksum=ignore_zero_checksum)
1167 def assert_udp_checksum_valid(self, received_packet,
1168 ignore_zero_checksum=True):
1169 self.assert_checksum_valid(received_packet, 'UDP',
1170 ignore_zero_checksum=ignore_zero_checksum)
1172 def assert_embedded_icmp_checksum_valid(self, received_packet):
1173 if received_packet.haslayer(IPerror):
1174 self.assert_checksum_valid(received_packet, 'IPerror')
1175 if received_packet.haslayer(TCPerror):
1176 self.assert_checksum_valid(received_packet, 'TCPerror')
1177 if received_packet.haslayer(UDPerror):
1178 self.assert_checksum_valid(received_packet, 'UDPerror',
1179 ignore_zero_checksum=True)
1180 if received_packet.haslayer(ICMPerror):
1181 self.assert_checksum_valid(received_packet, 'ICMPerror')
1183 def assert_icmp_checksum_valid(self, received_packet):
1184 self.assert_checksum_valid(received_packet, 'ICMP')
1185 self.assert_embedded_icmp_checksum_valid(received_packet)
1187 def assert_icmpv6_checksum_valid(self, pkt):
1188 if pkt.haslayer(ICMPv6DestUnreach):
1189 self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
1190 self.assert_embedded_icmp_checksum_valid(pkt)
1191 if pkt.haslayer(ICMPv6EchoRequest):
1192 self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
1193 if pkt.haslayer(ICMPv6EchoReply):
1194 self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
1196 def get_packet_counter(self, counter):
1197 if counter.startswith("/"):
1198 counter_value = self.statistics.get_counter(counter)
1200 counters = self.vapi.cli("sh errors").split('\n')
1202 for i in range(1, len(counters) - 1):
1203 results = counters[i].split()
1204 if results[1] == counter:
1205 counter_value = int(results[0])
1207 return counter_value
1209 def assert_packet_counter_equal(self, counter, expected_value):
1210 counter_value = self.get_packet_counter(counter)
1211 self.assert_equal(counter_value, expected_value,
1212 "packet counter `%s'" % counter)
1214 def assert_error_counter_equal(self, counter, expected_value):
1215 counter_value = self.statistics[counter].sum()
1216 self.assert_equal(counter_value, expected_value,
1217 "error counter `%s'" % counter)
1220 def sleep(cls, timeout, remark=None):
1222 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1223 # * by Guido, only the main thread can be interrupted.
1225 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
1228 if hasattr(os, 'sched_yield'):
1234 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1235 before = time.time()
1238 if after - before > 2 * timeout:
1239 cls.logger.error("unexpected self.sleep() result - "
1240 "slept for %es instead of ~%es!",
1241 after - before, timeout)
1244 "Finished sleep (%s) - slept %es (wanted %es)",
1245 remark, after - before, timeout)
1247 def virtual_sleep(self, timeout, remark=None):
1248 self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
1249 self.vapi.cli("set clock adjust %s" % timeout)
1251 def pg_send(self, intf, pkts, worker=None, trace=True):
1252 intf.add_stream(pkts, worker=worker)
1253 self.pg_enable_capture(self.pg_interfaces)
1254 self.pg_start(trace=trace)
1256 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None,
1258 self.pg_send(intf, pkts)
1261 for i in self.pg_interfaces:
1262 i.get_capture(0, timeout=timeout)
1263 i.assert_nothing_captured(remark=remark)
1266 self.logger.debug(self.vapi.cli("show trace"))
1268 def send_and_expect(self, intf, pkts, output, n_rx=None, worker=None,
1271 n_rx = 1 if isinstance(pkts, Packet) else len(pkts)
1272 self.pg_send(intf, pkts, worker=worker, trace=trace)
1273 rx = output.get_capture(n_rx)
1275 self.logger.debug(self.vapi.cli("show trace"))
1278 def send_and_expect_load_balancing(self, input, pkts, outputs,
1279 worker=None, trace=True):
1280 self.pg_send(input, pkts, worker=worker, trace=trace)
1283 rx = oo._get_capture(1)
1284 self.assertNotEqual(0, len(rx))
1287 self.logger.debug(self.vapi.cli("show trace"))
1290 def send_and_expect_only(self, intf, pkts, output, timeout=None):
1291 self.pg_send(intf, pkts)
1292 rx = output.get_capture(len(pkts))
1296 for i in self.pg_interfaces:
1297 if i not in outputs:
1298 i.get_capture(0, timeout=timeout)
1299 i.assert_nothing_captured()
1305 def get_testcase_doc_name(test):
1306 return getdoc(test.__class__).splitlines()[0]
1309 def get_test_description(descriptions, test):
1310 short_description = test.shortDescription()
1311 if descriptions and short_description:
1312 return short_description
1317 class TestCaseInfo(object):
1318 def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
1319 self.logger = logger
1320 self.tempdir = tempdir
1321 self.vpp_pid = vpp_pid
1322 self.vpp_bin_path = vpp_bin_path
1323 self.core_crash_test = None
1326 class VppTestResult(unittest.TestResult):
1328 @property result_string
1329 String variable to store the test case result string.
1331 List variable containing 2-tuples of TestCase instances and strings
1332 holding formatted tracebacks. Each tuple represents a test which
1333 raised an unexpected exception.
1335 List variable containing 2-tuples of TestCase instances and strings
1336 holding formatted tracebacks. Each tuple represents a test where
1337 a failure was explicitly signalled using the TestCase.assert*()
1341 failed_test_cases_info = set()
1342 core_crash_test_cases_info = set()
1343 current_test_case_info = None
1345 def __init__(self, stream=None, descriptions=None, verbosity=None,
1348 :param stream File descriptor to store where to report test results.
1349 Set to the standard error stream by default.
1350 :param descriptions Boolean variable to store information if to use
1351 test case descriptions.
1352 :param verbosity Integer variable to store required verbosity level.
1354 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1355 self.stream = stream
1356 self.descriptions = descriptions
1357 self.verbosity = verbosity
1358 self.result_string = None
1359 self.runner = runner
1362 def addSuccess(self, test):
1364 Record a test succeeded result
1369 if self.current_test_case_info:
1370 self.current_test_case_info.logger.debug(
1371 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1372 test._testMethodName,
1373 test._testMethodDoc))
1374 unittest.TestResult.addSuccess(self, test)
1375 self.result_string = colorize("OK", GREEN)
1377 self.send_result_through_pipe(test, PASS)
1379 def addSkip(self, test, reason):
1381 Record a test skipped.
1387 if self.current_test_case_info:
1388 self.current_test_case_info.logger.debug(
1389 "--- addSkip() %s.%s(%s) called, reason is %s" %
1390 (test.__class__.__name__, test._testMethodName,
1391 test._testMethodDoc, reason))
1392 unittest.TestResult.addSkip(self, test, reason)
1393 self.result_string = colorize("SKIP", YELLOW)
1395 if reason == "not enough cpus":
1396 self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE)
1398 self.send_result_through_pipe(test, SKIP)
1400 def symlink_failed(self):
1401 if self.current_test_case_info:
1403 failed_dir = config.failed_dir
1404 link_path = os.path.join(
1407 os.path.basename(self.current_test_case_info.tempdir))
1409 self.current_test_case_info.logger.debug(
1410 "creating a link to the failed test")
1411 self.current_test_case_info.logger.debug(
1412 "os.symlink(%s, %s)" %
1413 (self.current_test_case_info.tempdir, link_path))
1414 if os.path.exists(link_path):
1415 self.current_test_case_info.logger.debug(
1416 'symlink already exists')
1418 os.symlink(self.current_test_case_info.tempdir, link_path)
1420 except Exception as e:
1421 self.current_test_case_info.logger.error(e)
1423 def send_result_through_pipe(self, test, result):
1424 if hasattr(self, 'test_framework_result_pipe'):
1425 pipe = self.test_framework_result_pipe
1427 pipe.send((test.id(), result))
1429 def log_error(self, test, err, fn_name):
1430 if self.current_test_case_info:
1431 if isinstance(test, unittest.suite._ErrorHolder):
1432 test_name = test.description
1434 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1435 test._testMethodName,
1436 test._testMethodDoc)
1437 self.current_test_case_info.logger.debug(
1438 "--- %s() %s called, err is %s" %
1439 (fn_name, test_name, err))
1440 self.current_test_case_info.logger.debug(
1441 "formatted exception is:\n%s" %
1442 "".join(format_exception(*err)))
1444 def add_error(self, test, err, unittest_fn, error_type):
1445 if error_type == FAIL:
1446 self.log_error(test, err, 'addFailure')
1447 error_type_str = colorize("FAIL", RED)
1448 elif error_type == ERROR:
1449 self.log_error(test, err, 'addError')
1450 error_type_str = colorize("ERROR", RED)
1452 raise Exception('Error type %s cannot be used to record an '
1453 'error or a failure' % error_type)
1455 unittest_fn(self, test, err)
1456 if self.current_test_case_info:
1457 self.result_string = "%s [ temp dir used by test case: %s ]" % \
1459 self.current_test_case_info.tempdir)
1460 self.symlink_failed()
1461 self.failed_test_cases_info.add(self.current_test_case_info)
1462 if is_core_present(self.current_test_case_info.tempdir):
1463 if not self.current_test_case_info.core_crash_test:
1464 if isinstance(test, unittest.suite._ErrorHolder):
1465 test_name = str(test)
1467 test_name = "'{!s}' ({!s})".format(
1468 get_testcase_doc_name(test), test.id())
1469 self.current_test_case_info.core_crash_test = test_name
1470 self.core_crash_test_cases_info.add(
1471 self.current_test_case_info)
1473 self.result_string = '%s [no temp dir]' % error_type_str
1475 self.send_result_through_pipe(test, error_type)
1477 def addFailure(self, test, err):
1479 Record a test failed result
1482 :param err: error message
1485 self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1487 def addError(self, test, err):
1489 Record a test error result
1492 :param err: error message
1495 self.add_error(test, err, unittest.TestResult.addError, ERROR)
1497 def getDescription(self, test):
1499 Get test description
1502 :returns: test description
1505 return get_test_description(self.descriptions, test)
1507 def startTest(self, test):
1515 def print_header(test):
1516 if test.__class__ in self.printed:
1519 test_doc = getdoc(test)
1521 raise Exception("No doc string for test '%s'" % test.id())
1523 test_title = test_doc.splitlines()[0].rstrip()
1524 test_title = colorize(test_title, GREEN)
1525 if test.is_tagged_run_solo():
1526 test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)
1528 # This block may overwrite the colorized title above,
1529 # but we want this to stand out and be fixed
1530 if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
1531 test_title = colorize(
1532 f"FIXME with VPP workers: {test_title}", RED)
1534 if test.has_tag(TestCaseTag.FIXME_ASAN):
1535 test_title = colorize(
1536 f"FIXME with ASAN: {test_title}", RED)
1537 test.skip_fixme_asan()
1539 if hasattr(test, 'vpp_worker_count'):
1540 if test.vpp_worker_count == 0:
1541 test_title += " [main thread only]"
1542 elif test.vpp_worker_count == 1:
1543 test_title += " [1 worker thread]"
1545 test_title += f" [{test.vpp_worker_count} worker threads]"
1547 if test.__class__.skipped_due_to_cpu_lack:
1548 test_title = colorize(
1549 f"{test_title} [skipped - not enough cpus, "
1550 f"required={test.__class__.get_cpus_required()}, "
1551 f"available={max_vpp_cpus}]", YELLOW)
1553 print(double_line_delim)
1555 print(double_line_delim)
1556 self.printed.append(test.__class__)
1559 self.start_test = time.time()
1560 unittest.TestResult.startTest(self, test)
1561 if self.verbosity > 0:
1562 self.stream.writeln(
1563 "Starting " + self.getDescription(test) + " ...")
1564 self.stream.writeln(single_line_delim)
1566 def stopTest(self, test):
1568 Called when the given test has been run
1573 unittest.TestResult.stopTest(self, test)
1575 if self.verbosity > 0:
1576 self.stream.writeln(single_line_delim)
1577 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1578 self.result_string))
1579 self.stream.writeln(single_line_delim)
1581 self.stream.writeln("%-68s %4.2f %s" %
1582 (self.getDescription(test),
1583 time.time() - self.start_test,
1584 self.result_string))
1586 self.send_result_through_pipe(test, TEST_RUN)
1588 def printErrors(self):
1590 Print errors from running the test case
1592 if len(self.errors) > 0 or len(self.failures) > 0:
1593 self.stream.writeln()
1594 self.printErrorList('ERROR', self.errors)
1595 self.printErrorList('FAIL', self.failures)
1597 # ^^ that is the last output from unittest before summary
1598 if not self.runner.print_summary:
1599 devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
1600 self.stream = devnull
1601 self.runner.stream = devnull
1603 def printErrorList(self, flavour, errors):
1605 Print error list to the output stream together with error type
1606 and test case description.
1608 :param flavour: error type
1609 :param errors: iterable errors
1612 for test, err in errors:
1613 self.stream.writeln(double_line_delim)
1614 self.stream.writeln("%s: %s" %
1615 (flavour, self.getDescription(test)))
1616 self.stream.writeln(single_line_delim)
1617 self.stream.writeln("%s" % err)
1620 class VppTestRunner(unittest.TextTestRunner):
1622 A basic test runner implementation which prints results to standard error.
1626 def resultclass(self):
1627 """Class maintaining the results of the tests"""
1628 return VppTestResult
1630 def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1631 result_pipe=None, failfast=False, buffer=False,
1632 resultclass=None, print_summary=True, **kwargs):
1633 # ignore stream setting here, use hard-coded stdout to be in sync
1634 # with prints from VppTestCase methods ...
1635 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1636 verbosity, failfast, buffer,
1637 resultclass, **kwargs)
1638 KeepAliveReporter.pipe = keep_alive_pipe
1640 self.orig_stream = self.stream
1641 self.resultclass.test_framework_result_pipe = result_pipe
1643 self.print_summary = print_summary
1645 def _makeResult(self):
1646 return self.resultclass(self.stream,
1651 def run(self, test):
1658 faulthandler.enable() # emit stack trace to stderr if killed by signal
1660 result = super(VppTestRunner, self).run(test)
1661 if not self.print_summary:
1662 self.stream = self.orig_stream
1663 result.stream = self.orig_stream
1667 class Worker(Thread):
1668 def __init__(self, executable_args, logger, env=None, *args, **kwargs):
1669 super(Worker, self).__init__(*args, **kwargs)
1670 self.logger = logger
1671 self.args = executable_args
1672 if hasattr(self, 'testcase') and self.testcase.debug_all:
1673 if self.testcase.debug_gdbserver:
1674 self.args = ['/usr/bin/gdbserver', 'localhost:{port}'
1675 .format(port=self.testcase.gdbserver_port)] + args
1676 elif self.testcase.debug_gdb and hasattr(self, 'wait_for_gdb'):
1677 self.args.append(self.wait_for_gdb)
1678 self.app_bin = executable_args[0]
1679 self.app_name = os.path.basename(self.app_bin)
1680 if hasattr(self, 'role'):
1681 self.app_name += ' {role}'.format(role=self.role)
1684 env = {} if env is None else env
1685 self.env = copy.deepcopy(env)
1687 def wait_for_enter(self):
1688 if not hasattr(self, 'testcase'):
1690 if self.testcase.debug_all and self.testcase.debug_gdbserver:
1692 print(double_line_delim)
1693 print("Spawned GDB Server for '{app}' with PID: {pid}"
1694 .format(app=self.app_name, pid=self.process.pid))
1695 elif self.testcase.debug_all and self.testcase.debug_gdb:
1697 print(double_line_delim)
1698 print("Spawned '{app}' with PID: {pid}"
1699 .format(app=self.app_name, pid=self.process.pid))
1702 print(single_line_delim)
1703 print("You can debug '{app}' using:".format(app=self.app_name))
1704 if self.testcase.debug_gdbserver:
1705 print("sudo gdb " + self.app_bin +
1706 " -ex 'target remote localhost:{port}'"
1707 .format(port=self.testcase.gdbserver_port))
1708 print("Now is the time to attach gdb by running the above "
1709 "command, set up breakpoints etc., then resume from "
1710 "within gdb by issuing the 'continue' command")
1711 self.testcase.gdbserver_port += 1
1712 elif self.testcase.debug_gdb:
1713 print("sudo gdb " + self.app_bin +
1714 " -ex 'attach {pid}'".format(pid=self.process.pid))
1715 print("Now is the time to attach gdb by running the above "
1716 "command and set up breakpoints etc., then resume from"
1717 " within gdb by issuing the 'continue' command")
1718 print(single_line_delim)
1719 input("Press ENTER to continue running the testcase...")
1722 executable = self.args[0]
1723 if not os.path.exists(executable) or not os.access(
1724 executable, os.F_OK | os.X_OK):
1725 # Exit code that means some system file did not exist,
1726 # could not be opened, or had some other kind of error.
1727 self.result = os.EX_OSFILE
1728 raise EnvironmentError(
1729 "executable '%s' is not found or executable." % executable)
1730 self.logger.debug("Running executable '{app}': '{cmd}'"
1731 .format(app=self.app_name,
1732 cmd=' '.join(self.args)))
1733 env = os.environ.copy()
1734 env.update(self.env)
1735 env["CK_LOG_FILE_NAME"] = "-"
1736 self.process = subprocess.Popen(
1737 ['stdbuf', '-o0', '-e0'] + self.args, shell=False, env=env,
1738 preexec_fn=os.setpgrp, stdout=subprocess.PIPE,
1739 stderr=subprocess.PIPE)
1740 self.wait_for_enter()
1741 out, err = self.process.communicate()
1742 self.logger.debug("Finished running `{app}'".format(app=self.app_name))
1743 self.logger.info("Return code is `%s'" % self.process.returncode)
1744 self.logger.info(single_line_delim)
1745 self.logger.info("Executable `{app}' wrote to stdout:"
1746 .format(app=self.app_name))
1747 self.logger.info(single_line_delim)
1748 self.logger.info(out.decode('utf-8'))
1749 self.logger.info(single_line_delim)
1750 self.logger.info("Executable `{app}' wrote to stderr:"
1751 .format(app=self.app_name))
1752 self.logger.info(single_line_delim)
1753 self.logger.info(err.decode('utf-8'))
1754 self.logger.info(single_line_delim)
1755 self.result = self.process.returncode
1758 if __name__ == '__main__':