3 from __future__ import print_function
19 from collections import deque
20 from threading import Thread, Event
21 from inspect import getdoc, isclass
22 from traceback import format_exception
23 from logging import FileHandler, DEBUG, Formatter
25 from abc import ABC, abstractmethod
26 from struct import pack, unpack
29 from scapy.packet import Raw
30 import hook as hookmodule
31 from vpp_pg_interface import VppPGInterface
32 from vpp_sub_interface import VppSubInterface
33 from vpp_lo_interface import VppLoInterface
34 from vpp_bvi_interface import VppBviInterface
35 from vpp_papi_provider import VppPapiProvider
36 from vpp_papi import VppEnum
38 from vpp_papi.vpp_stats import VPPStats
39 from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
40 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
42 from vpp_object import VppObjectRegistry
43 from util import ppp, is_core_present
44 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
45 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
46 from scapy.layers.inet6 import ICMPv6EchoReply
48 from cpu_config import available_cpus, num_cpus, max_vpp_cpus
50 logger = logging.getLogger(__name__)
52 # Set up an empty logger for the testcase that can be overridden as necessary
53 null_logger = logging.getLogger('VppTestCase')
54 null_logger.addHandler(logging.NullHandler())
64 class BoolEnvironmentVariable(object):
66 def __init__(self, env_var_name, default='n', true_values=None):
67 self.name = env_var_name
68 self.default = default
69 self.true_values = true_values if true_values is not None else \
73 return os.getenv(self.name, self.default).lower() in self.true_values
75 if sys.version_info[0] == 2:
76 __nonzero__ = __bool__
79 return 'BoolEnvironmentVariable(%r, default=%r, true_values=%r)' % \
80 (self.name, self.default, self.true_values)
83 debug_framework = BoolEnvironmentVariable('TEST_DEBUG')
88 Test framework module.
90 The module provides a set of tools for constructing and running tests and
91 representing the results.
95 class VppDiedError(Exception):
96 """ exception for reporting that the subprocess has died."""
98 signals_by_value = {v: k for k, v in signal.__dict__.items() if
99 k.startswith('SIG') and not k.startswith('SIG_')}
101 def __init__(self, rv=None, testcase=None, method_name=None):
103 self.signal_name = None
104 self.testcase = testcase
105 self.method_name = method_name
108 self.signal_name = VppDiedError.signals_by_value[-rv]
109 except (KeyError, TypeError):
112 if testcase is None and method_name is None:
115 in_msg = ' while running %s.%s' % (testcase, method_name)
118 msg = "VPP subprocess died unexpectedly%s with return code: %d%s."\
119 % (in_msg, self.rv, ' [%s]' %
121 self.signal_name is not None else ''))
123 msg = "VPP subprocess died unexpectedly%s." % in_msg
125 super(VppDiedError, self).__init__(msg)
128 class _PacketInfo(object):
129 """Private class to create packet info object.
131 Help process information about the next packet.
132 Set variables to default values.
134 #: Store the index of the packet.
136 #: Store the index of the source packet generator interface of the packet.
138 #: Store the index of the destination packet generator interface
141 #: Store expected ip version
143 #: Store expected upper protocol
145 #: Store the copy of the former packet.
148 def __eq__(self, other):
149 index = self.index == other.index
150 src = self.src == other.src
151 dst = self.dst == other.dst
152 data = self.data == other.data
153 return index and src and dst and data
156 def pump_output(testclass):
157 """ pump output from vpp stdout/stderr to proper queues """
160 while not testclass.pump_thread_stop_flag.is_set():
161 readable = select.select([testclass.vpp.stdout.fileno(),
162 testclass.vpp.stderr.fileno(),
163 testclass.pump_thread_wakeup_pipe[0]],
165 if testclass.vpp.stdout.fileno() in readable:
166 read = os.read(testclass.vpp.stdout.fileno(), 102400)
168 split = read.decode('ascii',
169 errors='backslashreplace').splitlines(True)
170 if len(stdout_fragment) > 0:
171 split[0] = "%s%s" % (stdout_fragment, split[0])
172 if len(split) > 0 and split[-1].endswith("\n"):
176 stdout_fragment = split[-1]
177 testclass.vpp_stdout_deque.extend(split[:limit])
178 if not testclass.cache_vpp_output:
179 for line in split[:limit]:
180 testclass.logger.info(
181 "VPP STDOUT: %s" % line.rstrip("\n"))
182 if testclass.vpp.stderr.fileno() in readable:
183 read = os.read(testclass.vpp.stderr.fileno(), 102400)
185 split = read.decode('ascii',
186 errors='backslashreplace').splitlines(True)
187 if len(stderr_fragment) > 0:
188 split[0] = "%s%s" % (stderr_fragment, split[0])
189 if len(split) > 0 and split[-1].endswith("\n"):
193 stderr_fragment = split[-1]
195 testclass.vpp_stderr_deque.extend(split[:limit])
196 if not testclass.cache_vpp_output:
197 for line in split[:limit]:
198 testclass.logger.error(
199 "VPP STDERR: %s" % line.rstrip("\n"))
200 # ignoring the dummy pipe here intentionally - the
201 # flag will take care of properly terminating the loop
204 def _is_skip_aarch64_set():
205 return BoolEnvironmentVariable('SKIP_AARCH64')
208 is_skip_aarch64_set = _is_skip_aarch64_set()
211 def _is_platform_aarch64():
212 return platform.machine() == 'aarch64'
215 is_platform_aarch64 = _is_platform_aarch64()
218 def _running_extended_tests():
219 return BoolEnvironmentVariable("EXTENDED_TESTS")
222 running_extended_tests = _running_extended_tests()
225 def _running_gcov_tests():
226 return BoolEnvironmentVariable("GCOV_TESTS")
229 running_gcov_tests = _running_gcov_tests()
232 def get_environ_vpp_worker_count():
233 worker_config = os.getenv("VPP_WORKER_CONFIG", None)
235 elems = worker_config.split(" ")
236 if elems[0] != "workers" or len(elems) != 2:
237 raise ValueError("Wrong VPP_WORKER_CONFIG == '%s' value." %
244 environ_vpp_worker_count = get_environ_vpp_worker_count()
247 class KeepAliveReporter(object):
249 Singleton object which reports test start to parent process
254 self.__dict__ = self._shared_state
262 def pipe(self, pipe):
263 if self._pipe is not None:
264 raise Exception("Internal error - pipe should only be set once.")
267 def send_keep_alive(self, test, desc=None):
269 Write current test tmpdir & desc to keep-alive pipe to signal liveness
271 if self.pipe is None:
272 # if not running forked..
276 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
280 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
283 class TestCaseTag(Enum):
284 # marks the suites that must run at the end
285 # using only a single test runner
287 # marks the suites broken on VPP multi-worker
288 FIXME_VPP_WORKERS = 2
291 def create_tag_decorator(e):
294 cls.test_tags.append(e)
295 except AttributeError:
301 tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
302 tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
316 class CPUInterface(ABC):
318 skipped_due_to_cpu_lack = False
322 def get_cpus_required(cls):
326 def assign_cpus(cls, cpus):
330 class VppTestCase(CPUInterface, unittest.TestCase):
331 """This subclass is a base class for VPP test cases that are implemented as
332 classes. It provides methods to create and run test case.
335 extra_vpp_statseg_config = ""
336 extra_vpp_punt_config = []
337 extra_vpp_plugin_config = []
339 vapi_response_timeout = 5
342 def packet_infos(self):
343 """List of packet infos"""
344 return self._packet_infos
347 def get_packet_count_for_if_idx(cls, dst_if_index):
348 """Get the number of packet info for specified destination if index"""
349 if dst_if_index in cls._packet_count_for_dst_if_idx:
350 return cls._packet_count_for_dst_if_idx[dst_if_index]
355 def has_tag(cls, tag):
356 """ if the test case has a given tag - return true """
358 return tag in cls.test_tags
359 except AttributeError:
364 def is_tagged_run_solo(cls):
365 """ if the test case class is timing-sensitive - return true """
366 return cls.has_tag(TestCaseTag.RUN_SOLO)
370 """Return the instance of this testcase"""
371 return cls.test_instance
374 def set_debug_flags(cls, d):
375 cls.gdbserver_port = 7777
376 cls.debug_core = False
377 cls.debug_gdb = False
378 cls.debug_gdbserver = False
379 cls.debug_all = False
380 cls.debug_attach = False
385 cls.debug_core = True
386 elif dl == "gdb" or dl == "gdb-all":
388 elif dl == "gdbserver" or dl == "gdbserver-all":
389 cls.debug_gdbserver = True
391 cls.debug_attach = True
393 raise Exception("Unrecognized DEBUG option: '%s'" % d)
394 if dl == "gdb-all" or dl == "gdbserver-all":
398 def get_vpp_worker_count(cls):
399 if not hasattr(cls, "vpp_worker_count"):
400 if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
401 cls.vpp_worker_count = 0
403 cls.vpp_worker_count = environ_vpp_worker_count
404 return cls.vpp_worker_count
407 def get_cpus_required(cls):
408 return 1 + cls.get_vpp_worker_count()
411 def setUpConstants(cls):
412 """ Set-up the test case class based on environment variables """
413 cls.step = BoolEnvironmentVariable('STEP')
414 # inverted case to handle '' == True
415 c = os.getenv("CACHE_OUTPUT", "1")
416 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
417 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
418 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
419 cls.test_plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
420 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
422 if cls.plugin_path is not None:
423 if cls.extern_plugin_path is not None:
424 plugin_path = "%s:%s" % (
425 cls.plugin_path, cls.extern_plugin_path)
427 plugin_path = cls.plugin_path
428 elif cls.extern_plugin_path is not None:
429 plugin_path = cls.extern_plugin_path
431 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
432 debug_cli = "cli-listen localhost:5002"
434 size = os.getenv("COREDUMP_SIZE")
436 coredump_size = "coredump-size %s" % size
437 if coredump_size is None:
438 coredump_size = "coredump-size unlimited"
440 default_variant = os.getenv("VARIANT")
441 if default_variant is not None:
442 default_variant = "defaults { %s 100 }" % default_variant
446 api_fuzzing = os.getenv("API_FUZZ")
447 if api_fuzzing is None:
452 "unix", "{", "nodaemon", debug_cli, "full-coredump",
453 coredump_size, "runtime-dir", cls.tempdir, "}",
454 "api-trace", "{", "on", "}",
455 "api-segment", "{", "prefix", cls.get_api_segment_prefix(), "}",
456 "cpu", "{", "main-core", str(cls.cpus[0]), ]
457 if cls.get_vpp_worker_count():
458 cls.vpp_cmdline.extend([
459 "corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])])
460 cls.vpp_cmdline.extend([
462 "physmem", "{", "max-size", "32m", "}",
463 "statseg", "{", "socket-name", cls.get_stats_sock_path(),
464 cls.extra_vpp_statseg_config, "}",
465 "socksvr", "{", "socket-name", cls.get_api_sock_path(), "}",
466 "node { ", default_variant, "}",
467 "api-fuzz {", api_fuzzing, "}",
468 "plugins", "{", "plugin", "dpdk_plugin.so", "{", "disable", "}",
469 "plugin", "rdma_plugin.so", "{", "disable", "}",
470 "plugin", "lisp_unittest_plugin.so", "{", "enable", "}",
471 "plugin", "unittest_plugin.so", "{", "enable", "}"
472 ] + cls.extra_vpp_plugin_config + ["}", ])
474 if cls.extra_vpp_punt_config is not None:
475 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
476 if plugin_path is not None:
477 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
478 if cls.test_plugin_path is not None:
479 cls.vpp_cmdline.extend(["test_plugin_path", cls.test_plugin_path])
481 if not cls.debug_attach:
482 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
483 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
486 def wait_for_enter(cls):
487 if cls.debug_gdbserver:
488 print(double_line_delim)
489 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
491 print(double_line_delim)
492 print("Spawned VPP with PID: %d" % cls.vpp.pid)
494 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
496 print(single_line_delim)
497 print("You can debug VPP using:")
498 if cls.debug_gdbserver:
499 print("sudo gdb " + cls.vpp_bin +
500 " -ex 'target remote localhost:{port}'"
501 .format(port=cls.gdbserver_port))
502 print("Now is the time to attach gdb by running the above "
503 "command, set up breakpoints etc., then resume VPP from "
504 "within gdb by issuing the 'continue' command")
505 cls.gdbserver_port += 1
507 print("sudo gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
508 print("Now is the time to attach gdb by running the above "
509 "command and set up breakpoints etc., then resume VPP from"
510 " within gdb by issuing the 'continue' command")
511 print(single_line_delim)
512 input("Press ENTER to continue running the testcase...")
520 cls.logger.debug(f"Assigned cpus: {cls.cpus}")
521 cmdline = cls.vpp_cmdline
523 if cls.debug_gdbserver:
524 gdbserver = '/usr/bin/gdbserver'
525 if not os.path.isfile(gdbserver) or\
526 not os.access(gdbserver, os.X_OK):
527 raise Exception("gdbserver binary '%s' does not exist or is "
528 "not executable" % gdbserver)
530 cmdline = [gdbserver, 'localhost:{port}'
531 .format(port=cls.gdbserver_port)] + cls.vpp_cmdline
532 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
535 cls.vpp = subprocess.Popen(cmdline,
536 stdout=subprocess.PIPE,
537 stderr=subprocess.PIPE)
538 except subprocess.CalledProcessError as e:
539 cls.logger.critical("Subprocess returned with non-0 return code: ("
543 cls.logger.critical("Subprocess returned with OS error: "
544 "(%s) %s", e.errno, e.strerror)
546 except Exception as e:
547 cls.logger.exception("Subprocess returned unexpected from "
554 def wait_for_coredump(cls):
555 corefile = cls.tempdir + "/core"
556 if os.path.isfile(corefile):
557 cls.logger.error("Waiting for coredump to complete: %s", corefile)
558 curr_size = os.path.getsize(corefile)
559 deadline = time.time() + 60
561 while time.time() < deadline:
564 curr_size = os.path.getsize(corefile)
565 if size == curr_size:
569 cls.logger.error("Timed out waiting for coredump to complete:"
572 cls.logger.error("Coredump complete: %s, size %d",
576 def get_stats_sock_path(cls):
577 return "%s/stats.sock" % cls.tempdir
580 def get_api_sock_path(cls):
581 return "%s/api.sock" % cls.tempdir
584 def get_api_segment_prefix(cls):
585 return os.path.basename(cls.tempdir) # Only used for VAPI
588 def get_tempdir(cls):
590 return os.getenv("VPP_IN_GDB_TMP_DIR",
591 "/tmp/unittest-attach-gdb")
593 return tempfile.mkdtemp(prefix='vpp-unittest-%s-' % cls.__name__)
598 Perform class setup before running the testcase
599 Remove shared memory files, start vpp and connect the vpp-api
601 super(VppTestCase, cls).setUpClass()
602 cls.logger = get_logger(cls.__name__)
603 seed = os.environ["RND_SEED"]
605 if hasattr(cls, 'parallel_handler'):
606 cls.logger.addHandler(cls.parallel_handler)
607 cls.logger.propagate = False
608 d = os.getenv("DEBUG", None)
609 cls.set_debug_flags(d)
610 cls.tempdir = cls.get_tempdir()
611 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
612 cls.file_handler.setFormatter(
613 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
615 cls.file_handler.setLevel(DEBUG)
616 cls.logger.addHandler(cls.file_handler)
617 cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
618 os.chdir(cls.tempdir)
619 cls.logger.info("Temporary dir is %s, api socket is %s",
620 cls.tempdir, cls.get_api_sock_path())
621 cls.logger.debug("Random seed is %s", seed)
623 cls.reset_packet_infos()
628 cls.registry = VppObjectRegistry()
629 cls.vpp_startup_failed = False
630 cls.reporter = KeepAliveReporter()
631 # need to catch exceptions here because if we raise, then the cleanup
632 # doesn't get called and we might end with a zombie vpp
638 cls.reporter.send_keep_alive(cls, 'setUpClass')
639 VppTestResult.current_test_case_info = TestCaseInfo(
640 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
641 cls.vpp_stdout_deque = deque()
642 cls.vpp_stderr_deque = deque()
643 if not cls.debug_attach:
644 cls.pump_thread_stop_flag = Event()
645 cls.pump_thread_wakeup_pipe = os.pipe()
646 cls.pump_thread = Thread(target=pump_output, args=(cls,))
647 cls.pump_thread.daemon = True
648 cls.pump_thread.start()
649 if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
650 cls.vapi_response_timeout = 0
651 cls.vapi = VppPapiProvider(cls.__name__, cls,
652 cls.vapi_response_timeout)
654 hook = hookmodule.StepHook(cls)
656 hook = hookmodule.PollHook(cls)
657 cls.vapi.register_hook(hook)
658 cls.statistics = VPPStats(socketname=cls.get_stats_sock_path())
662 cls.vpp_startup_failed = True
664 "VPP died shortly after startup, check the"
665 " output to standard error for possible cause")
669 except (vpp_papi.VPPIOError, Exception) as e:
670 cls.logger.debug("Exception connecting to vapi: %s" % e)
671 cls.vapi.disconnect()
673 if cls.debug_gdbserver:
674 print(colorize("You're running VPP inside gdbserver but "
675 "VPP-API connection failed, did you forget "
676 "to 'continue' VPP from within gdb?", RED))
679 last_line = cls.vapi.cli("show thread").split("\n")[-2]
680 cls.vpp_worker_count = int(last_line.split(" ")[0])
681 print("Detected VPP with %s workers." % cls.vpp_worker_count)
682 except vpp_papi.VPPRuntimeError as e:
683 cls.logger.debug("%s" % e)
686 except Exception as e:
687 cls.logger.debug("Exception connecting to VPP: %s" % e)
692 def _debug_quit(cls):
693 if (cls.debug_gdbserver or cls.debug_gdb):
697 if cls.vpp.returncode is None:
699 print(double_line_delim)
700 print("VPP or GDB server is still running")
701 print(single_line_delim)
702 input("When done debugging, press ENTER to kill the "
703 "process and finish running the testcase...")
704 except AttributeError:
710 Disconnect vpp-api, kill vpp and cleanup shared memory files
714 # first signal that we want to stop the pump thread, then wake it up
715 if hasattr(cls, 'pump_thread_stop_flag'):
716 cls.pump_thread_stop_flag.set()
717 if hasattr(cls, 'pump_thread_wakeup_pipe'):
718 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
719 if hasattr(cls, 'pump_thread'):
720 cls.logger.debug("Waiting for pump thread to stop")
721 cls.pump_thread.join()
722 if hasattr(cls, 'vpp_stderr_reader_thread'):
723 cls.logger.debug("Waiting for stderr pump to stop")
724 cls.vpp_stderr_reader_thread.join()
726 if hasattr(cls, 'vpp'):
727 if hasattr(cls, 'vapi'):
728 cls.logger.debug(cls.vapi.vpp.get_stats())
729 cls.logger.debug("Disconnecting class vapi client on %s",
731 cls.vapi.disconnect()
732 cls.logger.debug("Deleting class vapi attribute on %s",
736 if not cls.debug_attach and cls.vpp.returncode is None:
737 cls.wait_for_coredump()
738 cls.logger.debug("Sending TERM to vpp")
740 cls.logger.debug("Waiting for vpp to die")
742 outs, errs = cls.vpp.communicate(timeout=5)
743 except subprocess.TimeoutExpired:
745 outs, errs = cls.vpp.communicate()
746 cls.logger.debug("Deleting class vpp attribute on %s",
748 if not cls.debug_attach:
749 cls.vpp.stdout.close()
750 cls.vpp.stderr.close()
753 if cls.vpp_startup_failed:
754 stdout_log = cls.logger.info
755 stderr_log = cls.logger.critical
757 stdout_log = cls.logger.info
758 stderr_log = cls.logger.info
760 if hasattr(cls, 'vpp_stdout_deque'):
761 stdout_log(single_line_delim)
762 stdout_log('VPP output to stdout while running %s:', cls.__name__)
763 stdout_log(single_line_delim)
764 vpp_output = "".join(cls.vpp_stdout_deque)
765 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
767 stdout_log('\n%s', vpp_output)
768 stdout_log(single_line_delim)
770 if hasattr(cls, 'vpp_stderr_deque'):
771 stderr_log(single_line_delim)
772 stderr_log('VPP output to stderr while running %s:', cls.__name__)
773 stderr_log(single_line_delim)
774 vpp_output = "".join(cls.vpp_stderr_deque)
775 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
777 stderr_log('\n%s', vpp_output)
778 stderr_log(single_line_delim)
781 def tearDownClass(cls):
782 """ Perform final cleanup after running all tests in this test-case """
783 cls.logger.debug("--- tearDownClass() for %s called ---" %
785 cls.reporter.send_keep_alive(cls, 'tearDownClass')
787 cls.file_handler.close()
788 cls.reset_packet_infos()
790 debug_internal.on_tear_down_class(cls)
792 def show_commands_at_teardown(self):
793 """ Allow subclass specific teardown logging additions."""
794 self.logger.info("--- No test specific show commands provided. ---")
797 """ Show various debug prints after each test """
798 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
799 (self.__class__.__name__, self._testMethodName,
800 self._testMethodDoc))
803 if not self.vpp_dead:
804 self.logger.debug(self.vapi.cli("show trace max 1000"))
805 self.logger.info(self.vapi.ppcli("show interface"))
806 self.logger.info(self.vapi.ppcli("show hardware"))
807 self.logger.info(self.statistics.set_errors_str())
808 self.logger.info(self.vapi.ppcli("show run"))
809 self.logger.info(self.vapi.ppcli("show log"))
810 self.logger.info(self.vapi.ppcli("show bihash"))
811 self.logger.info("Logging testcase specific show commands.")
812 self.show_commands_at_teardown()
813 self.registry.remove_vpp_config(self.logger)
814 # Save/Dump VPP api trace log
815 m = self._testMethodName
816 api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
817 tmp_api_trace = "/tmp/%s" % api_trace
818 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
819 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
820 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
822 os.rename(tmp_api_trace, vpp_api_trace_log)
823 except VppTransportSocketIOError:
824 self.logger.debug("VppTransportSocketIOError: Vpp dead. "
825 "Cannot log show commands.")
828 self.registry.unregister_all(self.logger)
831 """ Clear trace before running each test"""
832 super(VppTestCase, self).setUp()
833 self.reporter.send_keep_alive(self)
835 raise VppDiedError(rv=None, testcase=self.__class__.__name__,
836 method_name=self._testMethodName)
837 self.sleep(.1, "during setUp")
838 self.vpp_stdout_deque.append(
839 "--- test setUp() for %s.%s(%s) starts here ---\n" %
840 (self.__class__.__name__, self._testMethodName,
841 self._testMethodDoc))
842 self.vpp_stderr_deque.append(
843 "--- test setUp() for %s.%s(%s) starts here ---\n" %
844 (self.__class__.__name__, self._testMethodName,
845 self._testMethodDoc))
846 self.vapi.cli("clear trace")
847 # store the test instance inside the test class - so that objects
848 # holding the class can access instance methods (like assertEqual)
849 type(self).test_instance = self
852 def pg_enable_capture(cls, interfaces=None):
854 Enable capture on packet-generator interfaces
856 :param interfaces: iterable interface indexes (if None,
857 use self.pg_interfaces)
860 if interfaces is None:
861 interfaces = cls.pg_interfaces
866 def register_pcap(cls, intf, worker):
867 """ Register a pcap in the testclass """
868 # add to the list of captures with current timestamp
869 cls._pcaps.append((intf, worker))
872 def get_vpp_time(cls):
873 # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
874 # returns float("2.190522")
875 timestr = cls.vapi.cli('show clock')
876 head, sep, tail = timestr.partition(',')
877 head, sep, tail = head.partition('Time now')
881 def sleep_on_vpp_time(cls, sec):
882 """ Sleep according to time in VPP world """
883 # On a busy system with many processes
884 # we might end up with VPP time being slower than real world
885 # So take that into account when waiting for VPP to do something
886 start_time = cls.get_vpp_time()
887 while cls.get_vpp_time() - start_time < sec:
891 def pg_start(cls, trace=True):
892 """ Enable the PG, wait till it is done, then clean up """
893 for (intf, worker) in cls._old_pcaps:
894 intf.rename_old_pcap_file(intf.get_in_path(worker),
895 intf.in_history_counter)
898 cls.vapi.cli("clear trace")
899 cls.vapi.cli("trace add pg-input 1000")
900 cls.vapi.cli('packet-generator enable')
901 # PG, when starts, runs to completion -
902 # so let's avoid a race condition,
903 # and wait a little till it's done.
904 # Then clean it up - and then be gone.
905 deadline = time.time() + 300
906 while cls.vapi.cli('show packet-generator').find("Yes") != -1:
907 cls.sleep(0.01) # yield
908 if time.time() > deadline:
909 cls.logger.error("Timeout waiting for pg to stop")
911 for intf, worker in cls._pcaps:
912 cls.vapi.cli('packet-generator delete %s' %
913 intf.get_cap_name(worker))
914 cls._old_pcaps = cls._pcaps
918 def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0,
921 Create packet-generator interfaces.
923 :param interfaces: iterable indexes of the interfaces.
924 :returns: List of created interfaces.
929 intf = VppPGInterface(cls, i, gso, gso_size, mode)
930 setattr(cls, intf.name, intf)
932 cls.pg_interfaces = result
936 def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
937 pgmode = VppEnum.vl_api_pg_interface_mode_t
938 return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
939 pgmode.PG_API_MODE_IP4)
942 def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
943 pgmode = VppEnum.vl_api_pg_interface_mode_t
944 return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
945 pgmode.PG_API_MODE_IP6)
948 def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
949 pgmode = VppEnum.vl_api_pg_interface_mode_t
950 return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
951 pgmode.PG_API_MODE_ETHERNET)
954 def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
955 pgmode = VppEnum.vl_api_pg_interface_mode_t
956 return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
957 pgmode.PG_API_MODE_ETHERNET)
960 def create_loopback_interfaces(cls, count):
962 Create loopback interfaces.
964 :param count: number of interfaces created.
965 :returns: List of created interfaces.
967 result = [VppLoInterface(cls) for i in range(count)]
969 setattr(cls, intf.name, intf)
970 cls.lo_interfaces = result
974 def create_bvi_interfaces(cls, count):
976 Create BVI interfaces.
978 :param count: number of interfaces created.
979 :returns: List of created interfaces.
981 result = [VppBviInterface(cls) for i in range(count)]
983 setattr(cls, intf.name, intf)
984 cls.bvi_interfaces = result
988 def extend_packet(packet, size, padding=' '):
990 Extend packet to given size by padding with spaces or custom padding
991 NOTE: Currently works only when Raw layer is present.
993 :param packet: packet
994 :param size: target size
995 :param padding: padding used to extend the payload
998 packet_len = len(packet) + 4
999 extend = size - packet_len
1001 num = (extend // len(padding)) + 1
1002 packet[Raw].load += (padding * num)[:extend].encode("ascii")
1005 def reset_packet_infos(cls):
1006 """ Reset the list of packet info objects and packet counts to zero """
1007 cls._packet_infos = {}
1008 cls._packet_count_for_dst_if_idx = {}
1011 def create_packet_info(cls, src_if, dst_if):
1013 Create packet info object containing the source and destination indexes
1014 and add it to the testcase's packet info list
1016 :param VppInterface src_if: source interface
1017 :param VppInterface dst_if: destination interface
1019 :returns: _PacketInfo object
1022 info = _PacketInfo()
1023 info.index = len(cls._packet_infos)
1024 info.src = src_if.sw_if_index
1025 info.dst = dst_if.sw_if_index
1026 if isinstance(dst_if, VppSubInterface):
1027 dst_idx = dst_if.parent.sw_if_index
1029 dst_idx = dst_if.sw_if_index
1030 if dst_idx in cls._packet_count_for_dst_if_idx:
1031 cls._packet_count_for_dst_if_idx[dst_idx] += 1
1033 cls._packet_count_for_dst_if_idx[dst_idx] = 1
1034 cls._packet_infos[info.index] = info
1038 def info_to_payload(info):
1040 Convert _PacketInfo object to packet payload
1042 :param info: _PacketInfo object
1044 :returns: string containing serialized data from packet info
1047 # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1048 return pack('iiiih', info.index, info.src,
1049 info.dst, info.ip, info.proto)
1052 def payload_to_info(payload, payload_field='load'):
1054 Convert packet payload to _PacketInfo object
1056 :param payload: packet payload
1057 :type payload: <class 'scapy.packet.Raw'>
1058 :param payload_field: packet fieldname of payload "load" for
1059 <class 'scapy.packet.Raw'>
1060 :type payload_field: str
1061 :returns: _PacketInfo object containing de-serialized data from payload
1065 # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1066 payload_b = getattr(payload, payload_field)[:18]
1068 info = _PacketInfo()
1069 info.index, info.src, info.dst, info.ip, info.proto \
1070 = unpack('iiiih', payload_b)
1072 # some SRv6 TCs depend on get an exception if bad values are detected
1073 if info.index > 0x4000:
1074 raise ValueError('Index value is invalid')
1078 def get_next_packet_info(self, info):
1080 Iterate over the packet info list stored in the testcase
1081 Start iteration with first element if info is None
1082 Continue based on index in info if info is specified
1084 :param info: info or None
1085 :returns: next info in list or None if no more infos
1090 next_index = info.index + 1
1091 if next_index == len(self._packet_infos):
1094 return self._packet_infos[next_index]
1096 def get_next_packet_info_for_interface(self, src_index, info):
1098 Search the packet info list for the next packet info with same source
1101 :param src_index: source interface index to search for
1102 :param info: packet info - where to start the search
1103 :returns: packet info or None
1107 info = self.get_next_packet_info(info)
1110 if info.src == src_index:
1113 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
1115 Search the packet info list for the next packet info with same source
1116 and destination interface indexes
1118 :param src_index: source interface index to search for
1119 :param dst_index: destination interface index to search for
1120 :param info: packet info - where to start the search
1121 :returns: packet info or None
1125 info = self.get_next_packet_info_for_interface(src_index, info)
1128 if info.dst == dst_index:
1131 def assert_equal(self, real_value, expected_value, name_or_class=None):
1132 if name_or_class is None:
1133 self.assertEqual(real_value, expected_value)
1136 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
1137 msg = msg % (getdoc(name_or_class).strip(),
1138 real_value, str(name_or_class(real_value)),
1139 expected_value, str(name_or_class(expected_value)))
1141 msg = "Invalid %s: %s does not match expected value %s" % (
1142 name_or_class, real_value, expected_value)
1144 self.assertEqual(real_value, expected_value, msg)
1146 def assert_in_range(self,
1154 msg = "Invalid %s: %s out of range <%s,%s>" % (
1155 name, real_value, expected_min, expected_max)
1156 self.assertTrue(expected_min <= real_value <= expected_max, msg)
1158 def assert_packet_checksums_valid(self, packet,
1159 ignore_zero_udp_checksums=True):
1160 received = packet.__class__(scapy.compat.raw(packet))
1161 udp_layers = ['UDP', 'UDPerror']
1162 checksum_fields = ['cksum', 'chksum']
1165 temp = received.__class__(scapy.compat.raw(received))
1167 layer = temp.getlayer(counter)
1169 layer = layer.copy()
1170 layer.remove_payload()
1171 for cf in checksum_fields:
1172 if hasattr(layer, cf):
1173 if ignore_zero_udp_checksums and \
1174 0 == getattr(layer, cf) and \
1175 layer.name in udp_layers:
1177 delattr(temp.getlayer(counter), cf)
1178 checksums.append((counter, cf))
1181 counter = counter + 1
1182 if 0 == len(checksums):
1184 temp = temp.__class__(scapy.compat.raw(temp))
1185 for layer, cf in checksums:
1186 calc_sum = getattr(temp[layer], cf)
1188 getattr(received[layer], cf), calc_sum,
1189 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
1191 "Checksum field `%s` on `%s` layer has correct value `%s`" %
1192 (cf, temp[layer].name, calc_sum))
1194 def assert_checksum_valid(self, received_packet, layer,
1195 field_name='chksum',
1196 ignore_zero_checksum=False):
1197 """ Check checksum of received packet on given layer """
1198 received_packet_checksum = getattr(received_packet[layer], field_name)
1199 if ignore_zero_checksum and 0 == received_packet_checksum:
1201 recalculated = received_packet.__class__(
1202 scapy.compat.raw(received_packet))
1203 delattr(recalculated[layer], field_name)
1204 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1205 self.assert_equal(received_packet_checksum,
1206 getattr(recalculated[layer], field_name),
1207 "packet checksum on layer: %s" % layer)
1209 def assert_ip_checksum_valid(self, received_packet,
1210 ignore_zero_checksum=False):
1211 self.assert_checksum_valid(received_packet, 'IP',
1212 ignore_zero_checksum=ignore_zero_checksum)
1214 def assert_tcp_checksum_valid(self, received_packet,
1215 ignore_zero_checksum=False):
1216 self.assert_checksum_valid(received_packet, 'TCP',
1217 ignore_zero_checksum=ignore_zero_checksum)
1219 def assert_udp_checksum_valid(self, received_packet,
1220 ignore_zero_checksum=True):
1221 self.assert_checksum_valid(received_packet, 'UDP',
1222 ignore_zero_checksum=ignore_zero_checksum)
1224 def assert_embedded_icmp_checksum_valid(self, received_packet):
1225 if received_packet.haslayer(IPerror):
1226 self.assert_checksum_valid(received_packet, 'IPerror')
1227 if received_packet.haslayer(TCPerror):
1228 self.assert_checksum_valid(received_packet, 'TCPerror')
1229 if received_packet.haslayer(UDPerror):
1230 self.assert_checksum_valid(received_packet, 'UDPerror',
1231 ignore_zero_checksum=True)
1232 if received_packet.haslayer(ICMPerror):
1233 self.assert_checksum_valid(received_packet, 'ICMPerror')
1235 def assert_icmp_checksum_valid(self, received_packet):
1236 self.assert_checksum_valid(received_packet, 'ICMP')
1237 self.assert_embedded_icmp_checksum_valid(received_packet)
1239 def assert_icmpv6_checksum_valid(self, pkt):
1240 if pkt.haslayer(ICMPv6DestUnreach):
1241 self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
1242 self.assert_embedded_icmp_checksum_valid(pkt)
1243 if pkt.haslayer(ICMPv6EchoRequest):
1244 self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
1245 if pkt.haslayer(ICMPv6EchoReply):
1246 self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
1248 def get_packet_counter(self, counter):
1249 if counter.startswith("/"):
1250 counter_value = self.statistics.get_counter(counter)
1252 counters = self.vapi.cli("sh errors").split('\n')
1254 for i in range(1, len(counters) - 1):
1255 results = counters[i].split()
1256 if results[1] == counter:
1257 counter_value = int(results[0])
1259 return counter_value
1261 def assert_packet_counter_equal(self, counter, expected_value):
1262 counter_value = self.get_packet_counter(counter)
1263 self.assert_equal(counter_value, expected_value,
1264 "packet counter `%s'" % counter)
1266 def assert_error_counter_equal(self, counter, expected_value):
1267 counter_value = self.statistics[counter].sum()
1268 self.assert_equal(counter_value, expected_value,
1269 "error counter `%s'" % counter)
1272 def sleep(cls, timeout, remark=None):
1274 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1275 # * by Guido, only the main thread can be interrupted.
1277 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
1280 if hasattr(os, 'sched_yield'):
1286 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1287 before = time.time()
1290 if after - before > 2 * timeout:
1291 cls.logger.error("unexpected self.sleep() result - "
1292 "slept for %es instead of ~%es!",
1293 after - before, timeout)
1296 "Finished sleep (%s) - slept %es (wanted %es)",
1297 remark, after - before, timeout)
1299 def pg_send(self, intf, pkts, worker=None, trace=True):
1300 intf.add_stream(pkts, worker=worker)
1301 self.pg_enable_capture(self.pg_interfaces)
1302 self.pg_start(trace=trace)
1304 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
1305 self.pg_send(intf, pkts)
1308 for i in self.pg_interfaces:
1309 i.get_capture(0, timeout=timeout)
1310 i.assert_nothing_captured(remark=remark)
1313 def send_and_expect(self, intf, pkts, output, n_rx=None, worker=None,
1317 self.pg_send(intf, pkts, worker=worker, trace=trace)
1318 rx = output.get_capture(n_rx)
1321 def send_and_expect_only(self, intf, pkts, output, timeout=None):
1322 self.pg_send(intf, pkts)
1323 rx = output.get_capture(len(pkts))
1327 for i in self.pg_interfaces:
1328 if i not in outputs:
1329 i.get_capture(0, timeout=timeout)
1330 i.assert_nothing_captured()
1336 def get_testcase_doc_name(test):
1337 return getdoc(test.__class__).splitlines()[0]
1340 def get_test_description(descriptions, test):
1341 short_description = test.shortDescription()
1342 if descriptions and short_description:
1343 return short_description
1348 class TestCaseInfo(object):
1349 def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
1350 self.logger = logger
1351 self.tempdir = tempdir
1352 self.vpp_pid = vpp_pid
1353 self.vpp_bin_path = vpp_bin_path
1354 self.core_crash_test = None
1357 class VppTestResult(unittest.TestResult):
1359 @property result_string
1360 String variable to store the test case result string.
1362 List variable containing 2-tuples of TestCase instances and strings
1363 holding formatted tracebacks. Each tuple represents a test which
1364 raised an unexpected exception.
1366 List variable containing 2-tuples of TestCase instances and strings
1367 holding formatted tracebacks. Each tuple represents a test where
1368 a failure was explicitly signalled using the TestCase.assert*()
1372 failed_test_cases_info = set()
1373 core_crash_test_cases_info = set()
1374 current_test_case_info = None
1376 def __init__(self, stream=None, descriptions=None, verbosity=None,
1379 :param stream File descriptor to store where to report test results.
1380 Set to the standard error stream by default.
1381 :param descriptions Boolean variable to store information if to use
1382 test case descriptions.
1383 :param verbosity Integer variable to store required verbosity level.
1385 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1386 self.stream = stream
1387 self.descriptions = descriptions
1388 self.verbosity = verbosity
1389 self.result_string = None
1390 self.runner = runner
1393 def addSuccess(self, test):
1395 Record a test succeeded result
1400 if self.current_test_case_info:
1401 self.current_test_case_info.logger.debug(
1402 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1403 test._testMethodName,
1404 test._testMethodDoc))
1405 unittest.TestResult.addSuccess(self, test)
1406 self.result_string = colorize("OK", GREEN)
1408 self.send_result_through_pipe(test, PASS)
1410 def addSkip(self, test, reason):
1412 Record a test skipped.
1418 if self.current_test_case_info:
1419 self.current_test_case_info.logger.debug(
1420 "--- addSkip() %s.%s(%s) called, reason is %s" %
1421 (test.__class__.__name__, test._testMethodName,
1422 test._testMethodDoc, reason))
1423 unittest.TestResult.addSkip(self, test, reason)
1424 self.result_string = colorize("SKIP", YELLOW)
1426 if reason == "not enough cpus":
1427 self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE)
1429 self.send_result_through_pipe(test, SKIP)
1431 def symlink_failed(self):
1432 if self.current_test_case_info:
1434 failed_dir = os.getenv('FAILED_DIR')
1435 link_path = os.path.join(
1438 os.path.basename(self.current_test_case_info.tempdir))
1440 self.current_test_case_info.logger.debug(
1441 "creating a link to the failed test")
1442 self.current_test_case_info.logger.debug(
1443 "os.symlink(%s, %s)" %
1444 (self.current_test_case_info.tempdir, link_path))
1445 if os.path.exists(link_path):
1446 self.current_test_case_info.logger.debug(
1447 'symlink already exists')
1449 os.symlink(self.current_test_case_info.tempdir, link_path)
1451 except Exception as e:
1452 self.current_test_case_info.logger.error(e)
1454 def send_result_through_pipe(self, test, result):
1455 if hasattr(self, 'test_framework_result_pipe'):
1456 pipe = self.test_framework_result_pipe
1458 pipe.send((test.id(), result))
1460 def log_error(self, test, err, fn_name):
1461 if self.current_test_case_info:
1462 if isinstance(test, unittest.suite._ErrorHolder):
1463 test_name = test.description
1465 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1466 test._testMethodName,
1467 test._testMethodDoc)
1468 self.current_test_case_info.logger.debug(
1469 "--- %s() %s called, err is %s" %
1470 (fn_name, test_name, err))
1471 self.current_test_case_info.logger.debug(
1472 "formatted exception is:\n%s" %
1473 "".join(format_exception(*err)))
1475 def add_error(self, test, err, unittest_fn, error_type):
1476 if error_type == FAIL:
1477 self.log_error(test, err, 'addFailure')
1478 error_type_str = colorize("FAIL", RED)
1479 elif error_type == ERROR:
1480 self.log_error(test, err, 'addError')
1481 error_type_str = colorize("ERROR", RED)
1483 raise Exception('Error type %s cannot be used to record an '
1484 'error or a failure' % error_type)
1486 unittest_fn(self, test, err)
1487 if self.current_test_case_info:
1488 self.result_string = "%s [ temp dir used by test case: %s ]" % \
1490 self.current_test_case_info.tempdir)
1491 self.symlink_failed()
1492 self.failed_test_cases_info.add(self.current_test_case_info)
1493 if is_core_present(self.current_test_case_info.tempdir):
1494 if not self.current_test_case_info.core_crash_test:
1495 if isinstance(test, unittest.suite._ErrorHolder):
1496 test_name = str(test)
1498 test_name = "'{!s}' ({!s})".format(
1499 get_testcase_doc_name(test), test.id())
1500 self.current_test_case_info.core_crash_test = test_name
1501 self.core_crash_test_cases_info.add(
1502 self.current_test_case_info)
1504 self.result_string = '%s [no temp dir]' % error_type_str
1506 self.send_result_through_pipe(test, error_type)
1508 def addFailure(self, test, err):
1510 Record a test failed result
1513 :param err: error message
1516 self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1518 def addError(self, test, err):
1520 Record a test error result
1523 :param err: error message
1526 self.add_error(test, err, unittest.TestResult.addError, ERROR)
1528 def getDescription(self, test):
1530 Get test description
1533 :returns: test description
1536 return get_test_description(self.descriptions, test)
1538 def startTest(self, test):
1546 def print_header(test):
1547 if test.__class__ in self.printed:
1550 test_doc = getdoc(test)
1552 raise Exception("No doc string for test '%s'" % test.id())
1554 test_title = test_doc.splitlines()[0].rstrip()
1555 test_title = colorize(test_title, GREEN)
1556 if test.is_tagged_run_solo():
1557 test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)
1559 # This block may overwrite the colorized title above,
1560 # but we want this to stand out and be fixed
1561 if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
1562 test_title = colorize(
1563 f"FIXME with VPP workers: {test_title}", RED)
1565 if hasattr(test, 'vpp_worker_count'):
1566 if test.vpp_worker_count == 0:
1567 test_title += " [main thread only]"
1568 elif test.vpp_worker_count == 1:
1569 test_title += " [1 worker thread]"
1571 test_title += f" [{test.vpp_worker_count} worker threads]"
1573 if test.__class__.skipped_due_to_cpu_lack:
1574 test_title = colorize(
1575 f"{test_title} [skipped - not enough cpus, "
1576 f"required={test.__class__.get_cpus_required()}, "
1577 f"available={max_vpp_cpus}]", YELLOW)
1579 print(double_line_delim)
1581 print(double_line_delim)
1582 self.printed.append(test.__class__)
1585 self.start_test = time.time()
1586 unittest.TestResult.startTest(self, test)
1587 if self.verbosity > 0:
1588 self.stream.writeln(
1589 "Starting " + self.getDescription(test) + " ...")
1590 self.stream.writeln(single_line_delim)
1592 def stopTest(self, test):
1594 Called when the given test has been run
1599 unittest.TestResult.stopTest(self, test)
1601 if self.verbosity > 0:
1602 self.stream.writeln(single_line_delim)
1603 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1604 self.result_string))
1605 self.stream.writeln(single_line_delim)
1607 self.stream.writeln("%-68s %4.2f %s" %
1608 (self.getDescription(test),
1609 time.time() - self.start_test,
1610 self.result_string))
1612 self.send_result_through_pipe(test, TEST_RUN)
1614 def printErrors(self):
1616 Print errors from running the test case
1618 if len(self.errors) > 0 or len(self.failures) > 0:
1619 self.stream.writeln()
1620 self.printErrorList('ERROR', self.errors)
1621 self.printErrorList('FAIL', self.failures)
1623 # ^^ that is the last output from unittest before summary
1624 if not self.runner.print_summary:
1625 devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
1626 self.stream = devnull
1627 self.runner.stream = devnull
1629 def printErrorList(self, flavour, errors):
1631 Print error list to the output stream together with error type
1632 and test case description.
1634 :param flavour: error type
1635 :param errors: iterable errors
1638 for test, err in errors:
1639 self.stream.writeln(double_line_delim)
1640 self.stream.writeln("%s: %s" %
1641 (flavour, self.getDescription(test)))
1642 self.stream.writeln(single_line_delim)
1643 self.stream.writeln("%s" % err)
1646 class VppTestRunner(unittest.TextTestRunner):
1648 A basic test runner implementation which prints results to standard error.
1652 def resultclass(self):
1653 """Class maintaining the results of the tests"""
1654 return VppTestResult
1656 def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1657 result_pipe=None, failfast=False, buffer=False,
1658 resultclass=None, print_summary=True, **kwargs):
1659 # ignore stream setting here, use hard-coded stdout to be in sync
1660 # with prints from VppTestCase methods ...
1661 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1662 verbosity, failfast, buffer,
1663 resultclass, **kwargs)
1664 KeepAliveReporter.pipe = keep_alive_pipe
1666 self.orig_stream = self.stream
1667 self.resultclass.test_framework_result_pipe = result_pipe
1669 self.print_summary = print_summary
1671 def _makeResult(self):
1672 return self.resultclass(self.stream,
1677 def run(self, test):
1684 faulthandler.enable() # emit stack trace to stderr if killed by signal
1686 result = super(VppTestRunner, self).run(test)
1687 if not self.print_summary:
1688 self.stream = self.orig_stream
1689 result.stream = self.orig_stream
1693 class Worker(Thread):
1694 def __init__(self, executable_args, logger, env=None, *args, **kwargs):
1695 super(Worker, self).__init__(*args, **kwargs)
1696 self.logger = logger
1697 self.args = executable_args
1698 if hasattr(self, 'testcase') and self.testcase.debug_all:
1699 if self.testcase.debug_gdbserver:
1700 self.args = ['/usr/bin/gdbserver', 'localhost:{port}'
1701 .format(port=self.testcase.gdbserver_port)] + args
1702 elif self.testcase.debug_gdb and hasattr(self, 'wait_for_gdb'):
1703 self.args.append(self.wait_for_gdb)
1704 self.app_bin = executable_args[0]
1705 self.app_name = os.path.basename(self.app_bin)
1706 if hasattr(self, 'role'):
1707 self.app_name += ' {role}'.format(role=self.role)
1710 env = {} if env is None else env
1711 self.env = copy.deepcopy(env)
1713 def wait_for_enter(self):
1714 if not hasattr(self, 'testcase'):
1716 if self.testcase.debug_all and self.testcase.debug_gdbserver:
1718 print(double_line_delim)
1719 print("Spawned GDB Server for '{app}' with PID: {pid}"
1720 .format(app=self.app_name, pid=self.process.pid))
1721 elif self.testcase.debug_all and self.testcase.debug_gdb:
1723 print(double_line_delim)
1724 print("Spawned '{app}' with PID: {pid}"
1725 .format(app=self.app_name, pid=self.process.pid))
1728 print(single_line_delim)
1729 print("You can debug '{app}' using:".format(app=self.app_name))
1730 if self.testcase.debug_gdbserver:
1731 print("sudo gdb " + self.app_bin +
1732 " -ex 'target remote localhost:{port}'"
1733 .format(port=self.testcase.gdbserver_port))
1734 print("Now is the time to attach gdb by running the above "
1735 "command, set up breakpoints etc., then resume from "
1736 "within gdb by issuing the 'continue' command")
1737 self.testcase.gdbserver_port += 1
1738 elif self.testcase.debug_gdb:
1739 print("sudo gdb " + self.app_bin +
1740 " -ex 'attach {pid}'".format(pid=self.process.pid))
1741 print("Now is the time to attach gdb by running the above "
1742 "command and set up breakpoints etc., then resume from"
1743 " within gdb by issuing the 'continue' command")
1744 print(single_line_delim)
1745 input("Press ENTER to continue running the testcase...")
1748 executable = self.args[0]
1749 if not os.path.exists(executable) or not os.access(
1750 executable, os.F_OK | os.X_OK):
1751 # Exit code that means some system file did not exist,
1752 # could not be opened, or had some other kind of error.
1753 self.result = os.EX_OSFILE
1754 raise EnvironmentError(
1755 "executable '%s' is not found or executable." % executable)
1756 self.logger.debug("Running executable '{app}': '{cmd}'"
1757 .format(app=self.app_name,
1758 cmd=' '.join(self.args)))
1759 env = os.environ.copy()
1760 env.update(self.env)
1761 env["CK_LOG_FILE_NAME"] = "-"
1762 self.process = subprocess.Popen(
1763 ['stdbuf', '-o0', '-e0'] + self.args, shell=False, env=env,
1764 preexec_fn=os.setpgrp, stdout=subprocess.PIPE,
1765 stderr=subprocess.PIPE)
1766 self.wait_for_enter()
1767 out, err = self.process.communicate()
1768 self.logger.debug("Finished running `{app}'".format(app=self.app_name))
1769 self.logger.info("Return code is `%s'" % self.process.returncode)
1770 self.logger.info(single_line_delim)
1771 self.logger.info("Executable `{app}' wrote to stdout:"
1772 .format(app=self.app_name))
1773 self.logger.info(single_line_delim)
1774 self.logger.info(out.decode('utf-8'))
1775 self.logger.info(single_line_delim)
1776 self.logger.info("Executable `{app}' wrote to stderr:"
1777 .format(app=self.app_name))
1778 self.logger.info(single_line_delim)
1779 self.logger.info(err.decode('utf-8'))
1780 self.logger.info(single_line_delim)
1781 self.result = self.process.returncode
1784 if __name__ == '__main__':