3 from __future__ import print_function
19 from collections import deque
20 from threading import Thread, Event
21 from inspect import getdoc, isclass
22 from traceback import format_exception
23 from logging import FileHandler, DEBUG, Formatter
25 from abc import ABC, abstractmethod
28 from scapy.packet import Raw
29 import hook as hookmodule
30 from vpp_pg_interface import VppPGInterface
31 from vpp_sub_interface import VppSubInterface
32 from vpp_lo_interface import VppLoInterface
33 from vpp_bvi_interface import VppBviInterface
34 from vpp_papi_provider import VppPapiProvider
35 from vpp_papi import VppEnum
37 from vpp_papi.vpp_stats import VPPStats
38 from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
39 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
41 from vpp_object import VppObjectRegistry
42 from util import ppp, is_core_present
43 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
44 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
45 from scapy.layers.inet6 import ICMPv6EchoReply
47 from cpu_config import available_cpus, num_cpus, max_vpp_cpus
49 logger = logging.getLogger(__name__)
51 # Set up an empty logger for the testcase that can be overridden as necessary
52 null_logger = logging.getLogger('VppTestCase')
53 null_logger.addHandler(logging.NullHandler())
class BoolEnvironmentVariable(object):
    """Boolean view of an environment variable.

    The environment is consulted on every truth test rather than once at
    construction, so later changes to the variable are observed.
    """

    def __init__(self, env_var_name, default='n', true_values=None):
        """
        :param env_var_name: name of the environment variable to read
        :param default: value used when the variable is not set
        :param true_values: lower-case spellings that count as true
        """
        self.name = env_var_name
        self.default = default
        if true_values is None:
            true_values = ("y", "yes", "1")
        self.true_values = true_values

    def __bool__(self):
        # comparison is case-insensitive on the variable's value
        current = os.getenv(self.name, self.default)
        return current.lower() in self.true_values

    if sys.version_info[0] == 2:
        # Python 2 names the truth-test hook __nonzero__
        __nonzero__ = __bool__

    def __repr__(self):
        details = (self.name, self.default, self.true_values)
        return 'BoolEnvironmentVariable(%r, default=%r, true_values=%r)' % \
            details
82 debug_framework = BoolEnvironmentVariable('TEST_DEBUG')
87 Test framework module.
89 The module provides a set of tools for constructing and running tests and
90 representing the results.
class VppDiedError(Exception):
    """Exception for reporting that the VPP subprocess has died."""

    # signal value -> 'SIGxxx' name; the 'SIG_xxx' constants are excluded
    signals_by_value = {v: k for k, v in signal.__dict__.items() if
                        k.startswith('SIG') and not k.startswith('SIG_')}

    def __init__(self, rv=None, testcase=None, method_name=None):
        """
        :param rv: return code of the subprocess, if known
        :param testcase: name of the test case that was running, if any
        :param method_name: name of the test method that was running, if any
        """
        self.rv = rv
        self.signal_name = None
        self.testcase = testcase
        self.method_name = method_name

        try:
            # the negated return code is looked up as a signal value
            self.signal_name = VppDiedError.signals_by_value[-rv]
        except (KeyError, TypeError):
            # rv is None (cannot be negated) or maps to no known signal
            pass

        if testcase is None and method_name is None:
            in_msg = ''
        else:
            in_msg = ' while running %s.%s' % (testcase, method_name)

        if self.rv:
            # only mention the signal when one was identified; an
            # unconditional ' [%s]' would leave a dangling ' []'
            if self.signal_name is not None:
                signal_msg = ' [%s]' % self.signal_name
            else:
                signal_msg = ''
            msg = "VPP subprocess died unexpectedly%s with return code: %d%s."\
                % (in_msg, self.rv, signal_msg)
        else:
            msg = "VPP subprocess died unexpectedly%s." % in_msg

        super(VppDiedError, self).__init__(msg)
127 class _PacketInfo(object):
128 """Private class to create packet info object.
130 Help process information about the next packet.
131 Set variables to default values.
133 #: Store the index of the packet.
135 #: Store the index of the source packet generator interface of the packet.
137 #: Store the index of the destination packet generator interface
140 #: Store expected ip version
142 #: Store expected upper protocol
144 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare two packet infos by index, src, dst and data.

    Returns NotImplemented when ``other`` lacks any of those attributes,
    so comparing against an unrelated object (e.g. None) falls back to
    Python's default handling instead of raising AttributeError.
    """
    compared = ("index", "src", "dst", "data")
    if not all(hasattr(other, name) for name in compared):
        return NotImplemented
    return all(getattr(self, name) == getattr(other, name)
               for name in compared)
def _split_pumped_output(data, fragment):
    """Split freshly read bytes into complete lines and a leftover fragment.

    :param data: non-empty bytes read from the pipe
    :param fragment: unterminated text left over from the previous read
    :returns: (list of complete lines, new leftover fragment)
    """
    pieces = data.decode('ascii', errors='backslashreplace').splitlines(True)
    if len(fragment) > 0:
        pieces[0] = "%s%s" % (fragment, pieces[0])
    if pieces[-1].endswith("\n"):
        # everything is terminated - nothing is carried over, so the old
        # fragment must not be prepended again on the next read
        return pieces, ""
    return pieces[:-1], pieces[-1]


def pump_output(testclass):
    """ pump output from vpp stdout/stderr to proper queues

    Loops until ``testclass.pump_thread_stop_flag`` is set. Complete lines
    are appended to ``vpp_stdout_deque`` / ``vpp_stderr_deque`` and, unless
    ``cache_vpp_output`` is set, also written to the test class logger.
    """
    stdout_fragment = ""
    stderr_fragment = ""
    while not testclass.pump_thread_stop_flag.is_set():
        readable = select.select([testclass.vpp.stdout.fileno(),
                                  testclass.vpp.stderr.fileno(),
                                  testclass.pump_thread_wakeup_pipe[0]],
                                 [], [])[0]
        if testclass.vpp.stdout.fileno() in readable:
            read = os.read(testclass.vpp.stdout.fileno(), 102400)
            if len(read) > 0:
                lines, stdout_fragment = _split_pumped_output(
                    read, stdout_fragment)
                testclass.vpp_stdout_deque.extend(lines)
                if not testclass.cache_vpp_output:
                    for line in lines:
                        testclass.logger.info(
                            "VPP STDOUT: %s" % line.rstrip("\n"))
        if testclass.vpp.stderr.fileno() in readable:
            read = os.read(testclass.vpp.stderr.fileno(), 102400)
            if len(read) > 0:
                lines, stderr_fragment = _split_pumped_output(
                    read, stderr_fragment)
                testclass.vpp_stderr_deque.extend(lines)
                if not testclass.cache_vpp_output:
                    for line in lines:
                        testclass.logger.error(
                            "VPP STDERR: %s" % line.rstrip("\n"))
        # ignoring the dummy pipe here intentionally - the
        # flag will take care of properly terminating the loop
def _is_skip_aarch64_set():
    """Return the SKIP_AARCH64 environment flag object."""
    flag = BoolEnvironmentVariable('SKIP_AARCH64')
    return flag


is_skip_aarch64_set = _is_skip_aarch64_set()


def _is_platform_aarch64():
    """Return True when platform.machine() reports 'aarch64'."""
    machine = platform.machine()
    return machine == 'aarch64'


is_platform_aarch64 = _is_platform_aarch64()


def _running_extended_tests():
    """Return the EXTENDED_TESTS environment flag object."""
    flag = BoolEnvironmentVariable("EXTENDED_TESTS")
    return flag


running_extended_tests = _running_extended_tests()


def _running_gcov_tests():
    """Return the GCOV_TESTS environment flag object."""
    flag = BoolEnvironmentVariable("GCOV_TESTS")
    return flag


running_gcov_tests = _running_gcov_tests()
def get_environ_vpp_worker_count():
    """Parse the worker count from the VPP_WORKER_CONFIG variable.

    The variable is expected to look like ``workers <n>``.

    :returns: the configured count, or 0 when the variable is unset/empty
    :raises ValueError: when the value is not of the expected form
    """
    worker_config = os.getenv("VPP_WORKER_CONFIG", None)
    if not worker_config:
        return 0
    elems = worker_config.split(" ")
    well_formed = elems[0] == "workers" and len(elems) == 2
    if not well_formed:
        raise ValueError("Wrong VPP_WORKER_CONFIG == '%s' value." %
                         worker_config)
    return int(elems[1])
