3 from __future__ import print_function
19 from collections import deque
20 from threading import Thread, Event
21 from inspect import getdoc, isclass
22 from traceback import format_exception
23 from logging import FileHandler, DEBUG, Formatter
25 from abc import ABC, abstractmethod
26 from struct import pack, unpack
29 from scapy.packet import Raw, Packet
30 from config import config, available_cpus, num_cpus, max_vpp_cpus
31 import hook as hookmodule
32 from vpp_pg_interface import VppPGInterface
33 from vpp_sub_interface import VppSubInterface
34 from vpp_lo_interface import VppLoInterface
35 from vpp_bvi_interface import VppBviInterface
36 from vpp_papi_provider import VppPapiProvider
37 from vpp_papi import VppEnum
39 from vpp_papi.vpp_stats import VPPStats
40 from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
41 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
43 from vpp_object import VppObjectRegistry
44 from util import ppp, is_core_present
45 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
46 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
47 from scapy.layers.inet6 import ICMPv6EchoReply
50 logger = logging.getLogger(__name__)
52 # Set up an empty logger for the testcase that can be overridden as necessary
53 null_logger = logging.getLogger('VppTestCase')
54 null_logger.addHandler(logging.NullHandler())
64 if config.debug_framework:
68 Test framework module.
70 The module provides a set of tools for constructing and running tests and
71 representing the results.
75 class VppDiedError(Exception):
76 """ exception for reporting that the subprocess has died."""
78 signals_by_value = {v: k for k, v in signal.__dict__.items() if
79 k.startswith('SIG') and not k.startswith('SIG_')}
81 def __init__(self, rv=None, testcase=None, method_name=None):
83 self.signal_name = None
84 self.testcase = testcase
85 self.method_name = method_name
88 self.signal_name = VppDiedError.signals_by_value[-rv]
89 except (KeyError, TypeError):
92 if testcase is None and method_name is None:
95 in_msg = ' while running %s.%s' % (testcase, method_name)
98 msg = "VPP subprocess died unexpectedly%s with return code: %d%s."\
99 % (in_msg, self.rv, ' [%s]' %
101 self.signal_name is not None else ''))
103 msg = "VPP subprocess died unexpectedly%s." % in_msg
105 super(VppDiedError, self).__init__(msg)
108 class _PacketInfo(object):
109 """Private class to create packet info object.
111 Help process information about the next packet.
112 Set variables to default values.
114 #: Store the index of the packet.
116 #: Store the index of the source packet generator interface of the packet.
118 #: Store the index of the destination packet generator interface
121 #: Store expected ip version
123 #: Store expected upper protocol
125 #: Store the copy of the former packet.
128 def __eq__(self, other):
129 index = self.index == other.index
130 src = self.src == other.src
131 dst = self.dst == other.dst
132 data = self.data == other.data
133 return index and src and dst and data
136 def pump_output(testclass):
137 """ pump output from vpp stdout/stderr to proper queues """
140 while not testclass.pump_thread_stop_flag.is_set():
141 readable = select.select([testclass.vpp.stdout.fileno(),
142 testclass.vpp.stderr.fileno(),
143 testclass.pump_thread_wakeup_pipe[0]],
145 if testclass.vpp.stdout.fileno() in readable:
146 read = os.read(testclass.vpp.stdout.fileno(), 102400)
148 split = read.decode('ascii',
149 errors='backslashreplace').splitlines(True)
150 if len(stdout_fragment) > 0:
151 split[0] = "%s%s" % (stdout_fragment, split[0])
152 if len(split) > 0 and split[-1].endswith("\n"):
156 stdout_fragment = split[-1]
157 testclass.vpp_stdout_deque.extend(split[:limit])
158 if not config.cache_vpp_output:
159 for line in split[:limit]:
160 testclass.logger.info(
161 "VPP STDOUT: %s" % line.rstrip("\n"))
162 if testclass.vpp.stderr.fileno() in readable:
163 read = os.read(testclass.vpp.stderr.fileno(), 102400)
165 split = read.decode('ascii',
166 errors='backslashreplace').splitlines(True)
167 if len(stderr_fragment) > 0:
168 split[0] = "%s%s" % (stderr_fragment, split[0])
169 if len(split) > 0 and split[-1].endswith("\n"):
173 stderr_fragment = split[-1]
175 testclass.vpp_stderr_deque.extend(split[:limit])
176 if not config.cache_vpp_output:
177 for line in split[:limit]:
178 testclass.logger.error(
179 "VPP STDERR: %s" % line.rstrip("\n"))
180 # ignoring the dummy pipe here intentionally - the
181 # flag will take care of properly terminating the loop
184 def _is_platform_aarch64():
185 return platform.machine() == 'aarch64'
188 is_platform_aarch64 = _is_platform_aarch64()
191 class KeepAliveReporter(object):
193 Singleton object which reports test start to parent process
198 self.__dict__ = self._shared_state
206 def pipe(self, pipe):
207 if self._pipe is not None:
208 raise Exception("Internal error - pipe should only be set once.")
211 def send_keep_alive(self, test, desc=None):
213 Write current test tmpdir & desc to keep-alive pipe to signal liveness
215 if self.pipe is None:
216 # if not running forked..
220 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
224 self.pipe.send((desc, config.vpp, test.tempdir, test.vpp.pid))
227 class TestCaseTag(Enum):
228 # marks the suites that must run at the end
229 # using only a single test runner
231 # marks the suites broken on VPP multi-worker
232 FIXME_VPP_WORKERS = 2
233 # marks the suites broken when ASan is enabled
237 def create_tag_decorator(e):
240 cls.test_tags.append(e)
241 except AttributeError:
247 tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
248 tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
249 tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
263 class CPUInterface(ABC):
265 skipped_due_to_cpu_lack = False
269 def get_cpus_required(cls):
273 def assign_cpus(cls, cpus):
277 class VppTestCase(CPUInterface, unittest.TestCase):
278 """This subclass is a base class for VPP test cases that are implemented as
279 classes. It provides methods to create and run test case.
282 extra_vpp_statseg_config = ""
283 extra_vpp_punt_config = []
284 extra_vpp_plugin_config = []
286 vapi_response_timeout = 5
287 remove_configured_vpp_objects_on_tear_down = True
290 def packet_infos(self):
291 """List of packet infos"""
292 return self._packet_infos
295 def get_packet_count_for_if_idx(cls, dst_if_index):
296 """Get the number of packet info for specified destination if index"""
297 if dst_if_index in cls._packet_count_for_dst_if_idx:
298 return cls._packet_count_for_dst_if_idx[dst_if_index]
303 def has_tag(cls, tag):
304 """ if the test case has a given tag - return true """
306 return tag in cls.test_tags
307 except AttributeError:
312 def is_tagged_run_solo(cls):
313 """ if the test case class is timing-sensitive - return true """
314 return cls.has_tag(TestCaseTag.RUN_SOLO)
317 def skip_fixme_asan(cls):
318 """ if @tag_fixme_asan & ASan is enabled - mark for skip """
319 if cls.has_tag(TestCaseTag.FIXME_ASAN):
320 vpp_extra_cmake_args = os.environ.get('VPP_EXTRA_CMAKE_ARGS', '')
321 if 'DVPP_ENABLE_SANITIZE_ADDR=ON' in vpp_extra_cmake_args:
322 cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls)
326 """Return the instance of this testcase"""
327 return cls.test_instance
330 def set_debug_flags(cls, d):
331 cls.gdbserver_port = 7777
332 cls.debug_core = False
333 cls.debug_gdb = False
334 cls.debug_gdbserver = False
335 cls.debug_all = False
336 cls.debug_attach = False
341 cls.debug_core = True
342 elif dl == "gdb" or dl == "gdb-all":
344 elif dl == "gdbserver" or dl == "gdbserver-all":
345 cls.debug_gdbserver = True
347 cls.debug_attach = True
349 raise Exception("Unrecognized DEBUG option: '%s'" % d)
350 if dl == "gdb-all" or dl == "gdbserver-all":
354 def get_vpp_worker_count(cls):
355 if not hasattr(cls, "vpp_worker_count"):
356 if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
357 cls.vpp_worker_count = 0
359 cls.vpp_worker_count = config.vpp_worker_count
360 return cls.vpp_worker_count
363 def get_cpus_required(cls):
364 return 1 + cls.get_vpp_worker_count()
367 def setUpConstants(cls):
368 """ Set-up the test case class based on environment variables """
369 cls.step = config.step
370 cls.plugin_path = ":".join(config.vpp_plugin_dir)
371 cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir)
372 cls.extern_plugin_path = ":".join(config.extern_plugin_dir)
374 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
375 debug_cli = "cli-listen localhost:5002"
376 size = re.search(r"\d+[gG]", config.coredump_size)
378 coredump_size = f"coredump-size {config.coredump_size}".lower()
380 coredump_size = "coredump-size unlimited"
381 default_variant = config.variant
382 if default_variant is not None:
383 default_variant = "defaults { %s 100 }" % default_variant
387 api_fuzzing = config.api_fuzz
388 if api_fuzzing is None:
393 "unix", "{", "nodaemon", debug_cli, "full-coredump",
394 coredump_size, "runtime-dir", cls.tempdir, "}",
395 "api-trace", "{", "on", "}",
396 "api-segment", "{", "prefix", cls.get_api_segment_prefix(), "}",
397 "cpu", "{", "main-core", str(cls.cpus[0]), ]
398 if cls.extern_plugin_path not in (None, ""):
399 cls.extra_vpp_plugin_config.append(
400 "add-path %s" % cls.extern_plugin_path)
401 if cls.get_vpp_worker_count():
402 cls.vpp_cmdline.extend([
403 "corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])])
404 cls.vpp_cmdline.extend([
406 "physmem", "{", "max-size", "32m", "}",
407 "statseg", "{", "socket-name", cls.get_stats_sock_path(),
408 cls.extra_vpp_statseg_config, "}",
409 "socksvr", "{", "socket-name", cls.get_api_sock_path(), "}",
410 "node { ", default_variant, "}",
411 "api-fuzz {", api_fuzzing, "}",
412 "plugins", "{", "plugin", "dpdk_plugin.so", "{", "disable", "}",
413 "plugin", "rdma_plugin.so", "{", "disable", "}",
414 "plugin", "lisp_unittest_plugin.so", "{", "enable", "}",
415 "plugin", "unittest_plugin.so", "{", "enable", "}"
416 ] + cls.extra_vpp_plugin_config + ["}", ])
418 if cls.extra_vpp_punt_config is not None:
419 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
421 if not cls.debug_attach:
422 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
423 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
426 def wait_for_enter(cls):
427 if cls.debug_gdbserver:
428 print(double_line_delim)
429 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
431 print(double_line_delim)
432 print("Spawned VPP with PID: %d" % cls.vpp.pid)
434 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
436 print(single_line_delim)
437 print("You can debug VPP using:")
438 if cls.debug_gdbserver:
439 print(f"sudo gdb {config.vpp} "
440 f"-ex 'target remote localhost:{cls.gdbserver_port}'")
441 print("Now is the time to attach gdb by running the above "
442 "command, set up breakpoints etc., then resume VPP from "
443 "within gdb by issuing the 'continue' command")
444 cls.gdbserver_port += 1
446 print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'")
447 print("Now is the time to attach gdb by running the above "
448 "command and set up breakpoints etc., then resume VPP from"
449 " within gdb by issuing the 'continue' command")
450 print(single_line_delim)
451 input("Press ENTER to continue running the testcase...")
459 cls.logger.debug(f"Assigned cpus: {cls.cpus}")
460 cmdline = cls.vpp_cmdline
462 if cls.debug_gdbserver:
463 gdbserver = '/usr/bin/gdbserver'
464 if not os.path.isfile(gdbserver) or\
465 not os.access(gdbserver, os.X_OK):
466 raise Exception("gdbserver binary '%s' does not exist or is "
467 "not executable" % gdbserver)
469 cmdline = [gdbserver, 'localhost:{port}'
470 .format(port=cls.gdbserver_port)] + cls.vpp_cmdline
471 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
474 cls.vpp = subprocess.Popen(cmdline,
475 stdout=subprocess.PIPE,
476 stderr=subprocess.PIPE)
477 except subprocess.CalledProcessError as e:
478 cls.logger.critical("Subprocess returned with non-0 return code: ("
482 cls.logger.critical("Subprocess returned with OS error: "
483 "(%s) %s", e.errno, e.strerror)
485 except Exception as e:
486 cls.logger.exception("Subprocess returned unexpected from "
493 def wait_for_coredump(cls):
494 corefile = cls.tempdir + "/core"
495 if os.path.isfile(corefile):
496 cls.logger.error("Waiting for coredump to complete: %s", corefile)
497 curr_size = os.path.getsize(corefile)
498 deadline = time.time() + 60
500 while time.time() < deadline:
503 curr_size = os.path.getsize(corefile)
504 if size == curr_size:
508 cls.logger.error("Timed out waiting for coredump to complete:"
511 cls.logger.error("Coredump complete: %s, size %d",
515 def get_stats_sock_path(cls):
516 return "%s/stats.sock" % cls.tempdir
519 def get_api_sock_path(cls):
520 return "%s/api.sock" % cls.tempdir
523 def get_api_segment_prefix(cls):
524 return os.path.basename(cls.tempdir) # Only used for VAPI
527 def get_tempdir(cls):
528 tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}"
529 if config.wipe_tmp_dir:
530 shutil.rmtree(tmpdir, ignore_errors=True)
535 def create_file_handler(cls):
536 if config.log_dir is None:
537 cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
540 logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}"
541 if config.wipe_tmp_dir:
542 shutil.rmtree(logdir, ignore_errors=True)
544 cls.file_handler = FileHandler(f"{logdir}/log.txt")
549 Perform class setup before running the testcase
550 Remove shared memory files, start vpp and connect the vpp-api
552 super(VppTestCase, cls).setUpClass()
553 cls.logger = get_logger(cls.__name__)
554 random.seed(config.rnd_seed)
555 if hasattr(cls, 'parallel_handler'):
556 cls.logger.addHandler(cls.parallel_handler)
557 cls.logger.propagate = False
558 cls.set_debug_flags(config.debug)
559 cls.tempdir = cls.get_tempdir()
560 cls.create_file_handler()
561 cls.file_handler.setFormatter(
562 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
564 cls.file_handler.setLevel(DEBUG)
565 cls.logger.addHandler(cls.file_handler)
566 cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
567 os.chdir(cls.tempdir)
568 cls.logger.info("Temporary dir is %s, api socket is %s",
569 cls.tempdir, cls.get_api_sock_path())
570 cls.logger.debug("Random seed is %s", config.rnd_seed)
572 cls.reset_packet_infos()
577 cls.registry = VppObjectRegistry()
578 cls.vpp_startup_failed = False
579 cls.reporter = KeepAliveReporter()
580 # need to catch exceptions here because if we raise, then the cleanup
581 # doesn't get called and we might end with a zombie vpp
587 cls.reporter.send_keep_alive(cls, 'setUpClass')
588 VppTestResult.current_test_case_info = TestCaseInfo(
589 cls.logger, cls.tempdir, cls.vpp.pid, config.vpp)
590 cls.vpp_stdout_deque = deque()
591 cls.vpp_stderr_deque = deque()
592 if not cls.debug_attach:
593 cls.pump_thread_stop_flag = Event()
594 cls.pump_thread_wakeup_pipe = os.pipe()
595 cls.pump_thread = Thread(target=pump_output, args=(cls,))
596 cls.pump_thread.daemon = True
597 cls.pump_thread.start()
598 if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
599 cls.vapi_response_timeout = 0
600 cls.vapi = VppPapiProvider(cls.__name__, cls,
601 cls.vapi_response_timeout)
603 hook = hookmodule.StepHook(cls)
605 hook = hookmodule.PollHook(cls)
606 cls.vapi.register_hook(hook)
607 cls.statistics = VPPStats(socketname=cls.get_stats_sock_path())
611 cls.vpp_startup_failed = True
613 "VPP died shortly after startup, check the"
614 " output to standard error for possible cause")
618 except (vpp_papi.VPPIOError, Exception) as e:
619 cls.logger.debug("Exception connecting to vapi: %s" % e)
620 cls.vapi.disconnect()
622 if cls.debug_gdbserver:
623 print(colorize("You're running VPP inside gdbserver but "
624 "VPP-API connection failed, did you forget "
625 "to 'continue' VPP from within gdb?", RED))
628 last_line = cls.vapi.cli("show thread").split("\n")[-2]
629 cls.vpp_worker_count = int(last_line.split(" ")[0])
630 print("Detected VPP with %s workers." % cls.vpp_worker_count)
631 except vpp_papi.VPPRuntimeError as e:
632 cls.logger.debug("%s" % e)
635 except Exception as e:
636 cls.logger.debug("Exception connecting to VPP: %s" % e)
641 def _debug_quit(cls):
642 if (cls.debug_gdbserver or cls.debug_gdb):
646 if cls.vpp.returncode is None:
648 print(double_line_delim)
649 print("VPP or GDB server is still running")
650 print(single_line_delim)
651 input("When done debugging, press ENTER to kill the "
652 "process and finish running the testcase...")
653 except AttributeError:
659 Disconnect vpp-api, kill vpp and cleanup shared memory files
663 # first signal that we want to stop the pump thread, then wake it up
664 if hasattr(cls, 'pump_thread_stop_flag'):
665 cls.pump_thread_stop_flag.set()
666 if hasattr(cls, 'pump_thread_wakeup_pipe'):
667 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
668 if hasattr(cls, 'pump_thread'):
669 cls.logger.debug("Waiting for pump thread to stop")
670 cls.pump_thread.join()
671 if hasattr(cls, 'vpp_stderr_reader_thread'):
672 cls.logger.debug("Waiting for stderr pump to stop")
673 cls.vpp_stderr_reader_thread.join()
675 if hasattr(cls, 'vpp'):
676 if hasattr(cls, 'vapi'):
677 cls.logger.debug(cls.vapi.vpp.get_stats())
678 cls.logger.debug("Disconnecting class vapi client on %s",
680 cls.vapi.disconnect()
681 cls.logger.debug("Deleting class vapi attribute on %s",
685 if not cls.debug_attach and cls.vpp.returncode is None:
686 cls.wait_for_coredump()
687 cls.logger.debug("Sending TERM to vpp")
689 cls.logger.debug("Waiting for vpp to die")
691 outs, errs = cls.vpp.communicate(timeout=5)
692 except subprocess.TimeoutExpired:
694 outs, errs = cls.vpp.communicate()
695 cls.logger.debug("Deleting class vpp attribute on %s",
697 if not cls.debug_attach:
698 cls.vpp.stdout.close()
699 cls.vpp.stderr.close()
702 if cls.vpp_startup_failed:
703 stdout_log = cls.logger.info
704 stderr_log = cls.logger.critical
706 stdout_log = cls.logger.info
707 stderr_log = cls.logger.info
709 if hasattr(cls, 'vpp_stdout_deque'):
710 stdout_log(single_line_delim)
711 stdout_log('VPP output to stdout while running %s:', cls.__name__)
712 stdout_log(single_line_delim)
713 vpp_output = "".join(cls.vpp_stdout_deque)
714 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
716 stdout_log('\n%s', vpp_output)
717 stdout_log(single_line_delim)
719 if hasattr(cls, 'vpp_stderr_deque'):
720 stderr_log(single_line_delim)
721 stderr_log('VPP output to stderr while running %s:', cls.__name__)
722 stderr_log(single_line_delim)
723 vpp_output = "".join(cls.vpp_stderr_deque)
724 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
726 stderr_log('\n%s', vpp_output)
727 stderr_log(single_line_delim)
730 def tearDownClass(cls):
731 """ Perform final cleanup after running all tests in this test-case """
732 cls.logger.debug("--- tearDownClass() for %s called ---" %
734 cls.reporter.send_keep_alive(cls, 'tearDownClass')
736 cls.file_handler.close()
737 cls.reset_packet_infos()
738 if config.debug_framework:
739 debug_internal.on_tear_down_class(cls)
741 def show_commands_at_teardown(self):
742 """ Allow subclass specific teardown logging additions."""
743 self.logger.info("--- No test specific show commands provided. ---")
746 """ Show various debug prints after each test """
747 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
748 (self.__class__.__name__, self._testMethodName,
749 self._testMethodDoc))
752 if not self.vpp_dead:
753 self.logger.debug(self.vapi.cli("show trace max 1000"))
754 self.logger.info(self.vapi.ppcli("show interface"))
755 self.logger.info(self.vapi.ppcli("show hardware"))
756 self.logger.info(self.statistics.set_errors_str())
757 self.logger.info(self.vapi.ppcli("show run"))
758 self.logger.info(self.vapi.ppcli("show log"))
759 self.logger.info(self.vapi.ppcli("show bihash"))
760 self.logger.info("Logging testcase specific show commands.")
761 self.show_commands_at_teardown()
762 if self.remove_configured_vpp_objects_on_tear_down:
763 self.registry.remove_vpp_config(self.logger)
764 # Save/Dump VPP api trace log
765 m = self._testMethodName
766 api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
767 tmp_api_trace = "/tmp/%s" % api_trace
768 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
769 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
770 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
772 os.rename(tmp_api_trace, vpp_api_trace_log)
773 except VppTransportSocketIOError:
774 self.logger.debug("VppTransportSocketIOError: Vpp dead. "
775 "Cannot log show commands.")
778 self.registry.unregister_all(self.logger)
781 """ Clear trace before running each test"""
782 super(VppTestCase, self).setUp()
783 self.reporter.send_keep_alive(self)
785 raise VppDiedError(rv=None, testcase=self.__class__.__name__,
786 method_name=self._testMethodName)
787 self.sleep(.1, "during setUp")
788 self.vpp_stdout_deque.append(
789 "--- test setUp() for %s.%s(%s) starts here ---\n" %
790 (self.__class__.__name__, self._testMethodName,
791 self._testMethodDoc))
792 self.vpp_stderr_deque.append(
793 "--- test setUp() for %s.%s(%s) starts here ---\n" %
794 (self.__class__.__name__, self._testMethodName,
795 self._testMethodDoc))
796 self.vapi.cli("clear trace")
797 # store the test instance inside the test class - so that objects
798 # holding the class can access instance methods (like assertEqual)
799 type(self).test_instance = self
802 def pg_enable_capture(cls, interfaces=None):
804 Enable capture on packet-generator interfaces
806 :param interfaces: iterable interface indexes (if None,
807 use self.pg_interfaces)
810 if interfaces is None:
811 interfaces = cls.pg_interfaces
816 def register_pcap(cls, intf, worker):
817 """ Register a pcap in the testclass """
818 # add to the list of captures with current timestamp
819 cls._pcaps.append((intf, worker))
822 def get_vpp_time(cls):
823 # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
824 # returns float("2.190522")
825 timestr = cls.vapi.cli('show clock')
826 head, sep, tail = timestr.partition(',')
827 head, sep, tail = head.partition('Time now')
831 def sleep_on_vpp_time(cls, sec):
832 """ Sleep according to time in VPP world """
833 # On a busy system with many processes
834 # we might end up with VPP time being slower than real world
835 # So take that into account when waiting for VPP to do something
836 start_time = cls.get_vpp_time()
837 while cls.get_vpp_time() - start_time < sec:
841 def pg_start(cls, trace=True):
842 """ Enable the PG, wait till it is done, then clean up """
843 for (intf, worker) in cls._old_pcaps:
844 intf.handle_old_pcap_file(intf.get_in_path(worker),
845 intf.in_history_counter)
848 cls.vapi.cli("clear trace")
849 cls.vapi.cli("trace add pg-input 1000")
850 cls.vapi.cli('packet-generator enable')
851 # PG, when starts, runs to completion -
852 # so let's avoid a race condition,
853 # and wait a little till it's done.
854 # Then clean it up - and then be gone.
855 deadline = time.time() + 300
856 while cls.vapi.cli('show packet-generator').find("Yes") != -1:
857 cls.sleep(0.01) # yield
858 if time.time() > deadline:
859 cls.logger.error("Timeout waiting for pg to stop")
861 for intf, worker in cls._pcaps:
862 cls.vapi.cli('packet-generator delete %s' %
863 intf.get_cap_name(worker))
864 cls._old_pcaps = cls._pcaps
868 def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0,
871 Create packet-generator interfaces.
873 :param interfaces: iterable indexes of the interfaces.
874 :returns: List of created interfaces.
879 intf = VppPGInterface(cls, i, gso, gso_size, mode)
880 setattr(cls, intf.name, intf)
882 cls.pg_interfaces = result
886 def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
887 pgmode = VppEnum.vl_api_pg_interface_mode_t
888 return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
889 pgmode.PG_API_MODE_IP4)
892 def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
893 pgmode = VppEnum.vl_api_pg_interface_mode_t
894 return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
895 pgmode.PG_API_MODE_IP6)
898 def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
899 pgmode = VppEnum.vl_api_pg_interface_mode_t
900 return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
901 pgmode.PG_API_MODE_ETHERNET)
904 def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
905 pgmode = VppEnum.vl_api_pg_interface_mode_t
906 return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
907 pgmode.PG_API_MODE_ETHERNET)
910 def create_loopback_interfaces(cls, count):
912 Create loopback interfaces.
914 :param count: number of interfaces created.
915 :returns: List of created interfaces.
917 result = [VppLoInterface(cls) for i in range(count)]
919 setattr(cls, intf.name, intf)
920 cls.lo_interfaces = result
924 def create_bvi_interfaces(cls, count):
926 Create BVI interfaces.
928 :param count: number of interfaces created.
929 :returns: List of created interfaces.
931 result = [VppBviInterface(cls) for i in range(count)]
933 setattr(cls, intf.name, intf)
934 cls.bvi_interfaces = result
938 def extend_packet(packet, size, padding=' '):
940 Extend packet to given size by padding with spaces or custom padding
941 NOTE: Currently works only when Raw layer is present.
943 :param packet: packet
944 :param size: target size
945 :param padding: padding used to extend the payload
948 packet_len = len(packet) + 4
949 extend = size - packet_len
951 num = (extend // len(padding)) + 1
952 packet[Raw].load += (padding * num)[:extend].encode("ascii")
955 def reset_packet_infos(cls):
956 """ Reset the list of packet info objects and packet counts to zero """
957 cls._packet_infos = {}
958 cls._packet_count_for_dst_if_idx = {}
961 def create_packet_info(cls, src_if, dst_if):
963 Create packet info object containing the source and destination indexes
964 and add it to the testcase's packet info list
966 :param VppInterface src_if: source interface
967 :param VppInterface dst_if: destination interface
969 :returns: _PacketInfo object
973 info.index = len(cls._packet_infos)
974 info.src = src_if.sw_if_index
975 info.dst = dst_if.sw_if_index
976 if isinstance(dst_if, VppSubInterface):
977 dst_idx = dst_if.parent.sw_if_index
979 dst_idx = dst_if.sw_if_index
980 if dst_idx in cls._packet_count_for_dst_if_idx:
981 cls._packet_count_for_dst_if_idx[dst_idx] += 1
983 cls._packet_count_for_dst_if_idx[dst_idx] = 1
984 cls._packet_infos[info.index] = info
988 def info_to_payload(info):
990 Convert _PacketInfo object to packet payload
992 :param info: _PacketInfo object
994 :returns: string containing serialized data from packet info
997 # retrieve payload, currently 18 bytes (4 x ints + 1 short)
998 return pack('iiiih', info.index, info.src,
999 info.dst, info.ip, info.proto)
1002 def payload_to_info(payload, payload_field='load'):
1004 Convert packet payload to _PacketInfo object
1006 :param payload: packet payload
1007 :type payload: <class 'scapy.packet.Raw'>
1008 :param payload_field: packet fieldname of payload "load" for
1009 <class 'scapy.packet.Raw'>
1010 :type payload_field: str
1011 :returns: _PacketInfo object containing de-serialized data from payload
1015 # retrieve payload, currently 18 bytes (4 x ints + 1 short)
1016 payload_b = getattr(payload, payload_field)[:18]
1018 info = _PacketInfo()
1019 info.index, info.src, info.dst, info.ip, info.proto \
1020 = unpack('iiiih', payload_b)
1022 # some SRv6 TCs depend on get an exception if bad values are detected
1023 if info.index > 0x4000:
1024 raise ValueError('Index value is invalid')
1028 def get_next_packet_info(self, info):
1030 Iterate over the packet info list stored in the testcase
1031 Start iteration with first element if info is None
1032 Continue based on index in info if info is specified
1034 :param info: info or None
1035 :returns: next info in list or None if no more infos
1040 next_index = info.index + 1
1041 if next_index == len(self._packet_infos):
1044 return self._packet_infos[next_index]
1046 def get_next_packet_info_for_interface(self, src_index, info):
1048 Search the packet info list for the next packet info with same source
1051 :param src_index: source interface index to search for
1052 :param info: packet info - where to start the search
1053 :returns: packet info or None
1057 info = self.get_next_packet_info(info)
1060 if info.src == src_index:
1063 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
1065 Search the packet info list for the next packet info with same source
1066 and destination interface indexes
1068 :param src_index: source interface index to search for
1069 :param dst_index: destination interface index to search for
1070 :param info: packet info - where to start the search
1071 :returns: packet info or None
1075 info = self.get_next_packet_info_for_interface(src_index, info)
1078 if info.dst == dst_index:
1081 def assert_equal(self, real_value, expected_value, name_or_class=None):
1082 if name_or_class is None:
1083 self.assertEqual(real_value, expected_value)
1086 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
1087 msg = msg % (getdoc(name_or_class).strip(),
1088 real_value, str(name_or_class(real_value)),
1089 expected_value, str(name_or_class(expected_value)))
1091 msg = "Invalid %s: %s does not match expected value %s" % (
1092 name_or_class, real_value, expected_value)
1094 self.assertEqual(real_value, expected_value, msg)
1096 def assert_in_range(self,
1104 msg = "Invalid %s: %s out of range <%s,%s>" % (
1105 name, real_value, expected_min, expected_max)
1106 self.assertTrue(expected_min <= real_value <= expected_max, msg)
1108 def assert_packet_checksums_valid(self, packet,
1109 ignore_zero_udp_checksums=True):
1110 received = packet.__class__(scapy.compat.raw(packet))
1111 udp_layers = ['UDP', 'UDPerror']
1112 checksum_fields = ['cksum', 'chksum']
1115 temp = received.__class__(scapy.compat.raw(received))
1117 layer = temp.getlayer(counter)
1119 layer = layer.copy()
1120 layer.remove_payload()
1121 for cf in checksum_fields:
1122 if hasattr(layer, cf):
1123 if ignore_zero_udp_checksums and \
1124 0 == getattr(layer, cf) and \
1125 layer.name in udp_layers:
1127 delattr(temp.getlayer(counter), cf)
1128 checksums.append((counter, cf))
1131 counter = counter + 1
1132 if 0 == len(checksums):
1134 temp = temp.__class__(scapy.compat.raw(temp))
1135 for layer, cf in checksums:
1136 calc_sum = getattr(temp[layer], cf)
1138 getattr(received[layer], cf), calc_sum,
1139 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
1141 "Checksum field `%s` on `%s` layer has correct value `%s`" %
1142 (cf, temp[layer].name, calc_sum))
1144 def assert_checksum_valid(self, received_packet, layer,
1145 field_name='chksum',
1146 ignore_zero_checksum=False):
1147 """ Check checksum of received packet on given layer """
1148 received_packet_checksum = getattr(received_packet[layer], field_name)
1149 if ignore_zero_checksum and 0 == received_packet_checksum:
1151 recalculated = received_packet.__class__(
1152 scapy.compat.raw(received_packet))
1153 delattr(recalculated[layer], field_name)
1154 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1155 self.assert_equal(received_packet_checksum,
1156 getattr(recalculated[layer], field_name),
1157 "packet checksum on layer: %s" % layer)
1159 def assert_ip_checksum_valid(self, received_packet,
1160 ignore_zero_checksum=False):
1161 self.assert_checksum_valid(received_packet, 'IP',
1162 ignore_zero_checksum=ignore_zero_checksum)
1164 def assert_tcp_checksum_valid(self, received_packet,
1165 ignore_zero_checksum=False):
1166 self.assert_checksum_valid(received_packet, 'TCP',
1167 ignore_zero_checksum=ignore_zero_checksum)
1169 def assert_udp_checksum_valid(self, received_packet,
1170 ignore_zero_checksum=True):
1171 self.assert_checksum_valid(received_packet, 'UDP',
1172 ignore_zero_checksum=ignore_zero_checksum)
1174 def assert_embedded_icmp_checksum_valid(self, received_packet):
1175 if received_packet.haslayer(IPerror):
1176 self.assert_checksum_valid(received_packet, 'IPerror')
1177 if received_packet.haslayer(TCPerror):
1178 self.assert_checksum_valid(received_packet, 'TCPerror')
1179 if received_packet.haslayer(UDPerror):
1180 self.assert_checksum_valid(received_packet, 'UDPerror',
1181 ignore_zero_checksum=True)
1182 if received_packet.haslayer(ICMPerror):
1183 self.assert_checksum_valid(received_packet, 'ICMPerror')
1185 def assert_icmp_checksum_valid(self, received_packet):
1186 self.assert_checksum_valid(received_packet, 'ICMP')
1187 self.assert_embedded_icmp_checksum_valid(received_packet)
1189 def assert_icmpv6_checksum_valid(self, pkt):
1190 if pkt.haslayer(ICMPv6DestUnreach):
1191 self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
1192 self.assert_embedded_icmp_checksum_valid(pkt)
1193 if pkt.haslayer(ICMPv6EchoRequest):
1194 self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
1195 if pkt.haslayer(ICMPv6EchoReply):
1196 self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
1198 def get_counter(self, counter):
1199 if counter.startswith("/"):
1200 counter_value = self.statistics.get_counter(counter)
1202 counters = self.vapi.cli("sh errors").split('\n')
1204 for i in range(1, len(counters) - 1):
1205 results = counters[i].split()
1206 if results[1] == counter:
1207 counter_value = int(results[0])
1209 return counter_value
1211 def assert_counter_equal(self, counter, expected_value,
1212 thread=None, index=0):
1213 c = self.get_counter(counter)
1214 if thread is not None:
1215 c = c[thread][index]
1217 c = sum(x[index] for x in c)
1218 self.assert_equal(c, expected_value, "counter `%s'" % counter)
1220 def assert_packet_counter_equal(self, counter, expected_value):
1221 counter_value = self.get_counter(counter)
1222 self.assert_equal(counter_value, expected_value,
1223 "packet counter `%s'" % counter)
1225 def assert_error_counter_equal(self, counter, expected_value):
1226 counter_value = self.statistics[counter].sum()
1227 self.assert_equal(counter_value, expected_value,
1228 "error counter `%s'" % counter)
1231 def sleep(cls, timeout, remark=None):
1233 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1234 # * by Guido, only the main thread can be interrupted.
1236 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
1239 if hasattr(os, 'sched_yield'):
1245 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1246 before = time.time()
1249 if after - before > 2 * timeout:
1250 cls.logger.error("unexpected self.sleep() result - "
1251 "slept for %es instead of ~%es!",
1252 after - before, timeout)
1255 "Finished sleep (%s) - slept %es (wanted %es)",
1256 remark, after - before, timeout)
1258 def virtual_sleep(self, timeout, remark=None):
1259 self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
1260 self.vapi.cli("set clock adjust %s" % timeout)
1262 def pg_send(self, intf, pkts, worker=None, trace=True):
1263 intf.add_stream(pkts, worker=worker)
1264 self.pg_enable_capture(self.pg_interfaces)
1265 self.pg_start(trace=trace)
1267 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None,
1269 self.pg_send(intf, pkts)
1272 for i in self.pg_interfaces:
1273 i.get_capture(0, timeout=timeout)
1274 i.assert_nothing_captured(remark=remark)
1277 self.logger.debug(self.vapi.cli("show trace"))
1279 def send_and_expect(self, intf, pkts, output, n_rx=None, worker=None,
1282 n_rx = 1 if isinstance(pkts, Packet) else len(pkts)
1283 self.pg_send(intf, pkts, worker=worker, trace=trace)
1284 rx = output.get_capture(n_rx)
1286 self.logger.debug(self.vapi.cli("show trace"))
1289 def send_and_expect_load_balancing(self, input, pkts, outputs,
1290 worker=None, trace=True):
1291 self.pg_send(input, pkts, worker=worker, trace=trace)
1294 rx = oo._get_capture(1)
1295 self.assertNotEqual(0, len(rx))
1298 self.logger.debug(self.vapi.cli("show trace"))
1301 def send_and_expect_only(self, intf, pkts, output, timeout=None):
1302 self.pg_send(intf, pkts)
1303 rx = output.get_capture(len(pkts))
1307 for i in self.pg_interfaces:
1308 if i not in outputs:
1309 i.get_capture(0, timeout=timeout)
1310 i.assert_nothing_captured()
1316 def get_testcase_doc_name(test):
1317 return getdoc(test.__class__).splitlines()[0]
1320 def get_test_description(descriptions, test):
1321 short_description = test.shortDescription()
1322 if descriptions and short_description:
1323 return short_description
1328 class TestCaseInfo(object):
1329 def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
1330 self.logger = logger
1331 self.tempdir = tempdir
1332 self.vpp_pid = vpp_pid
1333 self.vpp_bin_path = vpp_bin_path
1334 self.core_crash_test = None
1337 class VppTestResult(unittest.TestResult):
1339 @property result_string
1340 String variable to store the test case result string.
1342 List variable containing 2-tuples of TestCase instances and strings
1343 holding formatted tracebacks. Each tuple represents a test which
1344 raised an unexpected exception.
1346 List variable containing 2-tuples of TestCase instances and strings
1347 holding formatted tracebacks. Each tuple represents a test where
1348 a failure was explicitly signalled using the TestCase.assert*()
1352 failed_test_cases_info = set()
1353 core_crash_test_cases_info = set()
1354 current_test_case_info = None
1356 def __init__(self, stream=None, descriptions=None, verbosity=None,
1359 :param stream File descriptor to store where to report test results.
1360 Set to the standard error stream by default.
1361 :param descriptions Boolean variable to store information if to use
1362 test case descriptions.
1363 :param verbosity Integer variable to store required verbosity level.
1365 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1366 self.stream = stream
1367 self.descriptions = descriptions
1368 self.verbosity = verbosity
1369 self.result_string = None
1370 self.runner = runner
1373 def addSuccess(self, test):
1375 Record a test succeeded result
1380 if self.current_test_case_info:
1381 self.current_test_case_info.logger.debug(
1382 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1383 test._testMethodName,
1384 test._testMethodDoc))
1385 unittest.TestResult.addSuccess(self, test)
1386 self.result_string = colorize("OK", GREEN)
1388 self.send_result_through_pipe(test, PASS)
1390 def addSkip(self, test, reason):
1392 Record a test skipped.
1398 if self.current_test_case_info:
1399 self.current_test_case_info.logger.debug(
1400 "--- addSkip() %s.%s(%s) called, reason is %s" %
1401 (test.__class__.__name__, test._testMethodName,
1402 test._testMethodDoc, reason))
1403 unittest.TestResult.addSkip(self, test, reason)
1404 self.result_string = colorize("SKIP", YELLOW)
1406 if reason == "not enough cpus":
1407 self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE)
1409 self.send_result_through_pipe(test, SKIP)
1411 def symlink_failed(self):
1412 if self.current_test_case_info:
1414 failed_dir = config.failed_dir
1415 link_path = os.path.join(
1418 os.path.basename(self.current_test_case_info.tempdir))
1420 self.current_test_case_info.logger.debug(
1421 "creating a link to the failed test")
1422 self.current_test_case_info.logger.debug(
1423 "os.symlink(%s, %s)" %
1424 (self.current_test_case_info.tempdir, link_path))
1425 if os.path.exists(link_path):
1426 self.current_test_case_info.logger.debug(
1427 'symlink already exists')
1429 os.symlink(self.current_test_case_info.tempdir, link_path)
1431 except Exception as e:
1432 self.current_test_case_info.logger.error(e)
1434 def send_result_through_pipe(self, test, result):
1435 if hasattr(self, 'test_framework_result_pipe'):
1436 pipe = self.test_framework_result_pipe
1438 pipe.send((test.id(), result))
1440 def log_error(self, test, err, fn_name):
1441 if self.current_test_case_info:
1442 if isinstance(test, unittest.suite._ErrorHolder):
1443 test_name = test.description
1445 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1446 test._testMethodName,
1447 test._testMethodDoc)
1448 self.current_test_case_info.logger.debug(
1449 "--- %s() %s called, err is %s" %
1450 (fn_name, test_name, err))
1451 self.current_test_case_info.logger.debug(
1452 "formatted exception is:\n%s" %
1453 "".join(format_exception(*err)))
1455 def add_error(self, test, err, unittest_fn, error_type):
1456 if error_type == FAIL:
1457 self.log_error(test, err, 'addFailure')
1458 error_type_str = colorize("FAIL", RED)
1459 elif error_type == ERROR:
1460 self.log_error(test, err, 'addError')
1461 error_type_str = colorize("ERROR", RED)
1463 raise Exception('Error type %s cannot be used to record an '
1464 'error or a failure' % error_type)
1466 unittest_fn(self, test, err)
1467 if self.current_test_case_info:
1468 self.result_string = "%s [ temp dir used by test case: %s ]" % \
1470 self.current_test_case_info.tempdir)
1471 self.symlink_failed()
1472 self.failed_test_cases_info.add(self.current_test_case_info)
1473 if is_core_present(self.current_test_case_info.tempdir):
1474 if not self.current_test_case_info.core_crash_test:
1475 if isinstance(test, unittest.suite._ErrorHolder):
1476 test_name = str(test)
1478 test_name = "'{!s}' ({!s})".format(
1479 get_testcase_doc_name(test), test.id())
1480 self.current_test_case_info.core_crash_test = test_name
1481 self.core_crash_test_cases_info.add(
1482 self.current_test_case_info)
1484 self.result_string = '%s [no temp dir]' % error_type_str
1486 self.send_result_through_pipe(test, error_type)
1488 def addFailure(self, test, err):
1490 Record a test failed result
1493 :param err: error message
1496 self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1498 def addError(self, test, err):
1500 Record a test error result
1503 :param err: error message
1506 self.add_error(test, err, unittest.TestResult.addError, ERROR)
1508 def getDescription(self, test):
1510 Get test description
1513 :returns: test description
1516 return get_test_description(self.descriptions, test)
1518 def startTest(self, test):
1526 def print_header(test):
1527 if test.__class__ in self.printed:
1530 test_doc = getdoc(test)
1532 raise Exception("No doc string for test '%s'" % test.id())
1534 test_title = test_doc.splitlines()[0].rstrip()
1535 test_title = colorize(test_title, GREEN)
1536 if test.is_tagged_run_solo():
1537 test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)
1539 # This block may overwrite the colorized title above,
1540 # but we want this to stand out and be fixed
1541 if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
1542 test_title = colorize(
1543 f"FIXME with VPP workers: {test_title}", RED)
1545 if test.has_tag(TestCaseTag.FIXME_ASAN):
1546 test_title = colorize(
1547 f"FIXME with ASAN: {test_title}", RED)
1548 test.skip_fixme_asan()
1550 if hasattr(test, 'vpp_worker_count'):
1551 if test.vpp_worker_count == 0:
1552 test_title += " [main thread only]"
1553 elif test.vpp_worker_count == 1:
1554 test_title += " [1 worker thread]"
1556 test_title += f" [{test.vpp_worker_count} worker threads]"
1558 if test.__class__.skipped_due_to_cpu_lack:
1559 test_title = colorize(
1560 f"{test_title} [skipped - not enough cpus, "
1561 f"required={test.__class__.get_cpus_required()}, "
1562 f"available={max_vpp_cpus}]", YELLOW)
1564 print(double_line_delim)
1566 print(double_line_delim)
1567 self.printed.append(test.__class__)
1570 self.start_test = time.time()
1571 unittest.TestResult.startTest(self, test)
1572 if self.verbosity > 0:
1573 self.stream.writeln(
1574 "Starting " + self.getDescription(test) + " ...")
1575 self.stream.writeln(single_line_delim)
1577 def stopTest(self, test):
1579 Called when the given test has been run
1584 unittest.TestResult.stopTest(self, test)
1586 if self.verbosity > 0:
1587 self.stream.writeln(single_line_delim)
1588 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1589 self.result_string))
1590 self.stream.writeln(single_line_delim)
1592 self.stream.writeln("%-68s %4.2f %s" %
1593 (self.getDescription(test),
1594 time.time() - self.start_test,
1595 self.result_string))
1597 self.send_result_through_pipe(test, TEST_RUN)
1599 def printErrors(self):
1601 Print errors from running the test case
1603 if len(self.errors) > 0 or len(self.failures) > 0:
1604 self.stream.writeln()
1605 self.printErrorList('ERROR', self.errors)
1606 self.printErrorList('FAIL', self.failures)
1608 # ^^ that is the last output from unittest before summary
1609 if not self.runner.print_summary:
1610 devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
1611 self.stream = devnull
1612 self.runner.stream = devnull
1614 def printErrorList(self, flavour, errors):
1616 Print error list to the output stream together with error type
1617 and test case description.
1619 :param flavour: error type
1620 :param errors: iterable errors
1623 for test, err in errors:
1624 self.stream.writeln(double_line_delim)
1625 self.stream.writeln("%s: %s" %
1626 (flavour, self.getDescription(test)))
1627 self.stream.writeln(single_line_delim)
1628 self.stream.writeln("%s" % err)
1631 class VppTestRunner(unittest.TextTestRunner):
1633 A basic test runner implementation which prints results to standard error.
1637 def resultclass(self):
1638 """Class maintaining the results of the tests"""
1639 return VppTestResult
1641 def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1642 result_pipe=None, failfast=False, buffer=False,
1643 resultclass=None, print_summary=True, **kwargs):
1644 # ignore stream setting here, use hard-coded stdout to be in sync
1645 # with prints from VppTestCase methods ...
1646 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1647 verbosity, failfast, buffer,
1648 resultclass, **kwargs)
1649 KeepAliveReporter.pipe = keep_alive_pipe
1651 self.orig_stream = self.stream
1652 self.resultclass.test_framework_result_pipe = result_pipe
1654 self.print_summary = print_summary
1656 def _makeResult(self):
1657 return self.resultclass(self.stream,
1662 def run(self, test):
1669 faulthandler.enable() # emit stack trace to stderr if killed by signal
1671 result = super(VppTestRunner, self).run(test)
1672 if not self.print_summary:
1673 self.stream = self.orig_stream
1674 result.stream = self.orig_stream
1678 class Worker(Thread):
1679 def __init__(self, executable_args, logger, env=None, *args, **kwargs):
1680 super(Worker, self).__init__(*args, **kwargs)
1681 self.logger = logger
1682 self.args = executable_args
1683 if hasattr(self, 'testcase') and self.testcase.debug_all:
1684 if self.testcase.debug_gdbserver:
1685 self.args = ['/usr/bin/gdbserver', 'localhost:{port}'
1686 .format(port=self.testcase.gdbserver_port)] + args
1687 elif self.testcase.debug_gdb and hasattr(self, 'wait_for_gdb'):
1688 self.args.append(self.wait_for_gdb)
1689 self.app_bin = executable_args[0]
1690 self.app_name = os.path.basename(self.app_bin)
1691 if hasattr(self, 'role'):
1692 self.app_name += ' {role}'.format(role=self.role)
1695 env = {} if env is None else env
1696 self.env = copy.deepcopy(env)
1698 def wait_for_enter(self):
1699 if not hasattr(self, 'testcase'):
1701 if self.testcase.debug_all and self.testcase.debug_gdbserver:
1703 print(double_line_delim)
1704 print("Spawned GDB Server for '{app}' with PID: {pid}"
1705 .format(app=self.app_name, pid=self.process.pid))
1706 elif self.testcase.debug_all and self.testcase.debug_gdb:
1708 print(double_line_delim)
1709 print("Spawned '{app}' with PID: {pid}"
1710 .format(app=self.app_name, pid=self.process.pid))
1713 print(single_line_delim)
1714 print("You can debug '{app}' using:".format(app=self.app_name))
1715 if self.testcase.debug_gdbserver:
1716 print("sudo gdb " + self.app_bin +
1717 " -ex 'target remote localhost:{port}'"
1718 .format(port=self.testcase.gdbserver_port))
1719 print("Now is the time to attach gdb by running the above "
1720 "command, set up breakpoints etc., then resume from "
1721 "within gdb by issuing the 'continue' command")
1722 self.testcase.gdbserver_port += 1
1723 elif self.testcase.debug_gdb:
1724 print("sudo gdb " + self.app_bin +
1725 " -ex 'attach {pid}'".format(pid=self.process.pid))
1726 print("Now is the time to attach gdb by running the above "
1727 "command and set up breakpoints etc., then resume from"
1728 " within gdb by issuing the 'continue' command")
1729 print(single_line_delim)
1730 input("Press ENTER to continue running the testcase...")
1733 executable = self.args[0]
1734 if not os.path.exists(executable) or not os.access(
1735 executable, os.F_OK | os.X_OK):
1736 # Exit code that means some system file did not exist,
1737 # could not be opened, or had some other kind of error.
1738 self.result = os.EX_OSFILE
1739 raise EnvironmentError(
1740 "executable '%s' is not found or executable." % executable)
1741 self.logger.debug("Running executable '{app}': '{cmd}'"
1742 .format(app=self.app_name,
1743 cmd=' '.join(self.args)))
1744 env = os.environ.copy()
1745 env.update(self.env)
1746 env["CK_LOG_FILE_NAME"] = "-"
1747 self.process = subprocess.Popen(
1748 ['stdbuf', '-o0', '-e0'] + self.args, shell=False, env=env,
1749 preexec_fn=os.setpgrp, stdout=subprocess.PIPE,
1750 stderr=subprocess.PIPE)
1751 self.wait_for_enter()
1752 out, err = self.process.communicate()
1753 self.logger.debug("Finished running `{app}'".format(app=self.app_name))
1754 self.logger.info("Return code is `%s'" % self.process.returncode)
1755 self.logger.info(single_line_delim)
1756 self.logger.info("Executable `{app}' wrote to stdout:"
1757 .format(app=self.app_name))
1758 self.logger.info(single_line_delim)
1759 self.logger.info(out.decode('utf-8'))
1760 self.logger.info(single_line_delim)
1761 self.logger.info("Executable `{app}' wrote to stderr:"
1762 .format(app=self.app_name))
1763 self.logger.info(single_line_delim)
1764 self.logger.info(err.decode('utf-8'))
1765 self.logger.info(single_line_delim)
1766 self.result = self.process.returncode
1769 if __name__ == '__main__':