3 from __future__ import print_function
19 from collections import deque
20 from threading import Thread, Event
21 from inspect import getdoc, isclass
22 from traceback import format_exception
23 from logging import FileHandler, DEBUG, Formatter
25 from abc import ABC, abstractmethod
26 from struct import pack, unpack
29 from scapy.packet import Raw, Packet
30 from config import config, available_cpus, num_cpus, max_vpp_cpus
31 import hook as hookmodule
32 from vpp_pg_interface import VppPGInterface
33 from vpp_sub_interface import VppSubInterface
34 from vpp_lo_interface import VppLoInterface
35 from vpp_bvi_interface import VppBviInterface
36 from vpp_papi_provider import VppPapiProvider
37 from vpp_papi import VppEnum
39 from vpp_papi.vpp_stats import VPPStats
40 from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
41 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
43 from vpp_object import VppObjectRegistry
44 from util import ppp, is_core_present
45 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
46 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
47 from scapy.layers.inet6 import ICMPv6EchoReply
50 logger = logging.getLogger(__name__)
52 # Set up an empty logger for the testcase that can be overridden as necessary
53 null_logger = logging.getLogger('VppTestCase')
54 null_logger.addHandler(logging.NullHandler())
64 if config.debug_framework:
68 Test framework module.
70 The module provides a set of tools for constructing and running tests and
71 representing the results.
75 class VppDiedError(Exception):
76 """ exception for reporting that the subprocess has died."""
78 signals_by_value = {v: k for k, v in signal.__dict__.items() if
79 k.startswith('SIG') and not k.startswith('SIG_')}
81 def __init__(self, rv=None, testcase=None, method_name=None):
83 self.signal_name = None
84 self.testcase = testcase
85 self.method_name = method_name
88 self.signal_name = VppDiedError.signals_by_value[-rv]
89 except (KeyError, TypeError):
92 if testcase is None and method_name is None:
95 in_msg = ' while running %s.%s' % (testcase, method_name)
98 msg = "VPP subprocess died unexpectedly%s with return code: %d%s."\
99 % (in_msg, self.rv, ' [%s]' %
101 self.signal_name is not None else ''))
103 msg = "VPP subprocess died unexpectedly%s." % in_msg
105 super(VppDiedError, self).__init__(msg)
108 class _PacketInfo(object):
109 """Private class to create packet info object.
111 Help process information about the next packet.
112 Set variables to default values.
114 #: Store the index of the packet.
116 #: Store the index of the source packet generator interface of the packet.
118 #: Store the index of the destination packet generator interface
121 #: Store expected ip version
123 #: Store expected upper protocol
125 #: Store the copy of the former packet.
128 def __eq__(self, other):
129 index = self.index == other.index
130 src = self.src == other.src
131 dst = self.dst == other.dst
132 data = self.data == other.data
133 return index and src and dst and data
136 def pump_output(testclass):
137 """ pump output from vpp stdout/stderr to proper queues """
140 while not testclass.pump_thread_stop_flag.is_set():
141 readable = select.select([testclass.vpp.stdout.fileno(),
142 testclass.vpp.stderr.fileno(),
143 testclass.pump_thread_wakeup_pipe[0]],
145 if testclass.vpp.stdout.fileno() in readable:
146 read = os.read(testclass.vpp.stdout.fileno(), 102400)
148 split = read.decode('ascii',
149 errors='backslashreplace').splitlines(True)
150 if len(stdout_fragment) > 0:
151 split[0] = "%s%s" % (stdout_fragment, split[0])
152 if len(split) > 0 and split[-1].endswith("\n"):
156 stdout_fragment = split[-1]
157 testclass.vpp_stdout_deque.extend(split[:limit])
158 if not config.cache_vpp_output:
159 for line in split[:limit]:
160 testclass.logger.info(
161 "VPP STDOUT: %s" % line.rstrip("\n"))
162 if testclass.vpp.stderr.fileno() in readable:
163 read = os.read(testclass.vpp.stderr.fileno(), 102400)
165 split = read.decode('ascii',
166 errors='backslashreplace').splitlines(True)
167 if len(stderr_fragment) > 0:
168 split[0] = "%s%s" % (stderr_fragment, split[0])
169 if len(split) > 0 and split[-1].endswith("\n"):
173 stderr_fragment = split[-1]
175 testclass.vpp_stderr_deque.extend(split[:limit])
176 if not config.cache_vpp_output:
177 for line in split[:limit]:
178 testclass.logger.error(
179 "VPP STDERR: %s" % line.rstrip("\n"))
180 # ignoring the dummy pipe here intentionally - the
181 # flag will take care of properly terminating the loop
184 def _is_platform_aarch64():
185 return platform.machine() == 'aarch64'
188 is_platform_aarch64 = _is_platform_aarch64()
191 class KeepAliveReporter(object):
193 Singleton object which reports test start to parent process
198 self.__dict__ = self._shared_state
206 def pipe(self, pipe):
207 if self._pipe is not None:
208 raise Exception("Internal error - pipe should only be set once.")
211 def send_keep_alive(self, test, desc=None):
213 Write current test tmpdir & desc to keep-alive pipe to signal liveness
215 if self.pipe is None:
216 # if not running forked..
220 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
224 self.pipe.send((desc, config.vpp, test.tempdir, test.vpp.pid))
227 class TestCaseTag(Enum):
228 # marks the suites that must run at the end
229 # using only a single test runner
231 # marks the suites broken on VPP multi-worker
232 FIXME_VPP_WORKERS = 2
233 # marks the suites broken when ASan is enabled
237 def create_tag_decorator(e):
240 cls.test_tags.append(e)
241 except AttributeError:
247 tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
248 tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
249 tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
263 class CPUInterface(ABC):
265 skipped_due_to_cpu_lack = False
269 def get_cpus_required(cls):
273 def assign_cpus(cls, cpus):
277 class VppTestCase(CPUInterface, unittest.TestCase):
278 """This subclass is a base class for VPP test cases that are implemented as
279 classes. It provides methods to create and run test case.
282 extra_vpp_statseg_config = ""
283 extra_vpp_punt_config = []
284 extra_vpp_plugin_config = []
286 vapi_response_timeout = 5
289 def packet_infos(self):
290 """List of packet infos"""
291 return self._packet_infos
294 def get_packet_count_for_if_idx(cls, dst_if_index):
295 """Get the number of packet info for specified destination if index"""
296 if dst_if_index in cls._packet_count_for_dst_if_idx:
297 return cls._packet_count_for_dst_if_idx[dst_if_index]
302 def has_tag(cls, tag):
303 """ if the test case has a given tag - return true """
305 return tag in cls.test_tags
306 except AttributeError:
311 def is_tagged_run_solo(cls):
312 """ if the test case class is timing-sensitive - return true """
313 return cls.has_tag(TestCaseTag.RUN_SOLO)
316 def skip_fixme_asan(cls):
317 """ if @tag_fixme_asan & ASan is enabled - mark for skip """
318 if cls.has_tag(TestCaseTag.FIXME_ASAN):
319 vpp_extra_cmake_args = os.environ.get('VPP_EXTRA_CMAKE_ARGS', '')
320 if 'DVPP_ENABLE_SANITIZE_ADDR=ON' in vpp_extra_cmake_args:
321 cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls)
325 """Return the instance of this testcase"""
326 return cls.test_instance
329 def set_debug_flags(cls, d):
330 cls.gdbserver_port = 7777
331 cls.debug_core = False
332 cls.debug_gdb = False
333 cls.debug_gdbserver = False
334 cls.debug_all = False
335 cls.debug_attach = False
340 cls.debug_core = True
341 elif dl == "gdb" or dl == "gdb-all":
343 elif dl == "gdbserver" or dl == "gdbserver-all":
344 cls.debug_gdbserver = True
346 cls.debug_attach = True
348 raise Exception("Unrecognized DEBUG option: '%s'" % d)
349 if dl == "gdb-all" or dl == "gdbserver-all":
353 def get_vpp_worker_count(cls):
354 if not hasattr(cls, "vpp_worker_count"):
355 if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
356 cls.vpp_worker_count = 0
358 cls.vpp_worker_count = config.vpp_worker_count
359 return cls.vpp_worker_count
362 def get_cpus_required(cls):
363 return 1 + cls.get_vpp_worker_count()
366 def setUpConstants(cls):
367 """ Set-up the test case class based on environment variables """
368 cls.step = config.step
369 cls.plugin_path = ":".join(config.vpp_plugin_dir)
370 cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir)
371 cls.extern_plugin_path = ":".join(config.extern_plugin_dir)
373 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
374 debug_cli = "cli-listen localhost:5002"
375 size = re.search(r"\d+[gG]", config.coredump_size)
377 coredump_size = f"coredump-size {config.coredump_size}".lower()
379 coredump_size = "coredump-size unlimited"
380 default_variant = config.variant
381 if default_variant is not None:
382 default_variant = "defaults { %s 100 }" % default_variant
386 api_fuzzing = config.api_fuzz
387 if api_fuzzing is None:
392 "unix", "{", "nodaemon", debug_cli, "full-coredump",
393 coredump_size, "runtime-dir", cls.tempdir, "}",
394 "api-trace", "{", "on", "}",
395 "api-segment", "{", "prefix", cls.get_api_segment_prefix(), "}",
396 "cpu", "{", "main-core", str(cls.cpus[0]), ]
397 if cls.extern_plugin_path not in (None, ""):
398 cls.extra_vpp_plugin_config.append(
399 "add-path %s" % cls.extern_plugin_path)
400 if cls.get_vpp_worker_count():
401 cls.vpp_cmdline.extend([
402 "corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])])
403 cls.vpp_cmdline.extend([
405 "physmem", "{", "max-size", "32m", "}",
406 "statseg", "{", "socket-name", cls.get_stats_sock_path(),
407 cls.extra_vpp_statseg_config, "}",
408 "socksvr", "{", "socket-name", cls.get_api_sock_path(), "}",
409 "node { ", default_variant, "}",
410 "api-fuzz {", api_fuzzing, "}",
411 "plugins", "{", "plugin", "dpdk_plugin.so", "{", "disable", "}",
412 "plugin", "rdma_plugin.so", "{", "disable", "}",
413 "plugin", "lisp_unittest_plugin.so", "{", "enable", "}",
414 "plugin", "unittest_plugin.so", "{", "enable", "}"
415 ] + cls.extra_vpp_plugin_config + ["}", ])
417 if cls.extra_vpp_punt_config is not None:
418 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
420 if not cls.debug_attach:
421 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
422 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
425 def wait_for_enter(cls):
426 if cls.debug_gdbserver:
427 print(double_line_delim)
428 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
430 print(double_line_delim)
431 print("Spawned VPP with PID: %d" % cls.vpp.pid)
433 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
435 print(single_line_delim)
436 print("You can debug VPP using:")
437 if cls.debug_gdbserver:
438 print(f"sudo gdb {config.vpp} "
439 f"-ex 'target remote localhost:{cls.gdbserver_port}'")
440 print("Now is the time to attach gdb by running the above "
441 "command, set up breakpoints etc., then resume VPP from "
442 "within gdb by issuing the 'continue' command")
443 cls.gdbserver_port += 1
445 print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'")
446 print("Now is the time to attach gdb by running the above "
447 "command and set up breakpoints etc., then resume VPP from"
448 " within gdb by issuing the 'continue' command")
449 print(single_line_delim)
450 input("Press ENTER to continue running the testcase...")
458 cls.logger.debug(f"Assigned cpus: {cls.cpus}")
459 cmdline = cls.vpp_cmdline
461 if cls.debug_gdbserver:
462 gdbserver = '/usr/bin/gdbserver'
463 if not os.path.isfile(gdbserver) or\
464 not os.access(gdbserver, os.X_OK):
465 raise Exception("gdbserver binary '%s' does not exist or is "
466 "not executable" % gdbserver)
468 cmdline = [gdbserver, 'localhost:{port}'
469 .format(port=cls.gdbserver_port)] + cls.vpp_cmdline
470 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
473 cls.vpp = subprocess.Popen(cmdline,
474 stdout=subprocess.PIPE,
475 stderr=subprocess.PIPE)
476 except subprocess.CalledProcessError as e:
477 cls.logger.critical("Subprocess returned with non-0 return code: ("
481 cls.logger.critical("Subprocess returned with OS error: "
482 "(%s) %s", e.errno, e.strerror)
484 except Exception as e:
485 cls.logger.exception("Subprocess returned unexpected from "
492 def wait_for_coredump(cls):
493 corefile = cls.tempdir + "/core"
494 if os.path.isfile(corefile):
495 cls.logger.error("Waiting for coredump to complete: %s", corefile)
496 curr_size = os.path.getsize(corefile)
497 deadline = time.time() + 60
499 while time.time() < deadline:
502 curr_size = os.path.getsize(corefile)
503 if size == curr_size:
507 cls.logger.error("Timed out waiting for coredump to complete:"
510 cls.logger.error("Coredump complete: %s, size %d",
514 def get_stats_sock_path(cls):
515 return "%s/stats.sock" % cls.tempdir
518 def get_api_sock_path(cls):
519 return "%s/api.sock" % cls.tempdir
522 def get_api_segment_prefix(cls):
523 return os.path.basename(cls.tempdir) # Only used for VAPI
526 def get_tempdir(cls):
527 tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}"
528 if config.wipe_tmp_dir:
529 shutil.rmtree(tmpdir, ignore_errors=True)
534 def create_file_handler(cls):
535 if config.log_dir is None:
536 cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
539 logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}"
540 if config.wipe_tmp_dir:
541 shutil.rmtree(logdir, ignore_errors=True)
543 cls.file_handler = FileHandler(f"{logdir}/log.txt")
548 Perform class setup before running the testcase
549 Remove shared memory files, start vpp and connect the vpp-api
551 super(VppTestCase, cls).setUpClass()
552 cls.logger = get_logger(cls.__name__)
553 random.seed(config.rnd_seed)
554 if hasattr(cls, 'parallel_handler'):
555 cls.logger.addHandler(cls.parallel_handler)
556 cls.logger.propagate = False
557 cls.set_debug_flags(config.debug)
558 cls.tempdir = cls.get_tempdir()
559 cls.create_file_handler()
560 cls.file_handler.setFormatter(
561 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
563 cls.file_handler.setLevel(DEBUG)
564 cls.logger.addHandler(cls.file_handler)
565 cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
566 os.chdir(cls.tempdir)
567 cls.logger.info("Temporary dir is %s, api socket is %s",
568 cls.tempdir, cls.get_api_sock_path())
569 cls.logger.debug("Random seed is %s", config.rnd_seed)
571 cls.reset_packet_infos()
576 cls.registry = VppObjectRegistry()
577 cls.vpp_startup_failed = False
578 cls.reporter = KeepAliveReporter()
579 # need to catch exceptions here because if we raise, then the cleanup
580 # doesn't get called and we might end with a zombie vpp
586 cls.reporter.send_keep_alive(cls, 'setUpClass')
587 VppTestResult.current_test_case_info = TestCaseInfo(
588 cls.logger, cls.tempdir, cls.vpp.pid, config.vpp)
589 cls.vpp_stdout_deque = deque()
590 cls.vpp_stderr_deque = deque()
591 if not cls.debug_attach:
592 cls.pump_thread_stop_flag = Event()
593 cls.pump_thread_wakeup_pipe = os.pipe()
594 cls.pump_thread = Thread(target=pump_output, args=(cls,))
595 cls.pump_thread.daemon = True
596 cls.pump_thread.start()
597 if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
598 cls.vapi_response_timeout = 0
599 cls.vapi = VppPapiProvider(cls.__name__, cls,
600 cls.vapi_response_timeout)
602 hook = hookmodule.StepHook(cls)
604 hook = hookmodule.PollHook(cls)
605 cls.vapi.register_hook(hook)
606 cls.statistics = VPPStats(socketname=cls.get_stats_sock_path())
610 cls.vpp_startup_failed = True
612 "VPP died shortly after startup, check the"
613 " output to standard error for possible cause")
617 except (vpp_papi.VPPIOError, Exception) as e:
618 cls.logger.debug("Exception connecting to vapi: %s" % e)
619 cls.vapi.disconnect()
621 if cls.debug_gdbserver:
622 print(colorize("You're running VPP inside gdbserver but "
623 "VPP-API connection failed, did you forget "
624 "to 'continue' VPP from within gdb?", RED))
627 last_line = cls.vapi.cli("show thread").split("\n")[-2]
628 cls.vpp_worker_count = int(last_line.split(" ")[0])
629 print("Detected VPP with %s workers." % cls.vpp_worker_count)
630 except vpp_papi.VPPRuntimeError as e:
631 cls.logger.debug("%s" % e)
634 except Exception as e:
635 cls.logger.debug("Exception connecting to VPP: %s" % e)
640 def _debug_quit(cls):
641 if (cls.debug_gdbserver or cls.debug_gdb):
645 if cls.vpp.returncode is None:
647 print(double_line_delim)
648 print("VPP or GDB server is still running")
649 print(single_line_delim)
650 input("When done debugging, press ENTER to kill the "
651 "process and finish running the testcase...")
652 except AttributeError:
658 Disconnect vpp-api, kill vpp and cleanup shared memory files
662 # first signal that we want to stop the pump thread, then wake it up
663 if hasattr(cls, 'pump_thread_stop_flag'):
664 cls.pump_thread_stop_flag.set()
665 if hasattr(cls, 'pump_thread_wakeup_pipe'):
666 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
667 if hasattr(cls, 'pump_thread'):
668 cls.logger.debug("Waiting for pump thread to stop")
669 cls.pump_thread.join()
670 if hasattr(cls, 'vpp_stderr_reader_thread'):
671 cls.logger.debug("Waiting for stderr pump to stop")
672 cls.vpp_stderr_reader_thread.join()
674 if hasattr(cls, 'vpp'):
675 if hasattr(cls, 'vapi'):
676 cls.logger.debug(cls.vapi.vpp.get_stats())
677 cls.logger.debug("Disconnecting class vapi client on %s",
679 cls.vapi.disconnect()
680 cls.logger.debug("Deleting class vapi attribute on %s",
684 if not cls.debug_attach and cls.vpp.returncode is None:
685 cls.wait_for_coredump()
686 cls.logger.debug("Sending TERM to vpp")
688 cls.logger.debug("Waiting for vpp to die")
690 outs, errs = cls.vpp.communicate(timeout=5)
691 except subprocess.TimeoutExpired:
693 outs, errs = cls.vpp.communicate()
694 cls.logger.debug("Deleting class vpp attribute on %s",
696 if not cls.debug_attach:
697 cls.vpp.stdout.close()
698 cls.vpp.stderr.close()
701 if cls.vpp_startup_failed:
702 stdout_log = cls.logger.info
703 stderr_log = cls.logger.critical
705 stdout_log = cls.logger.info
706 stderr_log = cls.logger.info
708 if hasattr(cls, 'vpp_stdout_deque'):
709 stdout_log(single_line_delim)
710 stdout_log('VPP output to stdout while running %s:', cls.__name__)
711 stdout_log(single_line_delim)
712 vpp_output = "".join(cls.vpp_stdout_deque)
713 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
715 stdout_log('\n%s', vpp_output)
716 stdout_log(single_line_delim)
718 if hasattr(cls, 'vpp_stderr_deque'):
719 stderr_log(single_line_delim)
720 stderr_log('VPP output to stderr while running %s:', cls.__name__)
721 stderr_log(single_line_delim)
722 vpp_output = "".join(cls.vpp_stderr_deque)
723 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
725 stderr_log('\n%s', vpp_output)
726 stderr_log(single_line_delim)
729 def tearDownClass(cls):
730 """ Perform final cleanup after running all tests in this test-case """
731 cls.logger.debug("--- tearDownClass() for %s called ---" %
733 cls.reporter.send_keep_alive(cls, 'tearDownClass')
735 cls.file_handler.close()
736 cls.reset_packet_infos()
737 if config.debug_framework:
738 debug_internal.on_tear_down_class(cls)
740 def show_commands_at_teardown(self):
741 """ Allow subclass specific teardown logging additions."""
742 self.logger.info("--- No test specific show commands provided. ---")
745 """ Show various debug prints after each test """
746 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
747 (self.__class__.__name__, self._testMethodName,
748 self._testMethodDoc))
751 if not self.vpp_dead:
752 self.logger.debug(self.vapi.cli("show trace max 1000"))
753 self.logger.info(self.vapi.ppcli("show interface"))
754 self.logger.info(self.vapi.ppcli("show hardware"))
755 self.logger.info(self.statistics.set_errors_str())
756 self.logger.info(self.vapi.ppcli("show run"))
757 self.logger.info(self.vapi.ppcli("show log"))
758 self.logger.info(self.vapi.ppcli("show bihash"))
759 self.logger.info("Logging testcase specific show commands.")
760 self.show_commands_at_teardown()
761 self.registry.remove_vpp_config(self.logger)
762 # Save/Dump VPP api trace log
763 m = self._testMethodName
764 api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
765 tmp_api_trace = "/tmp/%s" % api_trace
766 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
767 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
768 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
770 os.rename(tmp_api_trace, vpp_api_trace_log)
771 except VppTransportSocketIOError:
772 self.logger.debug("VppTransportSocketIOError: Vpp dead. "
773 "Cannot log show commands.")
776 self.registry.unregister_all(self.logger)
779 """ Clear trace before running each test"""
780 super(VppTestCase, self).setUp()
781 self.reporter.send_keep_alive(self)
783 raise VppDiedError(rv=None, testcase=self.__class__.__name__,
784 method_name=self._testMethodName)
785 self.sleep(.1, "during setUp")
786 self.vpp_stdout_deque.append(
787 "--- test setUp() for %s.%s(%s) starts here ---\n" %
788 (self.__class__.__name__, self._testMethodName,
789 self._testMethodDoc))
790 self.vpp_stderr_deque.append(
791 "--- test setUp() for %s.%s(%s) starts here ---\n" %
792 (self.__class__.__name__, self._testMethodName,
793 self._testMethodDoc))
794 self.vapi.cli("clear trace")
795 # store the test instance inside the test class - so that objects
796 # holding the class can access instance methods (like assertEqual)
797 type(self).test_instance = self
800 def pg_enable_capture(cls, interfaces=None):
802 Enable capture on packet-generator interfaces
804 :param interfaces: iterable interface indexes (if None,
805 use self.pg_interfaces)
808 if interfaces is None:
809 interfaces = cls.pg_interfaces
814 def register_pcap(cls, intf, worker):
815 """ Register a pcap in the testclass """
816 # add to the list of captures with current timestamp
817 cls._pcaps.append((intf, worker))
820 def get_vpp_time(cls):
821 # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
822 # returns float("2.190522")
823 timestr = cls.vapi.cli('show clock')
824 head, sep, tail = timestr.partition(',')
825 head, sep, tail = head.partition('Time now')
829 def sleep_on_vpp_time(cls, sec):
830 """ Sleep according to time in VPP world """
831 # On a busy system with many processes
832 # we might end up with VPP time being slower than real world
833 # So take that into account when waiting for VPP to do something
834 start_time = cls.get_vpp_time()
835 while cls.get_vpp_time() - start_time < sec:
839 def pg_start(cls, trace=True):
840 """ Enable the PG, wait till it is done, then clean up """
841 for (intf, worker) in cls._old_pcaps:
842 intf.handle_old_pcap_file(intf.get_in_path(worker),
843 intf.in_history_counter)
846 cls.vapi.cli("clear trace")
847 cls.vapi.cli("trace add pg-input 1000")
848 cls.vapi.cli('packet-generator enable')
849 # PG, when starts, runs to completion -
850 # so let's avoid a race condition,
851 # and wait a little till it's done.
852 # Then clean it up - and then be gone.
853 deadline = time.time() + 300
854 while cls.vapi.cli('show packet-generator').find("Yes") != -1:
855 cls.sleep(0.01) # yield
856 if time.time() > deadline:
857 cls.logger.error("Timeout waiting for pg to stop")
859 for intf, worker in cls._pcaps:
860 cls.vapi.cli('packet-generator delete %s' %
861 intf.get_cap_name(worker))
862 cls._old_pcaps = cls._pcaps
866 def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0,
869 Create packet-generator interfaces.
871 :param interfaces: iterable indexes of the interfaces.
872 :returns: List of created interfaces.
877 intf = VppPGInterface(cls, i, gso, gso_size, mode)
878 setattr(cls, intf.name, intf)
880 cls.pg_interfaces = result
884 def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
885 pgmode = VppEnum.vl_api_pg_interface_mode_t
886 return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
887 pgmode.PG_API_MODE_IP4)
890 def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
891 pgmode = VppEnum.vl_api_pg_interface_mode_t
892 return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
893 pgmode.PG_API_MODE_IP6)
896 def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
897 pgmode = VppEnum.vl_api_pg_interface_mode_t
898 return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
899 pgmode.PG_API_MODE_ETHERNET)
902 def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
903 pgmode = VppEnum.vl_api_pg_interface_mode_t
904 return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
905 pgmode.PG_API_MODE_ETHERNET)
908 def create_loopback_interfaces(cls, count):
910 Create loopback interfaces.
912 :param count: number of interfaces created.
913 :returns: List of created interfaces.
915 result = [VppLoInterface(cls) for i in range(count)]
917 setattr(cls, intf.name, intf)
918 cls.lo_interfaces = result
922 def create_bvi_interfaces(cls, count):
924 Create BVI interfaces.
926 :param count: number of interfaces created.
927 :returns: List of created interfaces.
929 result = [VppBviInterface(cls) for i in range(count)]
931 setattr(cls, intf.name, intf)
932 cls.bvi_interfaces = result
936 def extend_packet(packet, size, padding=' '):
938 Extend packet to given size by padding with spaces or custom padding
939 NOTE: Currently works only when Raw layer is present.
941 :param packet: packet
942 :param size: target size
943 :param padding: padding used to extend the payload
946 packet_len = len(packet) + 4
947 extend = size - packet_len
949 num = (extend // len(padding)) + 1
950 packet[Raw].load += (padding * num)[:extend].encode("ascii")
953 def reset_packet_infos(cls):
954 """ Reset the list of packet info objects and packet counts to zero """
955 cls._packet_infos = {}
956 cls._packet_count_for_dst_if_idx = {}
959 def create_packet_info(cls, src_if, dst_if):
961 Create packet info object containing the source and destination indexes
962 and add it to the testcase's packet info list
964 :param VppInterface src_if: source interface
965 :param VppInterface dst_if: destination interface
967 :returns: _PacketInfo object
971 info.index = len(cls._packet_infos)
972 info.src = src_if.sw_if_index
973 info.dst = dst_if.sw_if_index
974 if isinstance(dst_if, VppSubInterface):
975 dst_idx = dst_if.parent.sw_if_index
977 dst_idx = dst_if.sw_if_index
978 if dst_idx in cls._packet_count_for_dst_if_idx:
979 cls._packet_count_for_dst_if_idx[dst_idx] += 1
981 cls._packet_count_for_dst_if_idx[dst_idx] = 1
982 cls._packet_infos[info.index] = info
986 def info_to_payload(info):
988 Convert _PacketInfo object to packet payload
990 :param info: _PacketInfo object
992 :returns: string containing serialized data from packet info
995 # retrieve payload, currently 18 bytes (4 x ints + 1 short)
996 return pack('iiiih', info.index, info.src,
997 info.dst, info.ip, info.proto)
1000 def payload_to_info(payload, payload_field='load'):
1002 Convert packet payload to _PacketInfo object
1004 :param payload: packet payload
1005 :type payload: <class 'scapy.packet.Raw'>
1006 :param payload_field: packet fieldname of payload "load" for
1007 <class 'scapy.packet.Raw'>
1008 :type payload_field: str
1009 :returns: _PacketInfo object containing de-serialized data from payload
1013 # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1014 payload_b = getattr(payload, payload_field)[:18]
1016 info = _PacketInfo()
1017 info.index, info.src, info.dst, info.ip, info.proto \
1018 = unpack('iiiih', payload_b)
1020 # some SRv6 TCs depend on get an exception if bad values are detected
1021 if info.index > 0x4000:
1022 raise ValueError('Index value is invalid')
1026 def get_next_packet_info(self, info):
1028 Iterate over the packet info list stored in the testcase
1029 Start iteration with first element if info is None
1030 Continue based on index in info if info is specified
1032 :param info: info or None
1033 :returns: next info in list or None if no more infos
1038 next_index = info.index + 1
1039 if next_index == len(self._packet_infos):
1042 return self._packet_infos[next_index]
1044 def get_next_packet_info_for_interface(self, src_index, info):
1046 Search the packet info list for the next packet info with same source
1049 :param src_index: source interface index to search for
1050 :param info: packet info - where to start the search
1051 :returns: packet info or None
1055 info = self.get_next_packet_info(info)
1058 if info.src == src_index:
1061 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
1063 Search the packet info list for the next packet info with same source
1064 and destination interface indexes
1066 :param src_index: source interface index to search for
1067 :param dst_index: destination interface index to search for
1068 :param info: packet info - where to start the search
1069 :returns: packet info or None
1073 info = self.get_next_packet_info_for_interface(src_index, info)
1076 if info.dst == dst_index:
1079 def assert_equal(self, real_value, expected_value, name_or_class=None):
1080 if name_or_class is None:
1081 self.assertEqual(real_value, expected_value)
1084 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
1085 msg = msg % (getdoc(name_or_class).strip(),
1086 real_value, str(name_or_class(real_value)),
1087 expected_value, str(name_or_class(expected_value)))
1089 msg = "Invalid %s: %s does not match expected value %s" % (
1090 name_or_class, real_value, expected_value)
1092 self.assertEqual(real_value, expected_value, msg)
1094 def assert_in_range(self,
1102 msg = "Invalid %s: %s out of range <%s,%s>" % (
1103 name, real_value, expected_min, expected_max)
1104 self.assertTrue(expected_min <= real_value <= expected_max, msg)
1106 def assert_packet_checksums_valid(self, packet,
1107 ignore_zero_udp_checksums=True):
1108 received = packet.__class__(scapy.compat.raw(packet))
1109 udp_layers = ['UDP', 'UDPerror']
1110 checksum_fields = ['cksum', 'chksum']
1113 temp = received.__class__(scapy.compat.raw(received))
1115 layer = temp.getlayer(counter)
1117 layer = layer.copy()
1118 layer.remove_payload()
1119 for cf in checksum_fields:
1120 if hasattr(layer, cf):
1121 if ignore_zero_udp_checksums and \
1122 0 == getattr(layer, cf) and \
1123 layer.name in udp_layers:
1125 delattr(temp.getlayer(counter), cf)
1126 checksums.append((counter, cf))
1129 counter = counter + 1
1130 if 0 == len(checksums):
1132 temp = temp.__class__(scapy.compat.raw(temp))
1133 for layer, cf in checksums:
1134 calc_sum = getattr(temp[layer], cf)
1136 getattr(received[layer], cf), calc_sum,
1137 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
1139 "Checksum field `%s` on `%s` layer has correct value `%s`" %
1140 (cf, temp[layer].name, calc_sum))
1142 def assert_checksum_valid(self, received_packet, layer,
1143 field_name='chksum',
1144 ignore_zero_checksum=False):
1145 """ Check checksum of received packet on given layer """
1146 received_packet_checksum = getattr(received_packet[layer], field_name)
1147 if ignore_zero_checksum and 0 == received_packet_checksum:
1149 recalculated = received_packet.__class__(
1150 scapy.compat.raw(received_packet))
1151 delattr(recalculated[layer], field_name)
1152 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1153 self.assert_equal(received_packet_checksum,
1154 getattr(recalculated[layer], field_name),
1155 "packet checksum on layer: %s" % layer)
1157 def assert_ip_checksum_valid(self, received_packet,
1158 ignore_zero_checksum=False):
1159 self.assert_checksum_valid(received_packet, 'IP',
1160 ignore_zero_checksum=ignore_zero_checksum)
1162 def assert_tcp_checksum_valid(self, received_packet,
1163 ignore_zero_checksum=False):
1164 self.assert_checksum_valid(received_packet, 'TCP',
1165 ignore_zero_checksum=ignore_zero_checksum)
1167 def assert_udp_checksum_valid(self, received_packet,
1168 ignore_zero_checksum=True):
1169 self.assert_checksum_valid(received_packet, 'UDP',
1170 ignore_zero_checksum=ignore_zero_checksum)
1172 def assert_embedded_icmp_checksum_valid(self, received_packet):
1173 if received_packet.haslayer(IPerror):
1174 self.assert_checksum_valid(received_packet, 'IPerror')
1175 if received_packet.haslayer(TCPerror):
1176 self.assert_checksum_valid(received_packet, 'TCPerror')
1177 if received_packet.haslayer(UDPerror):
1178 self.assert_checksum_valid(received_packet, 'UDPerror',
1179 ignore_zero_checksum=True)
1180 if received_packet.haslayer(ICMPerror):
1181 self.assert_checksum_valid(received_packet, 'ICMPerror')
1183 def assert_icmp_checksum_valid(self, received_packet):
1184 self.assert_checksum_valid(received_packet, 'ICMP')
1185 self.assert_embedded_icmp_checksum_valid(received_packet)
1187 def assert_icmpv6_checksum_valid(self, pkt):
1188 if pkt.haslayer(ICMPv6DestUnreach):
1189 self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
1190 self.assert_embedded_icmp_checksum_valid(pkt)
1191 if pkt.haslayer(ICMPv6EchoRequest):
1192 self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
1193 if pkt.haslayer(ICMPv6EchoReply):
1194 self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
1196 def get_packet_counter(self, counter):
1197 if counter.startswith("/"):
1198 counter_value = self.statistics.get_counter(counter)
1200 counters = self.vapi.cli("sh errors").split('\n')
1202 for i in range(1, len(counters) - 1):
1203 results = counters[i].split()
1204 if results[1] == counter:
1205 counter_value = int(results[0])
1207 return counter_value
1209 def assert_packet_counter_equal(self, counter, expected_value):
1210 counter_value = self.get_packet_counter(counter)
1211 self.assert_equal(counter_value, expected_value,
1212 "packet counter `%s'" % counter)
1214 def assert_error_counter_equal(self, counter, expected_value):
1215 counter_value = self.statistics[counter].sum()
1216 self.assert_equal(counter_value, expected_value,
1217 "error counter `%s'" % counter)
1220 def sleep(cls, timeout, remark=None):
1222 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1223 # * by Guido, only the main thread can be interrupted.
1225 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
1228 if hasattr(os, 'sched_yield'):
1234 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1235 before = time.time()
1238 if after - before > 2 * timeout:
1239 cls.logger.error("unexpected self.sleep() result - "
1240 "slept for %es instead of ~%es!",
1241 after - before, timeout)
1244 "Finished sleep (%s) - slept %es (wanted %es)",
1245 remark, after - before, timeout)
1247 def virtual_sleep(self, timeout, remark=None):
1248 self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
1249 self.vapi.cli("set clock adjust %s" % timeout)
1251 def pg_send(self, intf, pkts, worker=None, trace=True):
1252 intf.add_stream(pkts, worker=worker)
1253 self.pg_enable_capture(self.pg_interfaces)
1254 self.pg_start(trace=trace)
1256 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None,
1258 self.pg_send(intf, pkts)
1261 for i in self.pg_interfaces:
1262 i.get_capture(0, timeout=timeout)
1263 i.assert_nothing_captured(remark=remark)
1266 self.logger.debug(self.vapi.cli("show trace"))
1268 def send_and_expect(self, intf, pkts, output, n_rx=None, worker=None,
1271 n_rx = 1 if isinstance(pkts, Packet) else len(pkts)
1272 self.pg_send(intf, pkts, worker=worker, trace=trace)
1273 rx = output.get_capture(n_rx)
1275 self.logger.debug(self.vapi.cli("show trace"))
1278 def send_and_expect_only(self, intf, pkts, output, timeout=None):
1279 self.pg_send(intf, pkts)
1280 rx = output.get_capture(len(pkts))
1284 for i in self.pg_interfaces:
1285 if i not in outputs:
1286 i.get_capture(0, timeout=timeout)
1287 i.assert_nothing_captured()
1293 def get_testcase_doc_name(test):
1294 return getdoc(test.__class__).splitlines()[0]
1297 def get_test_description(descriptions, test):
1298 short_description = test.shortDescription()
1299 if descriptions and short_description:
1300 return short_description
1305 class TestCaseInfo(object):
1306 def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
1307 self.logger = logger
1308 self.tempdir = tempdir
1309 self.vpp_pid = vpp_pid
1310 self.vpp_bin_path = vpp_bin_path
1311 self.core_crash_test = None
1314 class VppTestResult(unittest.TestResult):
1316 @property result_string
1317 String variable to store the test case result string.
1319 List variable containing 2-tuples of TestCase instances and strings
1320 holding formatted tracebacks. Each tuple represents a test which
1321 raised an unexpected exception.
1323 List variable containing 2-tuples of TestCase instances and strings
1324 holding formatted tracebacks. Each tuple represents a test where
1325 a failure was explicitly signalled using the TestCase.assert*()
1329 failed_test_cases_info = set()
1330 core_crash_test_cases_info = set()
1331 current_test_case_info = None
1333 def __init__(self, stream=None, descriptions=None, verbosity=None,
1336 :param stream File descriptor to store where to report test results.
1337 Set to the standard error stream by default.
1338 :param descriptions Boolean variable to store information if to use
1339 test case descriptions.
1340 :param verbosity Integer variable to store required verbosity level.
1342 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1343 self.stream = stream
1344 self.descriptions = descriptions
1345 self.verbosity = verbosity
1346 self.result_string = None
1347 self.runner = runner
1350 def addSuccess(self, test):
1352 Record a test succeeded result
1357 if self.current_test_case_info:
1358 self.current_test_case_info.logger.debug(
1359 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1360 test._testMethodName,
1361 test._testMethodDoc))
1362 unittest.TestResult.addSuccess(self, test)
1363 self.result_string = colorize("OK", GREEN)
1365 self.send_result_through_pipe(test, PASS)
1367 def addSkip(self, test, reason):
1369 Record a test skipped.
1375 if self.current_test_case_info:
1376 self.current_test_case_info.logger.debug(
1377 "--- addSkip() %s.%s(%s) called, reason is %s" %
1378 (test.__class__.__name__, test._testMethodName,
1379 test._testMethodDoc, reason))
1380 unittest.TestResult.addSkip(self, test, reason)
1381 self.result_string = colorize("SKIP", YELLOW)
1383 if reason == "not enough cpus":
1384 self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE)
1386 self.send_result_through_pipe(test, SKIP)
1388 def symlink_failed(self):
1389 if self.current_test_case_info:
1391 failed_dir = config.failed_dir
1392 link_path = os.path.join(
1395 os.path.basename(self.current_test_case_info.tempdir))
1397 self.current_test_case_info.logger.debug(
1398 "creating a link to the failed test")
1399 self.current_test_case_info.logger.debug(
1400 "os.symlink(%s, %s)" %
1401 (self.current_test_case_info.tempdir, link_path))
1402 if os.path.exists(link_path):
1403 self.current_test_case_info.logger.debug(
1404 'symlink already exists')
1406 os.symlink(self.current_test_case_info.tempdir, link_path)
1408 except Exception as e:
1409 self.current_test_case_info.logger.error(e)
1411 def send_result_through_pipe(self, test, result):
1412 if hasattr(self, 'test_framework_result_pipe'):
1413 pipe = self.test_framework_result_pipe
1415 pipe.send((test.id(), result))
1417 def log_error(self, test, err, fn_name):
1418 if self.current_test_case_info:
1419 if isinstance(test, unittest.suite._ErrorHolder):
1420 test_name = test.description
1422 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1423 test._testMethodName,
1424 test._testMethodDoc)
1425 self.current_test_case_info.logger.debug(
1426 "--- %s() %s called, err is %s" %
1427 (fn_name, test_name, err))
1428 self.current_test_case_info.logger.debug(
1429 "formatted exception is:\n%s" %
1430 "".join(format_exception(*err)))
1432 def add_error(self, test, err, unittest_fn, error_type):
1433 if error_type == FAIL:
1434 self.log_error(test, err, 'addFailure')
1435 error_type_str = colorize("FAIL", RED)
1436 elif error_type == ERROR:
1437 self.log_error(test, err, 'addError')
1438 error_type_str = colorize("ERROR", RED)
1440 raise Exception('Error type %s cannot be used to record an '
1441 'error or a failure' % error_type)
1443 unittest_fn(self, test, err)
1444 if self.current_test_case_info:
1445 self.result_string = "%s [ temp dir used by test case: %s ]" % \
1447 self.current_test_case_info.tempdir)
1448 self.symlink_failed()
1449 self.failed_test_cases_info.add(self.current_test_case_info)
1450 if is_core_present(self.current_test_case_info.tempdir):
1451 if not self.current_test_case_info.core_crash_test:
1452 if isinstance(test, unittest.suite._ErrorHolder):
1453 test_name = str(test)
1455 test_name = "'{!s}' ({!s})".format(
1456 get_testcase_doc_name(test), test.id())
1457 self.current_test_case_info.core_crash_test = test_name
1458 self.core_crash_test_cases_info.add(
1459 self.current_test_case_info)
1461 self.result_string = '%s [no temp dir]' % error_type_str
1463 self.send_result_through_pipe(test, error_type)
1465 def addFailure(self, test, err):
1467 Record a test failed result
1470 :param err: error message
1473 self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1475 def addError(self, test, err):
1477 Record a test error result
1480 :param err: error message
1483 self.add_error(test, err, unittest.TestResult.addError, ERROR)
1485 def getDescription(self, test):
1487 Get test description
1490 :returns: test description
1493 return get_test_description(self.descriptions, test)
1495 def startTest(self, test):
1503 def print_header(test):
1504 if test.__class__ in self.printed:
1507 test_doc = getdoc(test)
1509 raise Exception("No doc string for test '%s'" % test.id())
1511 test_title = test_doc.splitlines()[0].rstrip()
1512 test_title = colorize(test_title, GREEN)
1513 if test.is_tagged_run_solo():
1514 test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)
1516 # This block may overwrite the colorized title above,
1517 # but we want this to stand out and be fixed
1518 if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
1519 test_title = colorize(
1520 f"FIXME with VPP workers: {test_title}", RED)
1522 if test.has_tag(TestCaseTag.FIXME_ASAN):
1523 test_title = colorize(
1524 f"FIXME with ASAN: {test_title}", RED)
1525 test.skip_fixme_asan()
1527 if hasattr(test, 'vpp_worker_count'):
1528 if test.vpp_worker_count == 0:
1529 test_title += " [main thread only]"
1530 elif test.vpp_worker_count == 1:
1531 test_title += " [1 worker thread]"
1533 test_title += f" [{test.vpp_worker_count} worker threads]"
1535 if test.__class__.skipped_due_to_cpu_lack:
1536 test_title = colorize(
1537 f"{test_title} [skipped - not enough cpus, "
1538 f"required={test.__class__.get_cpus_required()}, "
1539 f"available={max_vpp_cpus}]", YELLOW)
1541 print(double_line_delim)
1543 print(double_line_delim)
1544 self.printed.append(test.__class__)
1547 self.start_test = time.time()
1548 unittest.TestResult.startTest(self, test)
1549 if self.verbosity > 0:
1550 self.stream.writeln(
1551 "Starting " + self.getDescription(test) + " ...")
1552 self.stream.writeln(single_line_delim)
1554 def stopTest(self, test):
1556 Called when the given test has been run
1561 unittest.TestResult.stopTest(self, test)
1563 if self.verbosity > 0:
1564 self.stream.writeln(single_line_delim)
1565 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1566 self.result_string))
1567 self.stream.writeln(single_line_delim)
1569 self.stream.writeln("%-68s %4.2f %s" %
1570 (self.getDescription(test),
1571 time.time() - self.start_test,
1572 self.result_string))
1574 self.send_result_through_pipe(test, TEST_RUN)
1576 def printErrors(self):
1578 Print errors from running the test case
1580 if len(self.errors) > 0 or len(self.failures) > 0:
1581 self.stream.writeln()
1582 self.printErrorList('ERROR', self.errors)
1583 self.printErrorList('FAIL', self.failures)
1585 # ^^ that is the last output from unittest before summary
1586 if not self.runner.print_summary:
1587 devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
1588 self.stream = devnull
1589 self.runner.stream = devnull
1591 def printErrorList(self, flavour, errors):
1593 Print error list to the output stream together with error type
1594 and test case description.
1596 :param flavour: error type
1597 :param errors: iterable errors
1600 for test, err in errors:
1601 self.stream.writeln(double_line_delim)
1602 self.stream.writeln("%s: %s" %
1603 (flavour, self.getDescription(test)))
1604 self.stream.writeln(single_line_delim)
1605 self.stream.writeln("%s" % err)
1608 class VppTestRunner(unittest.TextTestRunner):
1610 A basic test runner implementation which prints results to standard error.
1614 def resultclass(self):
1615 """Class maintaining the results of the tests"""
1616 return VppTestResult
1618 def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1619 result_pipe=None, failfast=False, buffer=False,
1620 resultclass=None, print_summary=True, **kwargs):
1621 # ignore stream setting here, use hard-coded stdout to be in sync
1622 # with prints from VppTestCase methods ...
1623 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1624 verbosity, failfast, buffer,
1625 resultclass, **kwargs)
1626 KeepAliveReporter.pipe = keep_alive_pipe
1628 self.orig_stream = self.stream
1629 self.resultclass.test_framework_result_pipe = result_pipe
1631 self.print_summary = print_summary
1633 def _makeResult(self):
1634 return self.resultclass(self.stream,
1639 def run(self, test):
1646 faulthandler.enable() # emit stack trace to stderr if killed by signal
1648 result = super(VppTestRunner, self).run(test)
1649 if not self.print_summary:
1650 self.stream = self.orig_stream
1651 result.stream = self.orig_stream
1655 class Worker(Thread):
1656 def __init__(self, executable_args, logger, env=None, *args, **kwargs):
1657 super(Worker, self).__init__(*args, **kwargs)
1658 self.logger = logger
1659 self.args = executable_args
1660 if hasattr(self, 'testcase') and self.testcase.debug_all:
1661 if self.testcase.debug_gdbserver:
1662 self.args = ['/usr/bin/gdbserver', 'localhost:{port}'
1663 .format(port=self.testcase.gdbserver_port)] + args
1664 elif self.testcase.debug_gdb and hasattr(self, 'wait_for_gdb'):
1665 self.args.append(self.wait_for_gdb)
1666 self.app_bin = executable_args[0]
1667 self.app_name = os.path.basename(self.app_bin)
1668 if hasattr(self, 'role'):
1669 self.app_name += ' {role}'.format(role=self.role)
1672 env = {} if env is None else env
1673 self.env = copy.deepcopy(env)
1675 def wait_for_enter(self):
1676 if not hasattr(self, 'testcase'):
1678 if self.testcase.debug_all and self.testcase.debug_gdbserver:
1680 print(double_line_delim)
1681 print("Spawned GDB Server for '{app}' with PID: {pid}"
1682 .format(app=self.app_name, pid=self.process.pid))
1683 elif self.testcase.debug_all and self.testcase.debug_gdb:
1685 print(double_line_delim)
1686 print("Spawned '{app}' with PID: {pid}"
1687 .format(app=self.app_name, pid=self.process.pid))
1690 print(single_line_delim)
1691 print("You can debug '{app}' using:".format(app=self.app_name))
1692 if self.testcase.debug_gdbserver:
1693 print("sudo gdb " + self.app_bin +
1694 " -ex 'target remote localhost:{port}'"
1695 .format(port=self.testcase.gdbserver_port))
1696 print("Now is the time to attach gdb by running the above "
1697 "command, set up breakpoints etc., then resume from "
1698 "within gdb by issuing the 'continue' command")
1699 self.testcase.gdbserver_port += 1
1700 elif self.testcase.debug_gdb:
1701 print("sudo gdb " + self.app_bin +
1702 " -ex 'attach {pid}'".format(pid=self.process.pid))
1703 print("Now is the time to attach gdb by running the above "
1704 "command and set up breakpoints etc., then resume from"
1705 " within gdb by issuing the 'continue' command")
1706 print(single_line_delim)
1707 input("Press ENTER to continue running the testcase...")
1710 executable = self.args[0]
1711 if not os.path.exists(executable) or not os.access(
1712 executable, os.F_OK | os.X_OK):
1713 # Exit code that means some system file did not exist,
1714 # could not be opened, or had some other kind of error.
1715 self.result = os.EX_OSFILE
1716 raise EnvironmentError(
1717 "executable '%s' is not found or executable." % executable)
1718 self.logger.debug("Running executable '{app}': '{cmd}'"
1719 .format(app=self.app_name,
1720 cmd=' '.join(self.args)))
1721 env = os.environ.copy()
1722 env.update(self.env)
1723 env["CK_LOG_FILE_NAME"] = "-"
1724 self.process = subprocess.Popen(
1725 ['stdbuf', '-o0', '-e0'] + self.args, shell=False, env=env,
1726 preexec_fn=os.setpgrp, stdout=subprocess.PIPE,
1727 stderr=subprocess.PIPE)
1728 self.wait_for_enter()
1729 out, err = self.process.communicate()
1730 self.logger.debug("Finished running `{app}'".format(app=self.app_name))
1731 self.logger.info("Return code is `%s'" % self.process.returncode)
1732 self.logger.info(single_line_delim)
1733 self.logger.info("Executable `{app}' wrote to stdout:"
1734 .format(app=self.app_name))
1735 self.logger.info(single_line_delim)
1736 self.logger.info(out.decode('utf-8'))
1737 self.logger.info(single_line_delim)
1738 self.logger.info("Executable `{app}' wrote to stderr:"
1739 .format(app=self.app_name))
1740 self.logger.info(single_line_delim)
1741 self.logger.info(err.decode('utf-8'))
1742 self.logger.info(single_line_delim)
1743 self.result = self.process.returncode
1746 if __name__ == '__main__':