243 environ_vpp_worker_count = get_environ_vpp_worker_count()
246 class KeepAliveReporter(object):
248 Singleton object which reports test start to parent process
253 self.__dict__ = self._shared_state
261 def pipe(self, pipe):
262 if self._pipe is not None:
263 raise Exception("Internal error - pipe should only be set once.")
266 def send_keep_alive(self, test, desc=None):
268 Write current test tmpdir & desc to keep-alive pipe to signal liveness
270 if self.pipe is None:
271 # if not running forked..
275 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
279 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
282 class TestCaseTag(Enum):
283 # marks the suites that must run at the end
284 # using only a single test runner
286 # marks the suites broken on VPP multi-worker
287 FIXME_VPP_WORKERS = 2
def create_tag_decorator(e):
    """Build a class decorator that adds tag ``e`` to ``cls.test_tags``.

    :param e: tag value to attach
    :returns: decorator that tags the class and returns it
    """
    def decorator(cls):
        # Build a fresh list rather than appending in place: test_tags may
        # be inherited, and appending would also tag the parent class.
        cls.test_tags = list(getattr(cls, "test_tags", ())) + [e]
        return cls
    return decorator
300 tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
301 tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
315 class CPUInterface(ABC):
317 skipped_due_to_cpu_lack = False
321 def get_cpus_required(cls):
325 def assign_cpus(cls, cpus):
329 class VppTestCase(CPUInterface, unittest.TestCase):
330 """This subclass is a base class for VPP test cases that are implemented as
331 classes. It provides methods to create and run test case.
334 extra_vpp_statseg_config = ""
335 extra_vpp_punt_config = []
336 extra_vpp_plugin_config = []
338 vapi_response_timeout = 5
341 def packet_infos(self):
342 """List of packet infos"""
343 return self._packet_infos
def get_packet_count_for_if_idx(cls, dst_if_index):
    """Return how many packet infos target the given destination if index.

    :param dst_if_index: destination interface index
    :returns: recorded count, or 0 when none was recorded
    """
    counts = cls._packet_count_for_dst_if_idx
    return counts[dst_if_index] if dst_if_index in counts else 0
def has_tag(cls, tag):
    """ if the test case has a given tag - return true """
    absent = object()
    # a class that was never tagged has no test_tags attribute at all
    tags = getattr(cls, "test_tags", absent)
    if tags is absent:
        return False
    return tag in tags
def is_tagged_run_solo(cls):
    """ if the test case class is timing-sensitive - return true """
    solo_tag = TestCaseTag.RUN_SOLO
    return cls.has_tag(solo_tag)
369 """Return the instance of this testcase"""
370 return cls.test_instance
def set_debug_flags(cls, d):
    """Reset the debug flags on the class and set them from option ``d``.

    :param d: DEBUG option (case-insensitive): core, gdb, gdb-all,
              gdbserver, gdbserver-all or attach; None leaves all flags off
    :raises Exception: when the option is not recognized
    """
    cls.gdbserver_port = 7777
    for flag in ("debug_core", "debug_gdb", "debug_gdbserver",
                 "debug_all", "debug_attach"):
        setattr(cls, flag, False)
    if d is None:
        return

    option = d.lower()
    flag_for_option = {
        "core": "debug_core",
        "gdb": "debug_gdb",
        "gdb-all": "debug_gdb",
        "gdbserver": "debug_gdbserver",
        "gdbserver-all": "debug_gdbserver",
        "attach": "debug_attach",
    }
    selected = flag_for_option.get(option)
    if selected is None:
        raise Exception("Unrecognized DEBUG option: '%s'" % d)
    setattr(cls, selected, True)
    if option in ("gdb-all", "gdbserver-all"):
        cls.debug_all = True
def get_vpp_worker_count(cls):
    """Return the VPP worker count for this class, computing it once.

    An existing ``vpp_worker_count`` attribute is returned unchanged.
    Otherwise it is set to 0 for classes tagged FIXME_VPP_WORKERS and to
    the environment-derived count for all others.
    """
    if hasattr(cls, "vpp_worker_count"):
        return cls.vpp_worker_count
    broken_on_workers = cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS)
    cls.vpp_worker_count = \
        0 if broken_on_workers else environ_vpp_worker_count
    return cls.vpp_worker_count
def get_cpus_required(cls):
    """Return the number of cpus needed: one plus the worker count."""
    workers = cls.get_vpp_worker_count()
    return 1 + workers
410 def setUpConstants(cls):
411 """ Set-up the test case class based on environment variables """
412 cls.step = BoolEnvironmentVariable('STEP')
413 # inverted case to handle '' == True
414 c = os.getenv("CACHE_OUTPUT", "1")
415 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
416 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
417 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
418 cls.test_plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
419 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
421 if cls.plugin_path is not None:
422 if cls.extern_plugin_path is not None:
423 plugin_path = "%s:%s" % (
424 cls.plugin_path, cls.extern_plugin_path)
426 plugin_path = cls.plugin_path
427 elif cls.extern_plugin_path is not None:
428 plugin_path = cls.extern_plugin_path
430 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
431 debug_cli = "cli-listen localhost:5002"
433 size = os.getenv("COREDUMP_SIZE")
435 coredump_size = "coredump-size %s" % size
436 if coredump_size is None:
437 coredump_size = "coredump-size unlimited"
439 default_variant = os.getenv("VARIANT")
440 if default_variant is not None:
441 default_variant = "defaults { %s 100 }" % default_variant
445 api_fuzzing = os.getenv("API_FUZZ")
446 if api_fuzzing is None:
451 "unix", "{", "nodaemon", debug_cli, "full-coredump",
452 coredump_size, "runtime-dir", cls.tempdir, "}",
453 "api-trace", "{", "on", "}",
454 "api-segment", "{", "prefix", cls.get_api_segment_prefix(), "}",
455 "cpu", "{", "main-core", str(cls.cpus[0]), ]
456 if cls.get_vpp_worker_count():
457 cls.vpp_cmdline.extend([
458 "corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])])
459 cls.vpp_cmdline.extend([
461 "physmem", "{", "max-size", "32m", "}",
462 "statseg", "{", "socket-name", cls.get_stats_sock_path(),
463 cls.extra_vpp_statseg_config, "}",
464 "socksvr", "{", "socket-name", cls.get_api_sock_path(), "}",
465 "node { ", default_variant, "}",
466 "api-fuzz {", api_fuzzing, "}",
467 "plugins", "{", "plugin", "dpdk_plugin.so", "{", "disable", "}",
468 "plugin", "rdma_plugin.so", "{", "disable", "}",
469 "plugin", "lisp_unittest_plugin.so", "{", "enable", "}",
470 "plugin", "unittest_plugin.so", "{", "enable", "}"
471 ] + cls.extra_vpp_plugin_config + ["}", ])
473 if cls.extra_vpp_punt_config is not None:
474 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
475 if plugin_path is not None:
476 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
477 if cls.test_plugin_path is not None:
478 cls.vpp_cmdline.extend(["test_plugin_path", cls.test_plugin_path])
480 if not cls.debug_attach:
481 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
482 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
485 def wait_for_enter(cls):
486 if cls.debug_gdbserver:
487 print(double_line_delim)
488 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
490 print(double_line_delim)
491 print("Spawned VPP with PID: %d" % cls.vpp.pid)
493 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
495 print(single_line_delim)
496 print("You can debug VPP using:")
497 if cls.debug_gdbserver:
498 print("sudo gdb " + cls.vpp_bin +
499 " -ex 'target remote localhost:{port}'"
500 .format(port=cls.gdbserver_port))
501 print("Now is the time to attach gdb by running the above "
502 "command, set up breakpoints etc., then resume VPP from "
503 "within gdb by issuing the 'continue' command")
504 cls.gdbserver_port += 1
506 print("sudo gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
507 print("Now is the time to attach gdb by running the above "
508 "command and set up breakpoints etc., then resume VPP from"
509 " within gdb by issuing the 'continue' command")
510 print(single_line_delim)
511 input("Press ENTER to continue running the testcase...")
519 cls.logger.debug(f"Assigned cpus: {cls.cpus}")
520 cmdline = cls.vpp_cmdline
522 if cls.debug_gdbserver:
523 gdbserver = '/usr/bin/gdbserver'
524 if not os.path.isfile(gdbserver) or\
525 not os.access(gdbserver, os.X_OK):
526 raise Exception("gdbserver binary '%s' does not exist or is "
527 "not executable" % gdbserver)
529 cmdline = [gdbserver, 'localhost:{port}'
530 .format(port=cls.gdbserver_port)] + cls.vpp_cmdline
531 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
534 cls.vpp = subprocess.Popen(cmdline,
535 stdout=subprocess.PIPE,
536 stderr=subprocess.PIPE)
537 except subprocess.CalledProcessError as e:
538 cls.logger.critical("Subprocess returned with non-0 return code: ("
542 cls.logger.critical("Subprocess returned with OS error: "
543 "(%s) %s", e.errno, e.strerror)
545 except Exception as e:
546 cls.logger.exception("Subprocess returned unexpected from "
553 def wait_for_coredump(cls):
554 corefile = cls.tempdir + "/core"
555 if os.path.isfile(corefile):
556 cls.logger.error("Waiting for coredump to complete: %s", corefile)
557 curr_size = os.path.getsize(corefile)
558 deadline = time.time() + 60
560 while time.time() < deadline:
563 curr_size = os.path.getsize(corefile)
564 if size == curr_size:
568 cls.logger.error("Timed out waiting for coredump to complete:"
571 cls.logger.error("Coredump complete: %s, size %d",
def get_stats_sock_path(cls):
    """Return the path of the stats socket inside the class tempdir."""
    return "{0}/stats.sock".format(cls.tempdir)
def get_api_sock_path(cls):
    """Return the path of the API socket inside the class tempdir."""
    return "{0}/api.sock".format(cls.tempdir)
def get_api_segment_prefix(cls):
    """Return the last path component of the class tempdir."""
    # Only used for VAPI
    _, prefix = os.path.split(cls.tempdir)
    return prefix
def get_tempdir(cls):
    """Return the working directory for this test class.

    When attaching to an externally started VPP (``debug_attach``), the
    directory comes from VPP_IN_GDB_TMP_DIR (default
    /tmp/unittest-attach-gdb); otherwise a fresh temporary directory named
    after the class is created.
    """
    if not cls.debug_attach:
        return tempfile.mkdtemp(prefix='vpp-unittest-%s-' % cls.__name__)
    return os.getenv("VPP_IN_GDB_TMP_DIR",
                     "/tmp/unittest-attach-gdb")
597 Perform class setup before running the testcase
598 Remove shared memory files, start vpp and connect the vpp-api
600 super(VppTestCase, cls).setUpClass()
601 cls.logger = get_logger(cls.__name__)
602 seed = os.environ["RND_SEED"]
604 if hasattr(cls, 'parallel_handler'):
605 cls.logger.addHandler(cls.parallel_handler)
606 cls.logger.propagate = False
607 d = os.getenv("DEBUG", None)
608 cls.set_debug_flags(d)
609 cls.tempdir = cls.get_tempdir()
610 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
611 cls.file_handler.setFormatter(
612 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
614 cls.file_handler.setLevel(DEBUG)
615 cls.logger.addHandler(cls.file_handler)
616 cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
617 os.chdir(cls.tempdir)
618 cls.logger.info("Temporary dir is %s, api socket is %s",
619 cls.tempdir, cls.get_api_sock_path())
620 cls.logger.debug("Random seed is %s", seed)
622 cls.reset_packet_infos()
627 cls.registry = VppObjectRegistry()
628 cls.vpp_startup_failed = False
629 cls.reporter = KeepAliveReporter()
630 # need to catch exceptions here because if we raise, then the cleanup
631 # doesn't get called and we might end with a zombie vpp
637 cls.reporter.send_keep_alive(cls, 'setUpClass')
638 VppTestResult.current_test_case_info = TestCaseInfo(
639 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
640 cls.vpp_stdout_deque = deque()
641 cls.vpp_stderr_deque = deque()
642 if not cls.debug_attach:
643 cls.pump_thread_stop_flag = Event()
644 cls.pump_thread_wakeup_pipe = os.pipe()
645 cls.pump_thread = Thread(target=pump_output, args=(cls,))
646 cls.pump_thread.daemon = True
647 cls.pump_thread.start()
648 if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
649 cls.vapi_response_timeout = 0
650 cls.vapi = VppPapiProvider(cls.__name__, cls,
651 cls.vapi_response_timeout)
653 hook = hookmodule.StepHook(cls)
655 hook = hookmodule.PollHook(cls)
656 cls.vapi.register_hook(hook)
657 cls.statistics = VPPStats(socketname=cls.get_stats_sock_path())
661 cls.vpp_startup_failed = True
663 "VPP died shortly after startup, check the"
664 " output to standard error for possible cause")
668 except (vpp_papi.VPPIOError, Exception) as e:
669 cls.logger.debug("Exception connecting to vapi: %s" % e)
670 cls.vapi.disconnect()
672 if cls.debug_gdbserver:
673 print(colorize("You're running VPP inside gdbserver but "
674 "VPP-API connection failed, did you forget "
675 "to 'continue' VPP from within gdb?", RED))
678 last_line = cls.vapi.cli("show thread").split("\n")[-2]
679 cls.vpp_worker_count = int(last_line.split(" ")[0])
680 print("Detected VPP with %s workers." % cls.vpp_worker_count)
681 except vpp_papi.VPPRuntimeError as e:
682 cls.logger.debug("%s" % e)
685 except Exception as e:
686 cls.logger.debug("Exception connecting to VPP: %s" % e)
691 def _debug_quit(cls):
692 if (cls.debug_gdbserver or cls.debug_gdb):
696 if cls.vpp.returncode is None:
698 print(double_line_delim)
699 print("VPP or GDB server is still running")
700 print(single_line_delim)
701 input("When done debugging, press ENTER to kill the "
702 "process and finish running the testcase...")
703 except AttributeError:
709 Disconnect vpp-api, kill vpp and cleanup shared memory files
713 # first signal that we want to stop the pump thread, then wake it up
714 if hasattr(cls, 'pump_thread_stop_flag'):
715 cls.pump_thread_stop_flag.set()
716 if hasattr(cls, 'pump_thread_wakeup_pipe'):
717 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
718 if hasattr(cls, 'pump_thread'):
719 cls.logger.debug("Waiting for pump thread to stop")
720 cls.pump_thread.join()
721 if hasattr(cls, 'vpp_stderr_reader_thread'):
722 cls.logger.debug("Waiting for stderr pump to stop")
723 cls.vpp_stderr_reader_thread.join()
725 if hasattr(cls, 'vpp'):
726 if hasattr(cls, 'vapi'):
727 cls.logger.debug(cls.vapi.vpp.get_stats())
728 cls.logger.debug("Disconnecting class vapi client on %s",
730 cls.vapi.disconnect()
731 cls.logger.debug("Deleting class vapi attribute on %s",
735 if not cls.debug_attach and cls.vpp.returncode is None:
736 cls.wait_for_coredump()
737 cls.logger.debug("Sending TERM to vpp")
739 cls.logger.debug("Waiting for vpp to die")
741 outs, errs = cls.vpp.communicate(timeout=5)
742 except subprocess.TimeoutExpired:
744 outs, errs = cls.vpp.communicate()
745 cls.logger.debug("Deleting class vpp attribute on %s",
747 if not cls.debug_attach:
748 cls.vpp.stdout.close()
749 cls.vpp.stderr.close()
752 if cls.vpp_startup_failed:
753 stdout_log = cls.logger.info
754 stderr_log = cls.logger.critical
756 stdout_log = cls.logger.info
757 stderr_log = cls.logger.info
759 if hasattr(cls, 'vpp_stdout_deque'):
760 stdout_log(single_line_delim)
761 stdout_log('VPP output to stdout while running %s:', cls.__name__)
762 stdout_log(single_line_delim)
763 vpp_output = "".join(cls.vpp_stdout_deque)
764 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
766 stdout_log('\n%s', vpp_output)
767 stdout_log(single_line_delim)
769 if hasattr(cls, 'vpp_stderr_deque'):
770 stderr_log(single_line_delim)
771 stderr_log('VPP output to stderr while running %s:', cls.__name__)
772 stderr_log(single_line_delim)
773 vpp_output = "".join(cls.vpp_stderr_deque)
774 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
776 stderr_log('\n%s', vpp_output)
777 stderr_log(single_line_delim)
780 def tearDownClass(cls):
781 """ Perform final cleanup after running all tests in this test-case """
782 cls.logger.debug("--- tearDownClass() for %s called ---" %
784 cls.reporter.send_keep_alive(cls, 'tearDownClass')
786 cls.file_handler.close()
787 cls.reset_packet_infos()
789 debug_internal.on_tear_down_class(cls)
def show_commands_at_teardown(self):
    """ Allow subclass specific teardown logging additions."""
    notice = "--- No test specific show commands provided. ---"
    self.logger.info(notice)
796 """ Show various debug prints after each test """
797 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
798 (self.__class__.__name__, self._testMethodName,
799 self._testMethodDoc))
802 if not self.vpp_dead:
803 self.logger.debug(self.vapi.cli("show trace max 1000"))
804 self.logger.info(self.vapi.ppcli("show interface"))
805 self.logger.info(self.vapi.ppcli("show hardware"))
806 self.logger.info(self.statistics.set_errors_str())
807 self.logger.info(self.vapi.ppcli("show run"))
808 self.logger.info(self.vapi.ppcli("show log"))
809 self.logger.info(self.vapi.ppcli("show bihash"))
810 self.logger.info("Logging testcase specific show commands.")
811 self.show_commands_at_teardown()
812 self.registry.remove_vpp_config(self.logger)
813 # Save/Dump VPP api trace log
814 m = self._testMethodName
815 api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
816 tmp_api_trace = "/tmp/%s" % api_trace
817 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
818 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
819 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
821 os.rename(tmp_api_trace, vpp_api_trace_log)
822 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
824 except VppTransportSocketIOError:
825 self.logger.debug("VppTransportSocketIOError: Vpp dead. "
826 "Cannot log show commands.")
829 self.registry.unregister_all(self.logger)
832 """ Clear trace before running each test"""
833 super(VppTestCase, self).setUp()
834 self.reporter.send_keep_alive(self)
836 raise VppDiedError(rv=None, testcase=self.__class__.__name__,
837 method_name=self._testMethodName)
838 self.sleep(.1, "during setUp")
839 self.vpp_stdout_deque.append(
840 "--- test setUp() for %s.%s(%s) starts here ---\n" %
841 (self.__class__.__name__, self._testMethodName,
842 self._testMethodDoc))
843 self.vpp_stderr_deque.append(
844 "--- test setUp() for %s.%s(%s) starts here ---\n" %
845 (self.__class__.__name__, self._testMethodName,
846 self._testMethodDoc))
847 self.vapi.cli("clear trace")
848 # store the test instance inside the test class - so that objects
849 # holding the class can access instance methods (like assertEqual)
850 type(self).test_instance = self
853 def pg_enable_capture(cls, interfaces=None):
855 Enable capture on packet-generator interfaces
857 :param interfaces: iterable interface indexes (if None,
858 use self.pg_interfaces)
861 if interfaces is None:
862 interfaces = cls.pg_interfaces
def register_pcap(cls, intf, worker):
    """ Register a pcap in the testclass """
    # record the (interface, worker) pair in the list of captures
    capture = (intf, worker)
    cls._pcaps.append(capture)
def get_vpp_time(cls):
    """Return the time reported by VPP's 'show clock' as a float."""
    # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
    # returns float("2.190522")
    reply = cls.vapi.cli('show clock')
    before_comma = reply.partition(',')[0]
    seconds = before_comma.partition('Time now')[2]
    return float(seconds)
