3 from __future__ import print_function
19 from collections import deque
20 from threading import Thread, Event
21 from inspect import getdoc, isclass
22 from traceback import format_exception
23 from logging import FileHandler, DEBUG, Formatter
27 from scapy.packet import Raw
28 import hook as hookmodule
29 from vpp_pg_interface import VppPGInterface
30 from vpp_sub_interface import VppSubInterface
31 from vpp_lo_interface import VppLoInterface
32 from vpp_bvi_interface import VppBviInterface
33 from vpp_papi_provider import VppPapiProvider
35 from vpp_papi.vpp_stats import VPPStats
36 from vpp_papi.vpp_transport_shmem import VppTransportShmemIOError
37 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
39 from vpp_object import VppObjectRegistry
40 from util import ppp, is_core_present
41 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
42 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
43 from scapy.layers.inet6 import ICMPv6EchoReply
45 logger = logging.getLogger(__name__)
47 # Set up an empty logger for the testcase that can be overridden as necessary
48 null_logger = logging.getLogger('VppTestCase')
49 null_logger.addHandler(logging.NullHandler())
58 class BoolEnvironmentVariable(object):
60 def __init__(self, env_var_name, default='n', true_values=None):
61 self.name = env_var_name
62 self.default = default
63 self.true_values = true_values if true_values is not None else \
67 return os.getenv(self.name, self.default).lower() in self.true_values
69 if sys.version_info[0] == 2:
70 __nonzero__ = __bool__
73 return 'BoolEnvironmentVariable(%r, default=%r, true_values=%r)' % \
74 (self.name, self.default, self.true_values)
77 debug_framework = BoolEnvironmentVariable('TEST_DEBUG')
82 Test framework module.
84 The module provides a set of tools for constructing and running tests and
85 representing the results.
89 class VppDiedError(Exception):
90 """ exception for reporting that the subprocess has died."""
92 signals_by_value = {v: k for k, v in signal.__dict__.items() if
93 k.startswith('SIG') and not k.startswith('SIG_')}
95 def __init__(self, rv=None, testcase=None, method_name=None):
97 self.signal_name = None
98 self.testcase = testcase
99 self.method_name = method_name
102 self.signal_name = VppDiedError.signals_by_value[-rv]
103 except (KeyError, TypeError):
106 if testcase is None and method_name is None:
109 in_msg = ' while running %s.%s' % (testcase, method_name)
112 msg = "VPP subprocess died unexpectedly%s with return code: %d%s."\
113 % (in_msg, self.rv, ' [%s]' %
115 self.signal_name is not None else ''))
117 msg = "VPP subprocess died unexpectedly%s." % in_msg
119 super(VppDiedError, self).__init__(msg)
122 class _PacketInfo(object):
123 """Private class to create packet info object.
125 Help process information about the next packet.
126 Set variables to default values.
128 #: Store the index of the packet.
130 #: Store the index of the source packet generator interface of the packet.
132 #: Store the index of the destination packet generator interface
135 #: Store expected ip version
137 #: Store expected upper protocol
139 #: Store the copy of the former packet.
def __eq__(self, other):
    """ Compare two packet infos attribute by attribute.

    Two infos are equal when their ``index``, ``src``, ``dst`` and
    ``data`` attributes all compare equal.

    :param other: object to compare with
    :returns: True or False for objects exposing the compared
        attributes, ``NotImplemented`` for anything else
    """
    try:
        other_fields = (other.index, other.src, other.dst, other.data)
    except AttributeError:
        # Not a packet-info-like object: hand the decision back to Python
        # (reflected comparison, then identity) instead of crashing the
        # comparison with AttributeError.
        return NotImplemented
    own_fields = (self.index, self.src, self.dst, self.data)
    # all() stops at the first mismatch and always yields a real bool.
    return all(mine == theirs
               for mine, theirs in zip(own_fields, other_fields))
150 def pump_output(testclass):
151 """ pump output from vpp stdout/stderr to proper queues """
154 while not testclass.pump_thread_stop_flag.is_set():
155 readable = select.select([testclass.vpp.stdout.fileno(),
156 testclass.vpp.stderr.fileno(),
157 testclass.pump_thread_wakeup_pipe[0]],
159 if testclass.vpp.stdout.fileno() in readable:
160 read = os.read(testclass.vpp.stdout.fileno(), 102400)
162 split = read.decode('ascii',
163 errors='backslashreplace').splitlines(True)
164 if len(stdout_fragment) > 0:
165 split[0] = "%s%s" % (stdout_fragment, split[0])
166 if len(split) > 0 and split[-1].endswith("\n"):
170 stdout_fragment = split[-1]
171 testclass.vpp_stdout_deque.extend(split[:limit])
172 if not testclass.cache_vpp_output:
173 for line in split[:limit]:
174 testclass.logger.info(
175 "VPP STDOUT: %s" % line.rstrip("\n"))
176 if testclass.vpp.stderr.fileno() in readable:
177 read = os.read(testclass.vpp.stderr.fileno(), 102400)
179 split = read.decode('ascii',
180 errors='backslashreplace').splitlines(True)
181 if len(stderr_fragment) > 0:
182 split[0] = "%s%s" % (stderr_fragment, split[0])
183 if len(split) > 0 and split[-1].endswith("\n"):
187 stderr_fragment = split[-1]
189 testclass.vpp_stderr_deque.extend(split[:limit])
190 if not testclass.cache_vpp_output:
191 for line in split[:limit]:
192 testclass.logger.error(
193 "VPP STDERR: %s" % line.rstrip("\n"))
194 # ignoring the dummy pipe here intentionally - the
195 # flag will take care of properly terminating the loop
def _is_skip_aarch64_set():
    """ Build the BoolEnvironmentVariable bound to ``SKIP_AARCH64`` """
    skip_flag = BoolEnvironmentVariable('SKIP_AARCH64')
    return skip_flag


is_skip_aarch64_set = _is_skip_aarch64_set()
def _is_platform_aarch64():
    """ Tell whether ``platform.machine()`` reports an aarch64 machine """
    machine_type = platform.machine()
    return machine_type == 'aarch64'


is_platform_aarch64 = _is_platform_aarch64()
def _running_extended_tests():
    """ Build the BoolEnvironmentVariable bound to ``EXTENDED_TESTS`` """
    extended_flag = BoolEnvironmentVariable("EXTENDED_TESTS")
    return extended_flag


running_extended_tests = _running_extended_tests()
def _running_gcov_tests():
    """ Build the BoolEnvironmentVariable bound to ``GCOV_TESTS`` """
    gcov_flag = BoolEnvironmentVariable("GCOV_TESTS")
    return gcov_flag


running_gcov_tests = _running_gcov_tests()
226 class KeepAliveReporter(object):
228 Singleton object which reports test start to parent process
233 self.__dict__ = self._shared_state
241 def pipe(self, pipe):
242 if self._pipe is not None:
243 raise Exception("Internal error - pipe should only be set once.")
246 def send_keep_alive(self, test, desc=None):
248 Write current test tmpdir & desc to keep-alive pipe to signal liveness
250 if self.pipe is None:
251 # if not running forked..
255 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
259 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
262 class TestCaseTag(Enum):
263 # marks the suites that must run at the end
264 # using only a single test runner
266 # marks the suites broken on VPP multi-worker
267 FIXME_VPP_WORKERS = 2
270 def create_tag_decorator(e):
273 cls.test_tags.append(e)
274 except AttributeError:
280 tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
281 tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
284 class VppTestCase(unittest.TestCase):
285 """This subclass is a base class for VPP test cases that are implemented as
286 classes. It provides methods to create and run test case.
289 extra_vpp_punt_config = []
290 extra_vpp_plugin_config = []
292 vapi_response_timeout = 5
def packet_infos(self):
    """Packet infos tracked by the test case (mapping keyed by index)"""
    tracked_infos = self._packet_infos
    return tracked_infos