882 def sleep_on_vpp_time(cls, sec):
883 """ Sleep according to time in VPP world """
884 # On a busy system with many processes
885 # we might end up with VPP time being slower than real world
886 # So take that into account when waiting for VPP to do something
887 start_time = cls.get_vpp_time()
888 while cls.get_vpp_time() - start_time < sec:
892 def pg_start(cls, trace=True):
893 """ Enable the PG, wait till it is done, then clean up """
894 for (intf, worker) in cls._old_pcaps:
895 intf.rename_old_pcap_file(intf.get_in_path(worker),
896 intf.in_history_counter)
899 cls.vapi.cli("clear trace")
900 cls.vapi.cli("trace add pg-input 1000")
901 cls.vapi.cli('packet-generator enable')
902 # PG, when starts, runs to completion -
903 # so let's avoid a race condition,
904 # and wait a little till it's done.
905 # Then clean it up - and then be gone.
906 deadline = time.time() + 300
907 while cls.vapi.cli('show packet-generator').find("Yes") != -1:
908 cls.sleep(0.01) # yield
909 if time.time() > deadline:
910 cls.logger.error("Timeout waiting for pg to stop")
912 for intf, worker in cls._pcaps:
913 cls.vapi.cli('packet-generator delete %s' %
914 intf.get_cap_name(worker))
915 cls._old_pcaps = cls._pcaps
919 def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0,
922 Create packet-generator interfaces.
924 :param interfaces: iterable indexes of the interfaces.
925 :returns: List of created interfaces.
930 intf = VppPGInterface(cls, i, gso, gso_size, mode)
931 setattr(cls, intf.name, intf)
933 cls.pg_interfaces = result
def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
    """Create packet-generator interfaces using PG_API_MODE_IP4.

    :param interfaces: iterable indexes of the interfaces.
    :returns: whatever create_pg_interfaces_internal returns
    """
    mode = VppEnum.vl_api_pg_interface_mode_t.PG_API_MODE_IP4
    return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
                                             mode)
def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
    """Create packet-generator interfaces using PG_API_MODE_IP6.

    :param interfaces: iterable indexes of the interfaces.
    :returns: whatever create_pg_interfaces_internal returns
    """
    mode = VppEnum.vl_api_pg_interface_mode_t.PG_API_MODE_IP6
    return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
                                             mode)
def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
    """Create packet-generator interfaces using PG_API_MODE_ETHERNET.

    :param interfaces: iterable indexes of the interfaces.
    :returns: whatever create_pg_interfaces_internal returns
    """
    mode = VppEnum.vl_api_pg_interface_mode_t.PG_API_MODE_ETHERNET
    return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
                                             mode)
def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
    """Create packet-generator interfaces using PG_API_MODE_ETHERNET.

    :param interfaces: iterable indexes of the interfaces.
    :returns: whatever create_pg_interfaces_internal returns
    """
    mode = VppEnum.vl_api_pg_interface_mode_t.PG_API_MODE_ETHERNET
    return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
                                             mode)
def create_loopback_interfaces(cls, count):
    """
    Create loopback interfaces.

    Each interface is also stored on the class under its own name, and the
    whole list is kept in ``cls.lo_interfaces``.

    :param count: number of interfaces created.
    :returns: List of created interfaces.
    """
    created = []
    for _ in range(count):
        created.append(VppLoInterface(cls))
    for intf in created:
        setattr(cls, intf.name, intf)
    cls.lo_interfaces = created
    return created
def create_bvi_interfaces(cls, count):
    """
    Create BVI interfaces.

    Each interface is also stored on the class under its own name, and the
    whole list is kept in ``cls.bvi_interfaces``.

    :param count: number of interfaces created.
    :returns: List of created interfaces.
    """
    created = []
    for _ in range(count):
        created.append(VppBviInterface(cls))
    for intf in created:
        setattr(cls, intf.name, intf)
    cls.bvi_interfaces = created
    return created