300 def get_packet_count_for_if_idx(cls, dst_if_index):
301 """Get the number of packet info for specified destination if index"""
302 if dst_if_index in cls._packet_count_for_dst_if_idx:
303 return cls._packet_count_for_dst_if_idx[dst_if_index]
308 def has_tag(cls, tag):
309 """ if the test case has a given tag - return true """
311 return tag in cls.test_tags
312 except AttributeError:
def is_tagged_run_solo(cls):
    """ Tell whether the test case class carries the RUN_SOLO tag """
    solo_tag = TestCaseTag.RUN_SOLO
    return cls.has_tag(solo_tag)
323 """Return the instance of this testcase"""
324 return cls.test_instance
327 def set_debug_flags(cls, d):
328 cls.gdbserver_port = 7777
329 cls.debug_core = False
330 cls.debug_gdb = False
331 cls.debug_gdbserver = False
332 cls.debug_all = False
337 cls.debug_core = True
338 elif dl == "gdb" or dl == "gdb-all":
340 elif dl == "gdbserver" or dl == "gdbserver-all":
341 cls.debug_gdbserver = True
343 raise Exception("Unrecognized DEBUG option: '%s'" % d)
344 if dl == "gdb-all" or dl == "gdbserver-all":
348 def get_least_used_cpu():
349 cpu_usage_list = [set(range(psutil.cpu_count()))]
350 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
351 if 'vpp_main' == p.info['name']]
352 for vpp_process in vpp_processes:
353 for cpu_usage_set in cpu_usage_list:
355 cpu_num = vpp_process.cpu_num()
356 if cpu_num in cpu_usage_set:
357 cpu_usage_set_index = cpu_usage_list.index(
359 if cpu_usage_set_index == len(cpu_usage_list) - 1:
360 cpu_usage_list.append({cpu_num})
362 cpu_usage_list[cpu_usage_set_index + 1].add(
364 cpu_usage_set.remove(cpu_num)
366 except psutil.NoSuchProcess:
369 for cpu_usage_set in cpu_usage_list:
370 if len(cpu_usage_set) > 0:
371 min_usage_set = cpu_usage_set
374 return random.choice(tuple(min_usage_set))
377 def setUpConstants(cls):
378 """ Set-up the test case class based on environment variables """
379 cls.step = BoolEnvironmentVariable('STEP')
380 d = os.getenv("DEBUG", None)
381 # inverted case to handle '' == True
382 c = os.getenv("CACHE_OUTPUT", "1")
383 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
384 cls.set_debug_flags(d)
385 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
386 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
387 cls.test_plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
388 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
390 if cls.plugin_path is not None:
391 if cls.extern_plugin_path is not None:
392 plugin_path = "%s:%s" % (
393 cls.plugin_path, cls.extern_plugin_path)
395 plugin_path = cls.plugin_path
396 elif cls.extern_plugin_path is not None:
397 plugin_path = cls.extern_plugin_path
399 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
400 debug_cli = "cli-listen localhost:5002"
402 size = os.getenv("COREDUMP_SIZE")
404 coredump_size = "coredump-size %s" % size
405 if coredump_size is None:
406 coredump_size = "coredump-size unlimited"
408 cpu_core_number = cls.get_least_used_cpu()
409 if not hasattr(cls, "worker_config"):
410 cls.worker_config = os.getenv("VPP_WORKER_CONFIG", "")
411 if cls.worker_config != "":
412 if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
413 cls.worker_config = ""
415 default_variant = os.getenv("VARIANT")
416 if default_variant is not None:
417 default_variant = "defaults { %s 100 }" % default_variant
421 api_fuzzing = os.getenv("API_FUZZ")
422 if api_fuzzing is None:
425 cls.vpp_cmdline = [cls.vpp_bin, "unix",
426 "{", "nodaemon", debug_cli, "full-coredump",
427 coredump_size, "runtime-dir", cls.tempdir, "}",
428 "api-trace", "{", "on", "}", "api-segment", "{",
429 "prefix", cls.shm_prefix, "}", "cpu", "{",
430 "main-core", str(cpu_core_number),
431 cls.worker_config, "}",
432 "physmem", "{", "max-size", "32m", "}",
433 "statseg", "{", "socket-name", cls.stats_sock, "}",
434 "socksvr", "{", "socket-name", cls.api_sock, "}",
435 "node { ", default_variant, "}",
436 "api-fuzz {", api_fuzzing, "}",
438 "{", "plugin", "dpdk_plugin.so", "{", "disable",
439 "}", "plugin", "rdma_plugin.so", "{", "disable",
440 "}", "plugin", "lisp_unittest_plugin.so", "{",
442 "}", "plugin", "unittest_plugin.so", "{", "enable",
443 "}"] + cls.extra_vpp_plugin_config + ["}", ]
445 if cls.extra_vpp_punt_config is not None:
446 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
447 if plugin_path is not None:
448 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
449 if cls.test_plugin_path is not None:
450 cls.vpp_cmdline.extend(["test_plugin_path", cls.test_plugin_path])
452 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
453 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
456 def wait_for_enter(cls):
457 if cls.debug_gdbserver:
458 print(double_line_delim)
459 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
461 print(double_line_delim)
462 print("Spawned VPP with PID: %d" % cls.vpp.pid)
464 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
466 print(single_line_delim)
467 print("You can debug VPP using:")
468 if cls.debug_gdbserver:
469 print("sudo gdb " + cls.vpp_bin +
470 " -ex 'target remote localhost:{port}'"
471 .format(port=cls.gdbserver_port))
472 print("Now is the time to attach gdb by running the above "
473 "command, set up breakpoints etc., then resume VPP from "
474 "within gdb by issuing the 'continue' command")
475 cls.gdbserver_port += 1
477 print("sudo gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
478 print("Now is the time to attach gdb by running the above "
479 "command and set up breakpoints etc., then resume VPP from"
480 " within gdb by issuing the 'continue' command")
481 print(single_line_delim)
482 input("Press ENTER to continue running the testcase...")
486 cmdline = cls.vpp_cmdline
488 if cls.debug_gdbserver:
489 gdbserver = '/usr/bin/gdbserver'
490 if not os.path.isfile(gdbserver) or \
491 not os.access(gdbserver, os.X_OK):
492 raise Exception("gdbserver binary '%s' does not exist or is "
493 "not executable" % gdbserver)
495 cmdline = [gdbserver, 'localhost:{port}'
496 .format(port=cls.gdbserver_port)] + cls.vpp_cmdline
497 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
500 cls.vpp = subprocess.Popen(cmdline,
501 stdout=subprocess.PIPE,
502 stderr=subprocess.PIPE)
503 except subprocess.CalledProcessError as e:
504 cls.logger.critical("Subprocess returned with non-0 return code: ("
508 cls.logger.critical("Subprocess returned with OS error: "
509 "(%s) %s", e.errno, e.strerror)
511 except Exception as e:
512 cls.logger.exception("Subprocess returned unexpected from "
519 def wait_for_coredump(cls):
520 corefile = cls.tempdir + "/core"
521 if os.path.isfile(corefile):
522 cls.logger.error("Waiting for coredump to complete: %s", corefile)
523 curr_size = os.path.getsize(corefile)
524 deadline = time.time() + 60
526 while time.time() < deadline:
529 curr_size = os.path.getsize(corefile)
530 if size == curr_size:
534 cls.logger.error("Timed out waiting for coredump to complete:"
537 cls.logger.error("Coredump complete: %s, size %d",
543 Perform class setup before running the testcase
544 Remove shared memory files, start vpp and connect the vpp-api
546 super(VppTestCase, cls).setUpClass()
547 gc.collect() # run garbage collection first
548 cls.logger = get_logger(cls.__name__)
549 seed = os.environ["RND_SEED"]
551 if hasattr(cls, 'parallel_handler'):
552 cls.logger.addHandler(cls.parallel_handler)
553 cls.logger.propagate = False
555 cls.tempdir = tempfile.mkdtemp(
556 prefix='vpp-unittest-%s-' % cls.__name__)
557 cls.stats_sock = "%s/stats.sock" % cls.tempdir
558 cls.api_sock = "%s/api.sock" % cls.tempdir
559 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
560 cls.file_handler.setFormatter(
561 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
563 cls.file_handler.setLevel(DEBUG)
564 cls.logger.addHandler(cls.file_handler)
565 cls.logger.debug("--- setUpClass() for %s called ---" %
567 cls.shm_prefix = os.path.basename(cls.tempdir)
568 os.chdir(cls.tempdir)
569 cls.logger.info("Temporary dir is %s, shm prefix is %s",
570 cls.tempdir, cls.shm_prefix)
571 cls.logger.debug("Random seed is %s" % seed)
573 cls.reset_packet_infos()
577 cls.registry = VppObjectRegistry()
578 cls.vpp_startup_failed = False
579 cls.reporter = KeepAliveReporter()
580 # need to catch exceptions here because if we raise, then the cleanup
581 # doesn't get called and we might end with a zombie vpp
584 cls.reporter.send_keep_alive(cls, 'setUpClass')
585 VppTestResult.current_test_case_info = TestCaseInfo(
586 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
587 cls.vpp_stdout_deque = deque()
588 cls.vpp_stderr_deque = deque()
589 cls.pump_thread_stop_flag = Event()
590 cls.pump_thread_wakeup_pipe = os.pipe()
591 cls.pump_thread = Thread(target=pump_output, args=(cls,))
592 cls.pump_thread.daemon = True
593 cls.pump_thread.start()
594 if cls.debug_gdb or cls.debug_gdbserver:
595 cls.vapi_response_timeout = 0
596 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
597 cls.vapi_response_timeout)
599 hook = hookmodule.StepHook(cls)
601 hook = hookmodule.PollHook(cls)
602 cls.vapi.register_hook(hook)
603 cls.statistics = VPPStats(socketname=cls.stats_sock)
607 cls.vpp_startup_failed = True
609 "VPP died shortly after startup, check the"
610 " output to standard error for possible cause")
614 except vpp_papi.VPPIOError as e:
615 cls.logger.debug("Exception connecting to vapi: %s" % e)
616 cls.vapi.disconnect()
618 if cls.debug_gdbserver:
619 print(colorize("You're running VPP inside gdbserver but "
620 "VPP-API connection failed, did you forget "
621 "to 'continue' VPP from within gdb?", RED))
623 except vpp_papi.VPPRuntimeError as e:
624 cls.logger.debug("%s" % e)
627 except Exception as e:
628 cls.logger.debug("Exception connecting to VPP: %s" % e)
633 def _debug_quit(cls):
634 if (cls.debug_gdbserver or cls.debug_gdb):
638 if cls.vpp.returncode is None:
640 print(double_line_delim)
641 print("VPP or GDB server is still running")
642 print(single_line_delim)
643 input("When done debugging, press ENTER to kill the "
644 "process and finish running the testcase...")
645 except AttributeError:
651 Disconnect vpp-api, kill vpp and cleanup shared memory files
655 # first signal that we want to stop the pump thread, then wake it up
656 if hasattr(cls, 'pump_thread_stop_flag'):
657 cls.pump_thread_stop_flag.set()
658 if hasattr(cls, 'pump_thread_wakeup_pipe'):
659 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
660 if hasattr(cls, 'pump_thread'):
661 cls.logger.debug("Waiting for pump thread to stop")
662 cls.pump_thread.join()
663 if hasattr(cls, 'vpp_stderr_reader_thread'):
664 cls.logger.debug("Waiting for stderr pump to stop")
665 cls.vpp_stderr_reader_thread.join()
667 if hasattr(cls, 'vpp'):
668 if hasattr(cls, 'vapi'):
669 cls.logger.debug(cls.vapi.vpp.get_stats())
670 cls.logger.debug("Disconnecting class vapi client on %s",
672 cls.vapi.disconnect()
673 cls.logger.debug("Deleting class vapi attribute on %s",
677 if cls.vpp.returncode is None:
678 cls.wait_for_coredump()
679 cls.logger.debug("Sending TERM to vpp")
681 cls.logger.debug("Waiting for vpp to die")
682 cls.vpp.communicate()
683 cls.logger.debug("Deleting class vpp attribute on %s",
685 cls.vpp.stdout.close()
686 cls.vpp.stderr.close()
689 if cls.vpp_startup_failed:
690 stdout_log = cls.logger.info
691 stderr_log = cls.logger.critical
693 stdout_log = cls.logger.info
694 stderr_log = cls.logger.info
696 if hasattr(cls, 'vpp_stdout_deque'):
697 stdout_log(single_line_delim)
698 stdout_log('VPP output to stdout while running %s:', cls.__name__)
699 stdout_log(single_line_delim)
700 vpp_output = "".join(cls.vpp_stdout_deque)
701 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
703 stdout_log('\n%s', vpp_output)
704 stdout_log(single_line_delim)
706 if hasattr(cls, 'vpp_stderr_deque'):
707 stderr_log(single_line_delim)
708 stderr_log('VPP output to stderr while running %s:', cls.__name__)
709 stderr_log(single_line_delim)
710 vpp_output = "".join(cls.vpp_stderr_deque)
711 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
713 stderr_log('\n%s', vpp_output)
714 stderr_log(single_line_delim)
717 def tearDownClass(cls):
718 """ Perform final cleanup after running all tests in this test-case """
719 cls.logger.debug("--- tearDownClass() for %s called ---" %
721 cls.reporter.send_keep_alive(cls, 'tearDownClass')
723 cls.file_handler.close()
724 cls.reset_packet_infos()
726 debug_internal.on_tear_down_class(cls)
def show_commands_at_teardown(self):
    """ Hook for subclasses to log their own show commands at teardown.

    This base implementation only records that no test specific
    commands were provided.
    """
    notice = "--- No test specific show commands provided. ---"
    self.logger.info(notice)
733 """ Show various debug prints after each test """
734 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
735 (self.__class__.__name__, self._testMethodName,
736 self._testMethodDoc))
739 if not self.vpp_dead:
740 self.logger.debug(self.vapi.cli("show trace max 1000"))
741 self.logger.info(self.vapi.ppcli("show interface"))
742 self.logger.info(self.vapi.ppcli("show hardware"))
743 self.logger.info(self.statistics.set_errors_str())
744 self.logger.info(self.vapi.ppcli("show run"))
745 self.logger.info(self.vapi.ppcli("show log"))
746 self.logger.info(self.vapi.ppcli("show bihash"))
747 self.logger.info("Logging testcase specific show commands.")
748 self.show_commands_at_teardown()
749 self.registry.remove_vpp_config(self.logger)
750 # Save/Dump VPP api trace log
751 m = self._testMethodName
752 api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
753 tmp_api_trace = "/tmp/%s" % api_trace
754 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
755 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
756 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
758 os.rename(tmp_api_trace, vpp_api_trace_log)
759 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
761 except VppTransportShmemIOError:
762 self.logger.debug("VppTransportShmemIOError: Vpp dead. "
763 "Cannot log show commands.")
766 self.registry.unregister_all(self.logger)
769 """ Clear trace before running each test"""
770 super(VppTestCase, self).setUp()
771 self.reporter.send_keep_alive(self)
774 raise VppDiedError(rv=None, testcase=self.__class__.__name__,
775 method_name=self._testMethodName)
776 self.sleep(.1, "during setUp")
777 self.vpp_stdout_deque.append(
778 "--- test setUp() for %s.%s(%s) starts here ---\n" %
779 (self.__class__.__name__, self._testMethodName,
780 self._testMethodDoc))
781 self.vpp_stderr_deque.append(
782 "--- test setUp() for %s.%s(%s) starts here ---\n" %
783 (self.__class__.__name__, self._testMethodName,
784 self._testMethodDoc))
785 self.vapi.cli("clear trace")
786 # store the test instance inside the test class - so that objects
787 # holding the class can access instance methods (like assertEqual)
788 type(self).test_instance = self
791 def pg_enable_capture(cls, interfaces=None):
793 Enable capture on packet-generator interfaces
795 :param interfaces: iterable interface indexes (if None,
796 use self.pg_interfaces)
799 if interfaces is None:
800 interfaces = cls.pg_interfaces
def register_capture(cls, cap_name):
    """ Remember a capture name on the testclass

    :param cap_name: name of the capture to record
    """
    # each entry pairs the registration time with the capture name
    stamped_entry = (time.time(), cap_name)
    cls._captures.append(stamped_entry)
811 def get_vpp_time(cls):
812 # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
813 # returns float("2.190522")
814 timestr = cls.vapi.cli('show clock')
815 head, sep, tail = timestr.partition(',')
816 head, sep, tail = head.partition('Time now')
820 def sleep_on_vpp_time(cls, sec):
821 """ Sleep according to time in VPP world """
822 # On a busy system with many processes
823 # we might end up with VPP time being slower than real world
824 # So take that into account when waiting for VPP to do something
825 start_time = cls.get_vpp_time()
826 while cls.get_vpp_time() - start_time < sec:
830 def pg_start(cls, trace=True):
831 """ Enable the PG, wait till it is done, then clean up """
833 cls.vapi.cli("clear trace")
834 cls.vapi.cli("trace add pg-input 1000")
835 cls.vapi.cli('packet-generator enable')
836 # PG, when starts, runs to completion -
837 # so let's avoid a race condition,
838 # and wait a little till it's done.
839 # Then clean it up - and then be gone.
840 deadline = time.time() + 300
841 while cls.vapi.cli('show packet-generator').find("Yes") != -1:
842 cls.sleep(0.01) # yield
843 if time.time() > deadline:
844 cls.logger.error("Timeout waiting for pg to stop")
846 for stamp, cap_name in cls._captures:
847 cls.vapi.cli('packet-generator delete %s' % cap_name)
851 def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
853 Create packet-generator interfaces.
855 :param interfaces: iterable indexes of the interfaces.
856 :returns: List of created interfaces.
861 intf = VppPGInterface(cls, i, gso, gso_size)
862 setattr(cls, intf.name, intf)
864 cls.pg_interfaces = result
868 def create_loopback_interfaces(cls, count):
870 Create loopback interfaces.
872 :param count: number of interfaces created.
873 :returns: List of created interfaces.
875 result = [VppLoInterface(cls) for i in range(count)]
877 setattr(cls, intf.name, intf)
878 cls.lo_interfaces = result
882 def create_bvi_interfaces(cls, count):
884 Create BVI interfaces.
886 :param count: number of interfaces created.
887 :returns: List of created interfaces.
889 result = [VppBviInterface(cls) for i in range(count)]
891 setattr(cls, intf.name, intf)
892 cls.bvi_interfaces = result
896 def extend_packet(packet, size, padding=' '):
898 Extend packet to given size by padding with spaces or custom padding
899 NOTE: Currently works only when Raw layer is present.
901 :param packet: packet
902 :param size: target size
903 :param padding: padding used to extend the payload
906 packet_len = len(packet) + 4
907 extend = size - packet_len
909 num = (extend // len(padding)) + 1
910 packet[Raw].load += (padding * num)[:extend].encode("ascii")
def reset_packet_infos(cls):
    """ Drop all stored packet infos and per-destination packet counts """
    # each attribute gets its own fresh, empty dict
    for attr_name in ('_packet_infos', '_packet_count_for_dst_if_idx'):
        setattr(cls, attr_name, {})
919 def create_packet_info(cls, src_if, dst_if):
921 Create packet info object containing the source and destination indexes
922 and add it to the testcase's packet info list
924 :param VppInterface src_if: source interface
925 :param VppInterface dst_if: destination interface
927 :returns: _PacketInfo object
931 info.index = len(cls._packet_infos)
932 info.src = src_if.sw_if_index
933 info.dst = dst_if.sw_if_index
934 if isinstance(dst_if, VppSubInterface):
935 dst_idx = dst_if.parent.sw_if_index
937 dst_idx = dst_if.sw_if_index
938 if dst_idx in cls._packet_count_for_dst_if_idx:
939 cls._packet_count_for_dst_if_idx[dst_idx] += 1
941 cls._packet_count_for_dst_if_idx[dst_idx] = 1
942 cls._packet_infos[info.index] = info
946 def info_to_payload(info):
948 Convert _PacketInfo object to packet payload
950 :param info: _PacketInfo object
952 :returns: string containing serialized data from packet info
954 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
958 def payload_to_info(payload, payload_field='load'):
960 Convert packet payload to _PacketInfo object
962 :param payload: packet payload
963 :type payload: <class 'scapy.packet.Raw'>
964 :param payload_field: packet fieldname of payload "load" for
965 <class 'scapy.packet.Raw'>
966 :type payload_field: str
967 :returns: _PacketInfo object containing de-serialized data from payload
970 numbers = getattr(payload, payload_field).split()
972 info.index = int(numbers[0])
973 info.src = int(numbers[1])
974 info.dst = int(numbers[2])
975 info.ip = int(numbers[3])
976 info.proto = int(numbers[4])
979 def get_next_packet_info(self, info):
981 Iterate over the packet info list stored in the testcase
982 Start iteration with first element if info is None
983 Continue based on index in info if info is specified
985 :param info: info or None
986 :returns: next info in list or None if no more infos
991 next_index = info.index + 1
992 if next_index == len(self._packet_infos):
995 return self._packet_infos[next_index]
997 def get_next_packet_info_for_interface(self, src_index, info):
999 Search the packet info list for the next packet info with same source
1002 :param src_index: source interface index to search for
1003 :param info: packet info - where to start the search
1004 :returns: packet info or None
1008 info = self.get_next_packet_info(info)
1011 if info.src == src_index:
1014 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
1016 Search the packet info list for the next packet info with same source
1017 and destination interface indexes
1019 :param src_index: source interface index to search for
1020 :param dst_index: destination interface index to search for
1021 :param info: packet info - where to start the search
1022 :returns: packet info or None
1026 info = self.get_next_packet_info_for_interface(src_index, info)
1029 if info.dst == dst_index:
1032 def assert_equal(self, real_value, expected_value, name_or_class=None):
1033 if name_or_class is None:
1034 self.assertEqual(real_value, expected_value)
1037 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
1038 msg = msg % (getdoc(name_or_class).strip(),
1039 real_value, str(name_or_class(real_value)),
1040 expected_value, str(name_or_class(expected_value)))
1042 msg = "Invalid %s: %s does not match expected value %s" % (
1043 name_or_class, real_value, expected_value)
1045 self.assertEqual(real_value, expected_value, msg)
1047 def assert_in_range(self,
1055 msg = "Invalid %s: %s out of range <%s,%s>" % (
1056 name, real_value, expected_min, expected_max)
1057 self.assertTrue(expected_min <= real_value <= expected_max, msg)
1059 def assert_packet_checksums_valid(self, packet,
1060 ignore_zero_udp_checksums=True):
1061 received = packet.__class__(scapy.compat.raw(packet))
1062 udp_layers = ['UDP', 'UDPerror']
1063 checksum_fields = ['cksum', 'chksum']
1066 temp = received.__class__(scapy.compat.raw(received))
1068 layer = temp.getlayer(counter)
1070 layer = layer.copy()
1071 layer.remove_payload()
1072 for cf in checksum_fields:
1073 if hasattr(layer, cf):
1074 if ignore_zero_udp_checksums and \
1075 0 == getattr(layer, cf) and \
1076 layer.name in udp_layers:
1078 delattr(temp.getlayer(counter), cf)
1079 checksums.append((counter, cf))
1082 counter = counter + 1
1083 if 0 == len(checksums):
1085 temp = temp.__class__(scapy.compat.raw(temp))
1086 for layer, cf in checksums:
1087 calc_sum = getattr(temp[layer], cf)
1089 getattr(received[layer], cf), calc_sum,
1090 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
1092 "Checksum field `%s` on `%s` layer has correct value `%s`" %
1093 (cf, temp[layer].name, calc_sum))
1095 def assert_checksum_valid(self, received_packet, layer,
1096 field_name='chksum',
1097 ignore_zero_checksum=False):
1098 """ Check checksum of received packet on given layer """
1099 received_packet_checksum = getattr(received_packet[layer], field_name)
1100 if ignore_zero_checksum and 0 == received_packet_checksum:
1102 recalculated = received_packet.__class__(
1103 scapy.compat.raw(received_packet))
1104 delattr(recalculated[layer], field_name)
1105 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1106 self.assert_equal(received_packet_checksum,
1107 getattr(recalculated[layer], field_name),
1108 "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """ Check the checksum of the 'IP' layer of a received packet

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded to assert_checksum_valid
    """
    options = {'ignore_zero_checksum': ignore_zero_checksum}
    self.assert_checksum_valid(received_packet, 'IP', **options)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """ Check the checksum of the 'TCP' layer of a received packet

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded to assert_checksum_valid
    """
    options = {'ignore_zero_checksum': ignore_zero_checksum}
    self.assert_checksum_valid(received_packet, 'TCP', **options)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """ Check the checksum of the 'UDP' layer of a received packet

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded to assert_checksum_valid
        (enabled by default for UDP)
    """
    options = {'ignore_zero_checksum': ignore_zero_checksum}
    self.assert_checksum_valid(received_packet, 'UDP', **options)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """ Check checksums of the error layers embedded in a packet

    Each of the IPerror, TCPerror, UDPerror and ICMPerror layers that
    the packet has is handed to assert_checksum_valid; for UDPerror
    ``ignore_zero_checksum=True`` is passed along.

    :param received_packet: packet to verify
    """
    embedded_checks = (
        (IPerror, 'IPerror', {}),
        (TCPerror, 'TCPerror', {}),
        (UDPerror, 'UDPerror', {'ignore_zero_checksum': True}),
        (ICMPerror, 'ICMPerror', {}),
    )
    for layer_cls, layer_name, options in embedded_checks:
        if received_packet.haslayer(layer_cls):
            self.assert_checksum_valid(
                received_packet, layer_name, **options)
def assert_icmp_checksum_valid(self, received_packet):
    """ Check the 'ICMP' layer checksum, then the embedded error layers

    :param received_packet: packet to verify
    """
    pkt = received_packet
    self.assert_checksum_valid(pkt, 'ICMP')
    self.assert_embedded_icmp_checksum_valid(pkt)
def assert_icmpv6_checksum_valid(self, pkt):
    """ Check the 'cksum' field of the ICMPv6 layers present in a packet

    ICMPv6DestUnreach, ICMPv6EchoRequest and ICMPv6EchoReply are each
    verified when present; a destination-unreachable packet additionally
    gets its embedded error layers checked.

    :param pkt: packet to verify
    """
    # (layer class, layer name, also verify embedded error layers?)
    icmpv6_checks = (
        (ICMPv6DestUnreach, 'ICMPv6DestUnreach', True),
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest', False),
        (ICMPv6EchoReply, 'ICMPv6EchoReply', False),
    )
    for layer_cls, layer_name, check_embedded in icmpv6_checks:
        if not pkt.haslayer(layer_cls):
            continue
        self.assert_checksum_valid(pkt, layer_name, 'cksum')
        if check_embedded:
            self.assert_embedded_icmp_checksum_valid(pkt)
def get_packet_counter(self, counter):
    """
    Return the current value of a packet counter.

    :param counter: counter name; a name starting with "/" is read through
        self.statistics.get_counter(), any other name is looked up in the
        output of the "sh errors" CLI command
    :returns: value of the counter; 0 when the name does not appear in the
        "sh errors" output
    """
    if counter.startswith("/"):
        return self.statistics.get_counter(counter)

    cli_lines = self.vapi.cli("sh errors").split('\n')
    # the first and the last line of the CLI output are never inspected
    for line in cli_lines[1:-1]:
        fields = line.split()
        # first column is the count, second column is compared to the name
        if fields[1] == counter:
            return int(fields[0])
    return 0
def assert_packet_counter_equal(self, counter, expected_value):
    """
    Assert that a packet counter has the expected value.

    :param counter: counter name, as accepted by get_packet_counter()
    :param expected_value: value the counter is expected to have
    """
    actual = self.get_packet_counter(counter)
    self.assert_equal(
        actual, expected_value, "packet counter `%s'" % counter)
def assert_error_counter_equal(self, counter, expected_value):
    """
    Assert that an error counter has the expected value.

    The counter is read through self.statistics.get_err_counter().

    :param counter: name of the error counter
    :param expected_value: value the counter is expected to have
    """
    actual = self.statistics.get_err_counter(counter)
    self.assert_equal(
        actual, expected_value, "error counter `%s'" % counter)
@classmethod
def sleep(cls, timeout, remark=None):
    """
    Sleep for `timeout` seconds and log how long the sleep really took.

    :param timeout: number of seconds to sleep; 0 only gives up the CPU
    :param remark: free-form text included in the debug log messages
    """
    # /* Allow sleep(0) to maintain win32 semantics, and as decreed
    #  * by Guido, only the main thread can be interrupted.
    # */
    # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892  # noqa
    if timeout == 0:
        # yield quantum
        yield_cpu = getattr(os, 'sched_yield', None)
        if yield_cpu is None:
            time.sleep(0)
        else:
            yield_cpu()
        return

    cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
    started = time.time()
    time.sleep(timeout)
    elapsed = time.time() - started
    if elapsed > 2 * timeout:
        # sleeping more than twice the requested time is reported loudly
        cls.logger.error("unexpected self.sleep() result - "
                         "slept for %es instead of ~%es!",
                         elapsed, timeout)

    cls.logger.debug(
        "Finished sleep (%s) - slept %es (wanted %es)",
        remark, elapsed, timeout)
def pg_send(self, intf, pkts, worker=None, trace=True):
    """
    Queue packets on an interface and start the packet generator.

    :param intf: interface the stream is added to
    :param pkts: packets to send
    :param worker: passed through to intf.add_stream()
    :param trace: passed through to self.pg_start()
    """
    intf.add_stream(pkts, worker=worker)
    # capture is enabled on every pg interface, not only on `intf`
    capture_interfaces = self.pg_interfaces
    self.pg_enable_capture(capture_interfaces)
    self.pg_start(trace=trace)
def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
    """
    Send packets and assert that no pg interface captures anything.

    :param intf: interface to send the packets on
    :param pkts: packets to send
    :param remark: passed through to assert_nothing_captured()
    :param timeout: capture timeout used for the first interface;
        a falsy value means 1
    """
    self.pg_send(intf, pkts)
    wait = timeout if timeout else 1
    for pg_intf in self.pg_interfaces:
        pg_intf.get_capture(0, timeout=wait)
        pg_intf.assert_nothing_captured(remark=remark)
        # only the first interface waits for the full timeout
        wait = 0.1
def send_and_expect(self, intf, pkts, output, n_rx=None, worker=None,
                    trace=True):
    """
    Send packets on one interface and return what `output` captured.

    :param intf: interface to send the packets on
    :param pkts: packets to send
    :param output: interface the capture is read from
    :param n_rx: number of packets expected; a falsy value means len(pkts)
    :param worker: passed through to pg_send()
    :param trace: passed through to pg_send()
    :returns: result of output.get_capture()
    """
    expected_count = n_rx if n_rx else len(pkts)
    self.pg_send(intf, pkts, worker=worker, trace=trace)
    return output.get_capture(expected_count)
def send_and_expect_only(self, intf, pkts, output, timeout=None):
    """
    Send packets and expect them on `output` and on no other pg interface.

    :param intf: interface to send the packets on
    :param pkts: packets to send
    :param output: the only interface expected to capture packets
    :param timeout: capture timeout used for the first other interface;
        a falsy value means 1
    :returns: result of output.get_capture(len(pkts))
    """
    self.pg_send(intf, pkts)
    rx = output.get_capture(len(pkts))
    expected_outputs = [output]
    wait = timeout or 1
    for pg_intf in self.pg_interfaces:
        if pg_intf in expected_outputs:
            continue
        pg_intf.get_capture(0, timeout=wait)
        pg_intf.assert_nothing_captured()
        # only the first checked interface waits for the full timeout
        wait = 0.1

    return rx
1237 def get_testcase_doc_name(test):
1238 return getdoc(test.__class__).splitlines()[0]
def get_test_description(descriptions, test):
    """
    Return the text used to describe a test in the output.

    :param descriptions: when truthy, prefer the short description of the
        test
    :param test: test case instance
    :returns: test.shortDescription() when descriptions are enabled and the
        short description is non-empty, str(test) otherwise
    """
    summary = test.shortDescription()
    return summary if descriptions and summary else str(test)
class TestCaseInfo(object):
    """
    Plain record describing one test case run.

    :param logger: logger of the test case
    :param tempdir: temporary directory used by the test case
    :param vpp_pid: stored as given in the vpp_pid attribute
    :param vpp_bin_path: stored as given in the vpp_bin_path attribute
    """

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        self.logger, self.tempdir = logger, tempdir
        self.vpp_pid, self.vpp_bin_path = vpp_pid, vpp_bin_path
        # stays None until VppTestResult.add_error() finds a core file in
        # tempdir and stores the name of the failing test here
        self.core_crash_test = None
class VppTestResult(unittest.TestResult):
    """
    Test result collector which also prints per-test progress.

    @property result_string
     String variable to store the test case result string.
    @property errors
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test which
     raised an unexpected exception.
    @property failures
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test where
     a failure was explicitly signalled using the TestCase.assert*()
     methods.
    """

    # class-level state, shared by every instance of the result class
    failed_test_cases_info = set()
    core_crash_test_cases_info = set()
    current_test_case_info = None

    def __init__(self, stream=None, descriptions=None, verbosity=None,
                 runner=None):
        """
        :param stream File descriptor to store where to report test results.
            Set to the standard error stream by default.
        :param descriptions Boolean variable to store information if to use
            test case descriptions.
        :param verbosity Integer variable to store required verbosity level.
        :param runner Runner which created this result; its print_summary
            attribute is read by printErrors().
        """
        super(VppTestResult, self).__init__(stream, descriptions, verbosity)
        self.stream = stream
        self.descriptions = descriptions
        self.verbosity = verbosity
        self.result_string = None
        self.runner = runner

    def addSuccess(self, test):
        """
        Record a test succeeded result

        :param test: test which passed

        """
        info = self.current_test_case_info
        if info:
            info.logger.debug(
                "--- addSuccess() %s.%s(%s) called" % (
                    test.__class__.__name__,
                    test._testMethodName,
                    test._testMethodDoc))
        unittest.TestResult.addSuccess(self, test)
        self.result_string = colorize("OK", GREEN)

        self.send_result_through_pipe(test, PASS)

    def addSkip(self, test, reason):
        """
        Record a test skipped.

        :param test: test which was skipped
        :param reason: why the test was skipped

        """
        info = self.current_test_case_info
        if info:
            info.logger.debug(
                "--- addSkip() %s.%s(%s) called, reason is %s" % (
                    test.__class__.__name__,
                    test._testMethodName,
                    test._testMethodDoc,
                    reason))
        unittest.TestResult.addSkip(self, test, reason)
        self.result_string = colorize("SKIP", YELLOW)

        self.send_result_through_pipe(test, SKIP)

    def symlink_failed(self):
        """
        Link the temp dir of the current test case into $FAILED_DIR.

        Does nothing without current test case info. Any exception raised
        while creating the link is logged, not propagated.
        """
        info = self.current_test_case_info
        if not info:
            return
        try:
            failed_dir = os.getenv('FAILED_DIR')
            link_path = os.path.join(
                failed_dir,
                '%s-FAILED' % os.path.basename(info.tempdir))

            info.logger.debug("creating a link to the failed test")
            info.logger.debug(
                "os.symlink(%s, %s)" % (info.tempdir, link_path))
            if os.path.exists(link_path):
                info.logger.debug('symlink already exists')
            else:
                os.symlink(info.tempdir, link_path)

        except Exception as e:
            info.logger.error(e)

    def send_result_through_pipe(self, test, result):
        """
        Send (test id, result) through the result pipe, if one is set.

        :param test: test the result belongs to
        :param result: result code to report
        """
        pipe = getattr(self, 'test_framework_result_pipe', None)
        if pipe:
            pipe.send((test.id(), result))

    def log_error(self, test, err, fn_name):
        """
        Write an error or a failure to the log of the current test case.

        :param test: failing test (or unittest's internal error holder)
        :param err: exception info triple, as accepted by format_exception()
        :param fn_name: name of the reporting method, used in the message
        """
        info = self.current_test_case_info
        if not info:
            return
        if isinstance(test, unittest.suite._ErrorHolder):
            test_name = test.description
        else:
            test_name = '%s.%s(%s)' % (test.__class__.__name__,
                                       test._testMethodName,
                                       test._testMethodDoc)
        info.logger.debug(
            "--- %s() %s called, err is %s" % (fn_name, test_name, err))
        info.logger.debug(
            "formatted exception is:\n%s" % "".join(format_exception(*err)))

    def add_error(self, test, err, unittest_fn, error_type):
        """
        Common implementation of addFailure() and addError().

        :param test: failing test
        :param err: exception info triple
        :param unittest_fn: unbound unittest.TestResult method which records
            the problem
        :param error_type: FAIL or ERROR
        :raises Exception: when error_type is neither FAIL nor ERROR
        """
        if error_type == FAIL:
            fn_name, label = 'addFailure', "FAIL"
        elif error_type == ERROR:
            fn_name, label = 'addError', "ERROR"
        else:
            raise Exception('Error type %s cannot be used to record an '
                            'error or a failure' % error_type)
        self.log_error(test, err, fn_name)
        error_type_str = colorize(label, RED)

        unittest_fn(self, test, err)
        info = self.current_test_case_info
        if info:
            self.result_string = \
                "%s [ temp dir used by test case: %s ]" % (
                    error_type_str, info.tempdir)
            self.symlink_failed()
            self.failed_test_cases_info.add(info)
            if is_core_present(info.tempdir):
                # only the first failing test is blamed for the core
                if not info.core_crash_test:
                    if isinstance(test, unittest.suite._ErrorHolder):
                        test_name = str(test)
                    else:
                        test_name = "'{!s}' ({!s})".format(
                            get_testcase_doc_name(test), test.id())
                    info.core_crash_test = test_name
                self.core_crash_test_cases_info.add(info)
        else:
            self.result_string = '%s [no temp dir]' % error_type_str

        self.send_result_through_pipe(test, error_type)

    def addFailure(self, test, err):
        """
        Record a test failed result

        :param test: test which failed
        :param err: error message

        """
        self.add_error(test, err, unittest.TestResult.addFailure, FAIL)

    def addError(self, test, err):
        """
        Record a test error result

        :param test: test which raised an error
        :param err: error message

        """
        self.add_error(test, err, unittest.TestResult.addError, ERROR)

    def getDescription(self, test):
        """
        Get test description

        :param test: test to describe
        :returns: test description

        """
        return get_test_description(self.descriptions, test)

    def startTest(self, test):
        """
        Start a test

        :param test: test which is about to run

        """

        def print_header(test):
            """Print the test case title once per test case class."""
            test_doc = getdoc(test)
            if not test_doc:
                raise Exception("No doc string for test '%s'" % test.id())
            test_title = test_doc.splitlines()[0]
            test_title_colored = colorize(test_title, GREEN)
            if test.is_tagged_run_solo():
                solo_title = "SOLO RUN: " + test_title
                test_title_colored = colorize(solo_title, YELLOW)

            # This block may overwrite the colorized title above,
            # but we want this to stand out and be fixed
            if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
                fixme_title = "FIXME with VPP workers: " + test_title
                test_title_colored = colorize(fixme_title, RED)

            if not hasattr(test.__class__, '_header_printed'):
                print(double_line_delim)
                print(test_title_colored)
                print(double_line_delim)
            test.__class__._header_printed = True

        print_header(test)
        self.start_test = time.time()
        unittest.TestResult.startTest(self, test)
        if self.verbosity > 0:
            self.stream.writeln(
                "Starting " + self.getDescription(test) + " ...")
            self.stream.writeln(single_line_delim)

    def stopTest(self, test):
        """
        Called when the given test has been run

        :param test: test which has just finished

        """
        unittest.TestResult.stopTest(self, test)

        if self.verbosity > 0:
            self.stream.writeln(single_line_delim)
            self.stream.writeln(
                "%-73s%s" % (self.getDescription(test), self.result_string))
            self.stream.writeln(single_line_delim)
        else:
            # terse mode also prints the time elapsed since startTest()
            self.stream.writeln(
                "%-68s %4.2f %s" % (self.getDescription(test),
                                    time.time() - self.start_test,
                                    self.result_string))

        self.send_result_through_pipe(test, TEST_RUN)

    def printErrors(self):
        """
        Print errors from running the test case
        """
        if self.errors or self.failures:
            self.stream.writeln()
            self.printErrorList('ERROR', self.errors)
            self.printErrorList('FAIL', self.failures)

        # ^^ that is the last output from unittest before summary
        if not self.runner.print_summary:
            # everything written after this point is discarded
            devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
            self.stream = devnull
            self.runner.stream = devnull

    def printErrorList(self, flavour, errors):
        """
        Print error list to the output stream together with error type
        and test case description.

        :param flavour: error type
        :param errors: iterable errors

        """
        for test, err in errors:
            self.stream.writeln(double_line_delim)
            self.stream.writeln(
                "%s: %s" % (flavour, self.getDescription(test)))
            self.stream.writeln(single_line_delim)
            self.stream.writeln("%s" % err)
class VppTestRunner(unittest.TextTestRunner):
    """
    A basic test runner implementation which prints results to standard
    output.
    """

    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
                 result_pipe=None, failfast=False, buffer=False,
                 resultclass=None, print_summary=True, **kwargs):
        """
        :param keep_alive_pipe: stored in KeepAliveReporter.pipe
        :param descriptions: passed to unittest.TextTestRunner
        :param verbosity: passed to unittest.TextTestRunner
        :param result_pipe: stored on the result class as
            test_framework_result_pipe
        :param failfast: passed to unittest.TextTestRunner
        :param buffer: passed to unittest.TextTestRunner
        :param resultclass: passed to unittest.TextTestRunner
        :param print_summary: when false, the stream is restored to the
            original one after the run (see run())
        :param kwargs: passed to unittest.TextTestRunner
        """
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        super(VppTestRunner, self).__init__(
            stream=sys.stdout, descriptions=descriptions,
            verbosity=verbosity, failfast=failfast, buffer=buffer,
            resultclass=resultclass, **kwargs)
        KeepAliveReporter.pipe = keep_alive_pipe

        self.print_summary = print_summary
        self.orig_stream = self.stream
        self.resultclass.test_framework_result_pipe = result_pipe

    def _makeResult(self):
        """Create the result object, handing it a reference to this runner."""
        return self.resultclass(
            self.stream, self.descriptions, self.verbosity, self)

    def run(self, test):
        """
        Run the tests

        :param test: test or test suite to run
        :returns: the result object produced by the run

        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        result = super(VppTestRunner, self).run(test)
        if self.print_summary:
            return result
        # VppTestResult.printErrors() redirected the streams to devnull to
        # hide the summary; put the original stream back
        self.stream = self.orig_stream
        result.stream = self.orig_stream
        return result
class Worker(Thread):
    """
    Thread which runs an external executable and collects its result.

    Subclasses may define the class or instance attributes `testcase`,
    `role` and `wait_for_gdb` before calling this constructor; they are
    looked up with hasattr() and change the command line and the reported
    application name.

    After run() finishes, `result` holds the return code of the process
    (os.EX_OSFILE when the executable could not be found).
    """

    def __init__(self, executable_args, logger, env=None, *args, **kwargs):
        """
        :param executable_args: command line, executable first
        :param logger: logger used to report the progress and the output
        :param env: extra environment variables for the process
        :param args: positional arguments passed to Thread
        :param kwargs: keyword arguments passed to Thread
        """
        super(Worker, self).__init__(*args, **kwargs)
        self.logger = logger
        # work on a copy so that the gdb-related changes below never leak
        # into the list owned by the caller
        self.args = list(executable_args)
        if hasattr(self, 'testcase') and self.testcase.debug_all:
            if self.testcase.debug_gdbserver:
                # prefix the real command line (not the Thread *args tuple)
                # with the gdbserver invocation
                self.args = ['/usr/bin/gdbserver', 'localhost:{port}'
                             .format(port=self.testcase.gdbserver_port)
                             ] + self.args
            elif self.testcase.debug_gdb and hasattr(self, 'wait_for_gdb'):
                self.args.append(self.wait_for_gdb)
        self.app_bin = executable_args[0]
        self.app_name = os.path.basename(self.app_bin)
        if hasattr(self, 'role'):
            self.app_name += ' {role}'.format(role=self.role)
        self.process = None
        self.result = None
        env = {} if env is None else env
        self.env = copy.deepcopy(env)

    def wait_for_enter(self):
        """
        Print gdb attach instructions and wait for ENTER when debugging.

        Returns immediately unless a `testcase` attribute exists and it has
        debug_all set together with debug_gdbserver or debug_gdb.
        """
        if not hasattr(self, 'testcase'):
            return
        if self.testcase.debug_all and self.testcase.debug_gdbserver:
            print()
            print(double_line_delim)
            print("Spawned GDB Server for '{app}' with PID: {pid}"
                  .format(app=self.app_name, pid=self.process.pid))
        elif self.testcase.debug_all and self.testcase.debug_gdb:
            print()
            print(double_line_delim)
            print("Spawned '{app}' with PID: {pid}"
                  .format(app=self.app_name, pid=self.process.pid))
        else:
            return
        print(single_line_delim)
        print("You can debug '{app}' using:".format(app=self.app_name))
        if self.testcase.debug_gdbserver:
            print("sudo gdb " + self.app_bin +
                  " -ex 'target remote localhost:{port}'"
                  .format(port=self.testcase.gdbserver_port))
            print("Now is the time to attach gdb by running the above "
                  "command, set up breakpoints etc., then resume from "
                  "within gdb by issuing the 'continue' command")
            # the next gdbserver gets its own port
            self.testcase.gdbserver_port += 1
        elif self.testcase.debug_gdb:
            print("sudo gdb " + self.app_bin +
                  " -ex 'attach {pid}'".format(pid=self.process.pid))
            print("Now is the time to attach gdb by running the above "
                  "command and set up breakpoints etc., then resume from"
                  " within gdb by issuing the 'continue' command")
        print(single_line_delim)
        input("Press ENTER to continue running the testcase...")

    def run(self):
        """
        Run the executable, wait for it and log its output.

        :raises EnvironmentError: when the executable does not exist or is
            not executable; `result` is set to os.EX_OSFILE first
        """
        executable = self.args[0]
        if not os.path.exists(executable) or not os.access(
                executable, os.F_OK | os.X_OK):
            # Exit code that means some system file did not exist,
            # could not be opened, or had some other kind of error.
            self.result = os.EX_OSFILE
            raise EnvironmentError(
                "executable '%s' is not found or executable." % executable)
        self.logger.debug("Running executable: '{app}'"
                          .format(app=' '.join(self.args)))
        env = os.environ.copy()
        env.update(self.env)
        env["CK_LOG_FILE_NAME"] = "-"
        self.process = subprocess.Popen(
            self.args, shell=False, env=env, preexec_fn=os.setpgrp,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self.wait_for_enter()
        out, err = self.process.communicate()
        self.logger.debug("Finished running `{app}'".format(app=self.app_name))
        self.logger.info("Return code is `%s'" % self.process.returncode)
        self.logger.info(single_line_delim)
        self.logger.info("Executable `{app}' wrote to stdout:"
                         .format(app=self.app_name))
        self.logger.info(single_line_delim)
        # 'replace' keeps undecodable output from raising in the thread
        # before the result is stored
        self.logger.info(out.decode('utf-8', 'replace'))
        self.logger.info(single_line_delim)
        self.logger.info("Executable `{app}' wrote to stderr:"
                         .format(app=self.app_name))
        self.logger.info(single_line_delim)
        self.logger.info(err.decode('utf-8', 'replace'))
        self.logger.info(single_line_delim)
        self.result = self.process.returncode
1665 if __name__ == '__main__':