def extend_packet(packet, size, padding=' '):
    """
    Extend packet to given size by padding with spaces or custom padding
    NOTE: Currently works only when Raw layer is present.

    The packet is modified in place; nothing happens when it is already
    at least as large as requested.

    :param packet: packet
    :param size: target size
    :param padding: padding used to extend the payload
    """
    # 4 bytes are added to the measured length before comparing with size
    # (presumably the Ethernet FCS - TODO confirm)
    missing = size - (len(packet) + 4)
    if missing <= 0:
        return
    repeats = (missing // len(padding)) + 1
    filler = (padding * repeats)[:missing]
    packet[Raw].load += filler.encode("ascii")
def reset_packet_infos(cls):
    """ Reset the list of packet info objects and packet counts to zero """
    cls._packet_infos, cls._packet_count_for_dst_if_idx = {}, {}
def create_packet_info(cls, src_if, dst_if):
    """
    Create packet info object containing the source and destination indexes
    and add it to the testcase's packet info list

    The per-destination packet count is kept against the parent interface
    when the destination is a sub-interface.

    :param VppInterface src_if: source interface
    :param VppInterface dst_if: destination interface

    :returns: _PacketInfo object

    """
    info = _PacketInfo()
    info.index = len(cls._packet_infos)
    info.src = src_if.sw_if_index
    info.dst = dst_if.sw_if_index

    counted_if = dst_if.parent if isinstance(dst_if, VppSubInterface) \
        else dst_if
    dst_idx = counted_if.sw_if_index
    counts = cls._packet_count_for_dst_if_idx
    if dst_idx not in counts:
        counts[dst_idx] = 1
    else:
        counts[dst_idx] += 1

    cls._packet_infos[info.index] = info
    return info
def info_to_payload(info):
    """
    Convert _PacketInfo object to packet payload

    :param info: _PacketInfo object

    :returns: string containing serialized data from packet info
    """
    fields = (info.index, info.src, info.dst, info.ip, info.proto)
    return "%d %d %d %d %d" % fields
def payload_to_info(payload, payload_field='load'):
    """
    Convert packet payload to _PacketInfo object

    The first five whitespace-separated tokens of the payload are read as
    index, src, dst, ip and proto, in that order.

    :param payload: packet payload
    :type payload:  <class 'scapy.packet.Raw'>
    :param payload_field: packet fieldname of payload "load" for
            <class 'scapy.packet.Raw'>
    :type payload_field: str
    :returns: _PacketInfo object containing de-serialized data from payload

    """
    tokens = getattr(payload, payload_field).split()
    info = _PacketInfo()
    for position, attr in enumerate(("index", "src", "dst", "ip", "proto")):
        setattr(info, attr, int(tokens[position]))
    return info
def get_next_packet_info(self, info):
    """
    Iterate over the packet info list stored in the testcase
    Start iteration with first element if info is None
    Continue based on index in info if info is specified

    :param info: info or None
    :returns: next info in list or None if no more infos
    """
    position = 0 if info is None else info.index + 1
    if position != len(self._packet_infos):
        return self._packet_infos[position]
    return None
def get_next_packet_info_for_interface(self, src_index, info):
    """
    Search the packet info list for the next packet info with same source
    interface index

    :param src_index: source interface index to search for
    :param info: packet info - where to start the search
    :returns: packet info or None

    """
    candidate = self.get_next_packet_info(info)
    while candidate is not None and candidate.src != src_index:
        candidate = self.get_next_packet_info(candidate)
    return candidate
def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
    """
    Search the packet info list for the next packet info with same source
    and destination interface indexes

    :param src_index: source interface index to search for
    :param dst_index: destination interface index to search for
    :param info: packet info - where to start the search
    :returns: packet info or None

    """
    candidate = self.get_next_packet_info_for_interface(src_index, info)
    while candidate is not None and candidate.dst != dst_index:
        candidate = self.get_next_packet_info_for_interface(src_index,
                                                            candidate)
    return candidate
def assert_equal(self, real_value, expected_value, name_or_class=None):
    """Assert equality, with a descriptive message when a name is given.

    :param real_value: value that was observed
    :param expected_value: value that was expected
    :param name_or_class: either a plain name used in the failure message,
        or a callable whose docstring names the field and whose call on a
        value gives its symbolic representation
    """
    if name_or_class is not None:
        try:
            # rich form: works when name_or_class is a documented callable
            template = \
                "Invalid %s: %d('%s') does not match expected value %d('%s')"
            msg = template % (getdoc(name_or_class).strip(),
                              real_value, str(name_or_class(real_value)),
                              expected_value,
                              str(name_or_class(expected_value)))
        except Exception:
            # plain form: name_or_class is treated as a simple label
            msg = "Invalid %s: %s does not match expected value %s" % (
                name_or_class, real_value, expected_value)
        self.assertEqual(real_value, expected_value, msg)
        return
    self.assertEqual(real_value, expected_value)
1140 def assert_in_range(self,
1148 msg = "Invalid %s: %s out of range <%s,%s>" % (
1149 name, real_value, expected_min, expected_max)
1150 self.assertTrue(expected_min <= real_value <= expected_max, msg)
1152 def assert_packet_checksums_valid(self, packet,
1153 ignore_zero_udp_checksums=True):
1154 received = packet.__class__(scapy.compat.raw(packet))
1155 udp_layers = ['UDP', 'UDPerror']
1156 checksum_fields = ['cksum', 'chksum']
1159 temp = received.__class__(scapy.compat.raw(received))
1161 layer = temp.getlayer(counter)
1163 layer = layer.copy()
1164 layer.remove_payload()
1165 for cf in checksum_fields:
1166 if hasattr(layer, cf):
1167 if ignore_zero_udp_checksums and \
1168 0 == getattr(layer, cf) and \
1169 layer.name in udp_layers:
1171 delattr(temp.getlayer(counter), cf)
1172 checksums.append((counter, cf))
1175 counter = counter + 1
1176 if 0 == len(checksums):
1178 temp = temp.__class__(scapy.compat.raw(temp))
1179 for layer, cf in checksums:
1180 calc_sum = getattr(temp[layer], cf)
1182 getattr(received[layer], cf), calc_sum,
1183 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
1185 "Checksum field `%s` on `%s` layer has correct value `%s`" %
1186 (cf, temp[layer].name, calc_sum))
def assert_checksum_valid(self, received_packet, layer,
                          field_name='chksum',
                          ignore_zero_checksum=False):
    """ Check checksum of received packet on given layer

    The packet is rebuilt from its raw bytes with the checksum field
    removed so that it gets recalculated, and the result is compared with
    the received value.

    :param received_packet: packet to verify
    :param layer: layer whose checksum field is checked
    :param field_name: name of the checksum field on that layer
    :param ignore_zero_checksum: skip the check when the received value is 0
    """
    seen = getattr(received_packet[layer], field_name)
    if ignore_zero_checksum and 0 == seen:
        return
    rebuilt = received_packet.__class__(scapy.compat.raw(received_packet))
    delattr(rebuilt[layer], field_name)
    # re-parsing the raw bytes fills the removed field in again
    rebuilt = rebuilt.__class__(scapy.compat.raw(rebuilt))
    expected = getattr(rebuilt[layer], field_name)
    self.assert_equal(seen, expected,
                      "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """Check the checksum of the 'IP' layer of the received packet.

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded to assert_checksum_valid()
    """
    self.assert_checksum_valid(
        received_packet, 'IP', ignore_zero_checksum=ignore_zero_checksum)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """Check the checksum of the 'TCP' layer of the received packet.

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded to assert_checksum_valid()
    """
    self.assert_checksum_valid(
        received_packet, 'TCP', ignore_zero_checksum=ignore_zero_checksum)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """Check the checksum of the 'UDP' layer of the received packet.

    Unlike the IP and TCP variants, a zero checksum is ignored by default.

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded to assert_checksum_valid()
    """
    self.assert_checksum_valid(
        received_packet, 'UDP', ignore_zero_checksum=ignore_zero_checksum)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """Check checksums of the *error layers present in the packet.

    Each of IPerror, TCPerror, UDPerror and ICMPerror is verified only if
    the packet has that layer; a zero UDPerror checksum is ignored.

    :param received_packet: packet to verify
    """
    # (layer class, layer name, extra arguments for assert_checksum_valid)
    embedded_layers = (
        (IPerror, 'IPerror', {}),
        (TCPerror, 'TCPerror', {}),
        (UDPerror, 'UDPerror', {'ignore_zero_checksum': True}),
        (ICMPerror, 'ICMPerror', {}),
    )
    for layer_cls, layer_name, extra in embedded_layers:
        if received_packet.haslayer(layer_cls):
            self.assert_checksum_valid(received_packet, layer_name, **extra)
def assert_icmp_checksum_valid(self, received_packet):
    """Check the 'ICMP' layer checksum, then any embedded error layers.

    :param received_packet: packet to verify
    """
    pkt = received_packet
    self.assert_checksum_valid(pkt, 'ICMP')
    self.assert_embedded_icmp_checksum_valid(pkt)
def assert_icmpv6_checksum_valid(self, pkt):
    """Check the 'cksum' field of the ICMPv6 layers present in the packet.

    A destination-unreachable layer additionally triggers verification of
    the embedded error layers; echo request and echo reply layers are
    verified on their own.

    :param pkt: packet to verify
    """
    if pkt.haslayer(ICMPv6DestUnreach):
        self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
        self.assert_embedded_icmp_checksum_valid(pkt)
    echo_layers = (
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest'),
        (ICMPv6EchoReply, 'ICMPv6EchoReply'),
    )
    for echo_cls, echo_name in echo_layers:
        if pkt.haslayer(echo_cls):
            self.assert_checksum_valid(pkt, echo_name, 'cksum')
def get_packet_counter(self, counter):
    """Return the current value of a packet counter.

    A name starting with "/" is looked up through self.statistics;
    any other name is searched for in the output of the "sh errors" CLI
    command, where the first token of a line is taken as the count and the
    second token is compared with the requested name.

    :param counter: counter name
    :returns: counter value; 0 when the name is not found in the CLI output
    """
    if counter.startswith("/"):
        return self.statistics.get_counter(counter)

    lines = self.vapi.cli("sh errors").split('\n')
    # the first and the last line of the output are never inspected
    for line in lines[1:-1]:
        fields = line.split()
        # a blank or one-token line cannot name a counter - skip it
        # instead of failing with IndexError
        if len(fields) > 1 and fields[1] == counter:
            return int(fields[0])
    return 0
def assert_packet_counter_equal(self, counter, expected_value):
    """Assert that a packet counter holds the expected value.

    :param counter: counter name, resolved by get_packet_counter()
    :param expected_value: value the counter must have
    """
    description = "packet counter `%s'" % counter
    self.assert_equal(
        self.get_packet_counter(counter), expected_value, description)
def assert_error_counter_equal(self, counter, expected_value):
    """Assert that the sum of an error counter equals the expected value.

    :param counter: key looked up in self.statistics
    :param expected_value: value the summed counter must have
    """
    total = self.statistics[counter].sum()
    description = "error counter `%s'" % counter
    self.assert_equal(total, expected_value, description)
@classmethod
def sleep(cls, timeout, remark=None):
    """Sleep for the given time and log how long the sleep really took.

    :param timeout: number of seconds to sleep; 0 only yields the CPU
    :param remark: free text included in the log lines
    """

    # /* Allow sleep(0) to maintain win32 semantics, and as decreed
    #  * by Guido, only the main thread can be interrupted.
    # */
    # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892  # noqa
    # NOTE(review): the decorator and the body of this zero-timeout branch
    # were missing from the listing (only the hasattr() test survived);
    # restored as "yield and return" - confirm against the upstream file.
    if timeout == 0:
        # yield quantum
        if hasattr(os, 'sched_yield'):
            os.sched_yield()
        else:
            time.sleep(0)
        return

    cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
    # the monotonic clock is immune to wall-clock adjustments, which could
    # otherwise produce a bogus (even negative) elapsed time
    before = time.monotonic()
    time.sleep(timeout)
    elapsed = time.monotonic() - before
    if elapsed > 2 * timeout:
        cls.logger.error("unexpected self.sleep() result - "
                         "slept for %es instead of ~%es!",
                         elapsed, timeout)

    cls.logger.debug(
        "Finished sleep (%s) - slept %es (wanted %es)",
        remark, elapsed, timeout)
def pg_send(self, intf, pkts, worker=None, trace=True):
    """Queue packets on an interface, enable capture and start the run.

    :param intf: interface the stream is added to
    :param pkts: packets to send
    :param worker: forwarded to intf.add_stream()
    :param trace: forwarded to self.pg_start()
    """
    intf.add_stream(pkts, worker=worker)
    capture_on = self.pg_interfaces
    self.pg_enable_capture(capture_on)
    self.pg_start(trace=trace)
def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
    """Send packets and assert that no interface captures anything.

    :param intf: interface to send the packets from
    :param pkts: packets to send
    :param remark: forwarded to assert_nothing_captured()
    :param timeout: capture timeout used for the first interface
    """
    self.pg_send(intf, pkts)
    # NOTE(review): the timeout default and the shortened timeout for the
    # remaining interfaces were missing from the listing; the values 1 and
    # 0.1 are restored from the upstream file - confirm.
    if not timeout:
        timeout = 1
    for i in self.pg_interfaces:
        i.get_capture(0, timeout=timeout)
        i.assert_nothing_captured(remark=remark)
        # the full wait has been paid once; keep the rest short
        timeout = 0.1
def send_and_expect(self, intf, pkts, output, n_rx=None, worker=None,
                    trace=True):
    """Send packets and return what is captured on the output interface.

    :param intf: interface to send the packets from
    :param pkts: packets to send
    :param output: interface the capture is read from
    :param n_rx: number of packets expected; defaults to len(pkts)
    :param worker: forwarded to pg_send()
    :param trace: forwarded to pg_send()
    :returns: result of output.get_capture()
    """
    # NOTE(review): the end of the signature, the n_rx default and the
    # return statement were missing from the listing; restored from the
    # names used by the two surviving statements - confirm.
    if not n_rx:
        n_rx = len(pkts)
    self.pg_send(intf, pkts, worker=worker, trace=trace)
    rx = output.get_capture(n_rx)
    return rx
def send_and_expect_only(self, intf, pkts, output, timeout=None):
    """Send packets, expect them on output and nothing anywhere else.

    :param intf: interface to send the packets from
    :param pkts: packets to send
    :param output: the only interface expected to capture packets
    :param timeout: capture timeout used for the first non-output interface
    :returns: packets captured on output
    """
    self.pg_send(intf, pkts)
    rx = output.get_capture(len(pkts))
    # NOTE(review): the outputs list, the timeout handling and the return
    # statement were missing from the listing; the values 1 and 0.1 are
    # restored from the upstream file - confirm.
    outputs = [output]
    if not timeout:
        timeout = 1
    for i in self.pg_interfaces:
        if i not in outputs:
            i.get_capture(0, timeout=timeout)
            i.assert_nothing_captured()
            # the full wait has been paid once; keep the rest short
            timeout = 0.1

    return rx
def get_testcase_doc_name(test):
    """Return the first line of the docstring of the test's class.

    :param test: test case instance
    """
    class_doc = getdoc(test.__class__)
    first_line = class_doc.splitlines()[0]
    return first_line
def get_test_description(descriptions, test):
    """Return the text used to describe a test in the result output.

    :param descriptions: when true, prefer the test's short description
    :param test: test case instance
    :returns: test.shortDescription() when descriptions are enabled and the
        test has one, otherwise str(test)
    """
    short_description = test.shortDescription()
    if descriptions and short_description:
        return short_description
    # NOTE(review): the fallback branch was missing from the listing;
    # restored as str(test) - confirm against the upstream file.
    return str(test)
class TestCaseInfo(object):
    """Plain record of per-test-case data: logger, temp dir and VPP process.

    :param logger: logger of the test case
    :param tempdir: temporary directory of the test case
    :param vpp_pid: PID of the VPP process
    :param vpp_bin_path: path to the VPP binary
    """

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        # no test has been recorded against a core file yet
        self.core_crash_test = None
        self.logger = logger
        self.tempdir = tempdir
        self.vpp_pid = vpp_pid
        self.vpp_bin_path = vpp_bin_path
class VppTestResult(unittest.TestResult):
    """
    @property result_string
     String variable to store the test case result string.
    @property errors
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test which
     raised an unexpected exception.
    @property failures
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test where
     a failure was explicitly signalled using the TestCase.assert*()
     methods.
    """

    # Class-level state: shared by every instance of this class.
    failed_test_cases_info = set()
    core_crash_test_cases_info = set()
    current_test_case_info = None

    def __init__(self, stream=None, descriptions=None, verbosity=None,
                 runner=None):
        """
        :param stream File descriptor to store where to report test results.
            Set to the standard error stream by default.
        :param descriptions Boolean variable to store information if to use
            test case descriptions.
        :param verbosity Integer variable to store required verbosity level.
        :param runner Runner owning this result; its print_summary flag and
            stream are used by printErrors().
        """
        super(VppTestResult, self).__init__(stream, descriptions, verbosity)
        self.stream = stream
        self.descriptions = descriptions
        self.verbosity = verbosity
        self.result_string = None
        self.runner = runner
        # test classes whose header has already been printed by startTest()
        self.printed = []

    def addSuccess(self, test):
        """
        Record a test succeeded result

        :param test: test that passed

        """
        if self.current_test_case_info:
            self.current_test_case_info.logger.debug(
                "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
                                                       test._testMethodName,
                                                       test._testMethodDoc))
        unittest.TestResult.addSuccess(self, test)
        self.result_string = colorize("OK", GREEN)

        self.send_result_through_pipe(test, PASS)

    def addSkip(self, test, reason):
        """
        Record a test skipped.

        :param test: test that was skipped
        :param reason: text explaining why the test was skipped

        """
        if self.current_test_case_info:
            self.current_test_case_info.logger.debug(
                "--- addSkip() %s.%s(%s) called, reason is %s" %
                (test.__class__.__name__, test._testMethodName,
                 test._testMethodDoc, reason))
        unittest.TestResult.addSkip(self, test, reason)
        self.result_string = colorize("SKIP", YELLOW)

        # a CPU shortage is reported with its own result code
        if reason == "not enough cpus":
            self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE)
        else:
            self.send_result_through_pipe(test, SKIP)

    def symlink_failed(self):
        """Link the temp dir of the current test case into FAILED_DIR.

        Does nothing when there is no current test case info. Any exception
        is logged through the test case logger and not propagated.
        """
        if self.current_test_case_info:
            try:
                failed_dir = os.getenv('FAILED_DIR')
                # NOTE(review): two os.path.join() arguments were missing
                # from the listing; the directory and the '%s-FAILED' name
                # pattern are restored from the upstream file - confirm.
                link_path = os.path.join(
                    failed_dir,
                    '%s-FAILED' %
                    os.path.basename(self.current_test_case_info.tempdir))

                self.current_test_case_info.logger.debug(
                    "creating a link to the failed test")
                self.current_test_case_info.logger.debug(
                    "os.symlink(%s, %s)" %
                    (self.current_test_case_info.tempdir, link_path))
                if os.path.exists(link_path):
                    self.current_test_case_info.logger.debug(
                        'symlink already exists')
                else:
                    os.symlink(self.current_test_case_info.tempdir, link_path)

            except Exception as e:
                # best effort only: a missing link must not fail the run
                self.current_test_case_info.logger.error(e)

    def send_result_through_pipe(self, test, result):
        """Send (test id, result) through the result pipe, if one is set.

        :param test: test the result belongs to
        :param result: result code to send
        """
        if hasattr(self, 'test_framework_result_pipe'):
            pipe = self.test_framework_result_pipe
            if pipe:
                pipe.send((test.id(), result))

    def log_error(self, test, err, fn_name):
        """Log an error or failure through the current test case logger.

        :param test: test (or unittest error holder) the error belongs to
        :param err: exception info, unpacked into format_exception()
        :param fn_name: name of the reporting method, used in the log line
        """
        if self.current_test_case_info:
            if isinstance(test, unittest.suite._ErrorHolder):
                test_name = test.description
            else:
                test_name = '%s.%s(%s)' % (test.__class__.__name__,
                                           test._testMethodName,
                                           test._testMethodDoc)
            self.current_test_case_info.logger.debug(
                "--- %s() %s called, err is %s" %
                (fn_name, test_name, err))
            self.current_test_case_info.logger.debug(
                "formatted exception is:\n%s" %
                "".join(format_exception(*err)))

    def add_error(self, test, err, unittest_fn, error_type):
        """Common implementation of addFailure() and addError().

        :param test: test the error belongs to
        :param err: exception info
        :param unittest_fn: unittest.TestResult method recording the error
        :param error_type: FAIL or ERROR
        :raises Exception: when error_type is neither FAIL nor ERROR
        """
        if error_type == FAIL:
            self.log_error(test, err, 'addFailure')
            error_type_str = colorize("FAIL", RED)
        elif error_type == ERROR:
            self.log_error(test, err, 'addError')
            error_type_str = colorize("ERROR", RED)
        else:
            raise Exception('Error type %s cannot be used to record an '
                            'error or a failure' % error_type)

        unittest_fn(self, test, err)
        if self.current_test_case_info:
            self.result_string = "%s [ temp dir used by test case: %s ]" % \
                                 (error_type_str,
                                  self.current_test_case_info.tempdir)
            self.symlink_failed()
            self.failed_test_cases_info.add(self.current_test_case_info)
            if is_core_present(self.current_test_case_info.tempdir):
                # only the first test seen with a core file is recorded
                if not self.current_test_case_info.core_crash_test:
                    if isinstance(test, unittest.suite._ErrorHolder):
                        test_name = str(test)
                    else:
                        test_name = "'{!s}' ({!s})".format(
                            get_testcase_doc_name(test), test.id())
                    self.current_test_case_info.core_crash_test = test_name
                self.core_crash_test_cases_info.add(
                    self.current_test_case_info)
        else:
            self.result_string = '%s [no temp dir]' % error_type_str

        self.send_result_through_pipe(test, error_type)

    def addFailure(self, test, err):
        """
        Record a test failed result

        :param test: test that failed
        :param err: error message

        """
        self.add_error(test, err, unittest.TestResult.addFailure, FAIL)

    def addError(self, test, err):
        """
        Record a test error result

        :param test: test that raised the error
        :param err: error message

        """
        self.add_error(test, err, unittest.TestResult.addError, ERROR)

    def getDescription(self, test):
        """
        Get test description

        :param test: test to describe
        :returns: test description

        """
        return get_test_description(self.descriptions, test)

    def startTest(self, test):
        """
        Start a test

        :param test: test about to be run

        """

        def print_header(test):
            # the header is printed once per test class
            if test.__class__ in self.printed:
                return

            test_doc = getdoc(test)
            if not test_doc:
                raise Exception("No doc string for test '%s'" % test.id())

            test_title = test_doc.splitlines()[0].rstrip()
            test_title = colorize(test_title, GREEN)
            if test.is_tagged_run_solo():
                test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)

            # This block may overwrite the colorized title above,
            # but we want this to stand out and be fixed
            if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
                test_title = colorize(
                    f"FIXME with VPP workers: {test_title}", RED)

            if hasattr(test, 'vpp_worker_count'):
                if test.vpp_worker_count == 0:
                    test_title += " [main thread only]"
                elif test.vpp_worker_count == 1:
                    test_title += " [1 worker thread]"
                else:
                    test_title += f" [{test.vpp_worker_count} worker threads]"

            if test.__class__.skipped_due_to_cpu_lack:
                test_title = colorize(
                    f"{test_title} [skipped - not enough cpus, "
                    f"required={test.__class__.get_cpus_required()}, "
                    f"available={max_vpp_cpus}]", YELLOW)

            print(double_line_delim)
            print(test_title)
            print(double_line_delim)
            self.printed.append(test.__class__)

        print_header(test)
        # start time, read by stopTest() to report the duration
        self.start_test = time.time()
        unittest.TestResult.startTest(self, test)
        if self.verbosity > 0:
            self.stream.writeln(
                "Starting " + self.getDescription(test) + " ...")
            self.stream.writeln(single_line_delim)

    def stopTest(self, test):
        """
        Called when the given test has been run

        :param test: test that has finished

        """
        unittest.TestResult.stopTest(self, test)

        if self.verbosity > 0:
            self.stream.writeln(single_line_delim)
            self.stream.writeln("%-73s%s" % (self.getDescription(test),
                                             self.result_string))
            self.stream.writeln(single_line_delim)
        else:
            # terse mode: one line with the duration of the test
            self.stream.writeln("%-68s %4.2f   %s" %
                                (self.getDescription(test),
                                 time.time() - self.start_test,
                                 self.result_string))

        self.send_result_through_pipe(test, TEST_RUN)

    def printErrors(self):
        """
        Print errors from running the test case
        """
        if len(self.errors) > 0 or len(self.failures) > 0:
            self.stream.writeln()
            self.printErrorList('ERROR', self.errors)
            self.printErrorList('FAIL', self.failures)

        # ^^ that is the last output from unittest before summary
        if not self.runner.print_summary:
            # everything written from here on goes to os.devnull
            devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
            self.stream = devnull
            self.runner.stream = devnull

    def printErrorList(self, flavour, errors):
        """
        Print error list to the output stream together with error type
        and test case description.

        :param flavour: error type
        :param errors: iterable errors

        """
        for test, err in errors:
            self.stream.writeln(double_line_delim)
            self.stream.writeln("%s: %s" %
                                (flavour, self.getDescription(test)))
            self.stream.writeln(single_line_delim)
            self.stream.writeln("%s" % err)
class VppTestRunner(unittest.TextTestRunner):
    """
    A basic test runner implementation which prints results to standard error.
    """

    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
                 result_pipe=None, failfast=False, buffer=False,
                 resultclass=None, print_summary=True, **kwargs):
        """
        :param keep_alive_pipe: stored on KeepAliveReporter.pipe
        :param descriptions: forwarded to unittest.TextTestRunner
        :param verbosity: forwarded to unittest.TextTestRunner
        :param result_pipe: stored on the result class as
            test_framework_result_pipe
        :param failfast: forwarded to unittest.TextTestRunner
        :param buffer: forwarded to unittest.TextTestRunner
        :param resultclass: forwarded to unittest.TextTestRunner
        :param print_summary: when false, the result object redirects this
            runner's stream to os.devnull after the errors are printed
        """
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        super(VppTestRunner, self).__init__(sys.stdout, descriptions,
                                            verbosity, failfast, buffer,
                                            resultclass, **kwargs)
        KeepAliveReporter.pipe = keep_alive_pipe

        # kept so that run() can restore the stream afterwards
        self.orig_stream = self.stream
        self.resultclass.test_framework_result_pipe = result_pipe

        self.print_summary = print_summary

    def _makeResult(self):
        # NOTE(review): all arguments but the first were missing from the
        # listing; restored to match the (stream, descriptions, verbosity,
        # runner) constructor of the result class - confirm.
        return self.resultclass(self.stream,
                                self.descriptions,
                                self.verbosity,
                                self)

    def run(self, test):
        """
        Run the tests

        :param test: test case or suite to run
        :returns: result object produced by the run

        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        result = super(VppTestRunner, self).run(test)
        if not self.print_summary:
            # undo the redirection to os.devnull made while printing errors
            self.stream = self.orig_stream
            result.stream = self.orig_stream
        return result
class Worker(Thread):
    """Thread running an external executable and logging its output.

    After run() finishes, self.result holds the return code of the process
    (or os.EX_OSFILE when the executable could not be used). Subclasses may
    set `testcase`, `role` and `wait_for_gdb` attributes before calling
    this constructor to enable the debugging hooks.

    :param executable_args: command line as a list, executable first
    :param logger: logger used for all output of the worker
    :param env: extra environment variables for the process
    """

    def __init__(self, executable_args, logger, env=None, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self.logger = logger
        # work on a copy so that appending to self.args below never
        # modifies the list owned by the caller
        self.args = list(executable_args)
        if hasattr(self, 'testcase') and self.testcase.debug_all:
            if self.testcase.debug_gdbserver:
                # prefix the command line with gdbserver; this used to add
                # the *args tuple, which raises TypeError (list + tuple)
                self.args = ['/usr/bin/gdbserver', 'localhost:{port}'
                             .format(port=self.testcase.gdbserver_port)] + \
                    list(executable_args)
            elif self.testcase.debug_gdb and hasattr(self, 'wait_for_gdb'):
                self.args.append(self.wait_for_gdb)
        self.app_bin = executable_args[0]
        self.app_name = os.path.basename(self.app_bin)
        if hasattr(self, 'role'):
            self.app_name += ' {role}'.format(role=self.role)
        self.process = None
        self.result = None
        env = {} if env is None else env
        self.env = copy.deepcopy(env)

    def wait_for_enter(self):
        """Print debugger attach instructions and wait for ENTER.

        Returns immediately unless the worker has a `testcase` with
        debug_all set together with debug_gdbserver or debug_gdb.
        """
        if not hasattr(self, 'testcase'):
            return
        if self.testcase.debug_all and self.testcase.debug_gdbserver:
            print()
            print(double_line_delim)
            print("Spawned GDB Server for '{app}' with PID: {pid}"
                  .format(app=self.app_name, pid=self.process.pid))
        elif self.testcase.debug_all and self.testcase.debug_gdb:
            print()
            print(double_line_delim)
            print("Spawned '{app}' with PID: {pid}"
                  .format(app=self.app_name, pid=self.process.pid))
        else:
            return
        print(single_line_delim)
        print("You can debug '{app}' using:".format(app=self.app_name))
        if self.testcase.debug_gdbserver:
            print("sudo gdb " + self.app_bin +
                  " -ex 'target remote localhost:{port}'"
                  .format(port=self.testcase.gdbserver_port))
            print("Now is the time to attach gdb by running the above "
                  "command, set up breakpoints etc., then resume from "
                  "within gdb by issuing the 'continue' command")
            # the next gdbserver spawned gets the following port
            self.testcase.gdbserver_port += 1
        elif self.testcase.debug_gdb:
            print("sudo gdb " + self.app_bin +
                  " -ex 'attach {pid}'".format(pid=self.process.pid))
            print("Now is the time to attach gdb by running the above "
                  "command and set up breakpoints etc., then resume from"
                  " within gdb by issuing the 'continue' command")
        print(single_line_delim)
        input("Press ENTER to continue running the testcase...")

    def run(self):
        """Run the executable, wait for it to exit and log its output.

        :raises EnvironmentError: when the executable does not exist or is
            not executable; self.result is set to os.EX_OSFILE first
        """
        executable = self.args[0]
        if not os.path.exists(executable) or not os.access(
                executable, os.F_OK | os.X_OK):
            # Exit code that means some system file did not exist,
            # could not be opened, or had some other kind of error.
            self.result = os.EX_OSFILE
            raise EnvironmentError(
                "executable '%s' is not found or executable." % executable)
        self.logger.debug("Running executable '{app}': '{cmd}'"
                          .format(app=self.app_name,
                                  cmd=' '.join(self.args)))
        # process environment overlaid with the worker-specific variables
        env = os.environ.copy()
        env.update(self.env)
        env["CK_LOG_FILE_NAME"] = "-"
        self.process = subprocess.Popen(
            ['stdbuf', '-o0', '-e0'] + self.args, shell=False, env=env,
            preexec_fn=os.setpgrp, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        self.wait_for_enter()
        out, err = self.process.communicate()
        self.logger.debug("Finished running `{app}'".format(app=self.app_name))
        self.logger.info("Return code is `%s'" % self.process.returncode)
        self.logger.info(single_line_delim)
        self.logger.info("Executable `{app}' wrote to stdout:"
                         .format(app=self.app_name))
        self.logger.info(single_line_delim)
        self.logger.info(out.decode('utf-8'))
        self.logger.info(single_line_delim)
        self.logger.info("Executable `{app}' wrote to stderr:"
                         .format(app=self.app_name))
        self.logger.info(single_line_delim)
        self.logger.info(err.decode('utf-8'))
        self.logger.info(single_line_delim)
        self.result = self.process.returncode
1778 if __name__ == '__main__':