3 from __future__ import print_function
19 from collections import deque
20 from threading import Thread, Event
21 from inspect import getdoc, isclass
22 from traceback import format_exception
23 from logging import FileHandler, DEBUG, Formatter
27 from scapy.packet import Raw
28 import hook as hookmodule
29 from vpp_pg_interface import VppPGInterface
30 from vpp_sub_interface import VppSubInterface
31 from vpp_lo_interface import VppLoInterface
32 from vpp_bvi_interface import VppBviInterface
33 from vpp_papi_provider import VppPapiProvider
35 from vpp_papi.vpp_stats import VPPStats
36 from vpp_papi.vpp_transport_shmem import VppTransportShmemIOError
37 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
39 from vpp_object import VppObjectRegistry
40 from util import ppp, is_core_present
41 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
42 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
43 from scapy.layers.inet6 import ICMPv6EchoReply
45 logger = logging.getLogger(__name__)
47 # Set up an empty logger for the testcase that can be overridden as necessary
48 null_logger = logging.getLogger('VppTestCase')
49 null_logger.addHandler(logging.NullHandler())
58 class BoolEnvironmentVariable(object):
60 def __init__(self, env_var_name, default='n', true_values=None):
61 self.name = env_var_name
62 self.default = default
63 self.true_values = true_values if true_values is not None else \
67 return os.getenv(self.name, self.default).lower() in self.true_values
69 if sys.version_info[0] == 2:
70 __nonzero__ = __bool__
73 return 'BoolEnvironmentVariable(%r, default=%r, true_values=%r)' % \
74 (self.name, self.default, self.true_values)
77 debug_framework = BoolEnvironmentVariable('TEST_DEBUG')
82 Test framework module.
84 The module provides a set of tools for constructing and running tests and
85 representing the results.
89 class VppDiedError(Exception):
90 """ exception for reporting that the subprocess has died."""
92 signals_by_value = {v: k for k, v in signal.__dict__.items() if
93 k.startswith('SIG') and not k.startswith('SIG_')}
95 def __init__(self, rv=None, testcase=None, method_name=None):
97 self.signal_name = None
98 self.testcase = testcase
99 self.method_name = method_name
102 self.signal_name = VppDiedError.signals_by_value[-rv]
103 except (KeyError, TypeError):
106 if testcase is None and method_name is None:
109 in_msg = 'running %s.%s ' % (testcase, method_name)
111 msg = "VPP subprocess died %sunexpectedly with return code: %d%s." % (
114 ' [%s]' % (self.signal_name if
115 self.signal_name is not None else ''))
116 super(VppDiedError, self).__init__(msg)
119 class _PacketInfo(object):
120 """Private class to create packet info object.
122 Help process information about the next packet.
123 Set variables to default values.
125 #: Store the index of the packet.
127 #: Store the index of the source packet generator interface of the packet.
129 #: Store the index of the destination packet generator interface
132 #: Store expected ip version
134 #: Store expected upper protocol
136 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare two packet info objects field by field.

    Two objects are considered equal when their ``index``, ``src``,
    ``dst`` and ``data`` attributes all compare equal.

    :param other: object to compare against
    :returns: result of the field comparison, or ``NotImplemented`` when
        *other* does not expose the compared attributes (e.g. ``None``),
        so Python falls back to its default comparison instead of raising
        AttributeError.
    """
    try:
        theirs = (other.index, other.src, other.dst, other.data)
    except AttributeError:
        # not a packet-info-like object - let Python decide
        return NotImplemented
    return (self.index, self.src, self.dst, self.data) == theirs
147 def pump_output(testclass):
148 """ pump output from vpp stdout/stderr to proper queues """
151 while not testclass.pump_thread_stop_flag.is_set():
152 readable = select.select([testclass.vpp.stdout.fileno(),
153 testclass.vpp.stderr.fileno(),
154 testclass.pump_thread_wakeup_pipe[0]],
156 if testclass.vpp.stdout.fileno() in readable:
157 read = os.read(testclass.vpp.stdout.fileno(), 102400)
159 split = read.decode('ascii',
160 errors='backslashreplace').splitlines(True)
161 if len(stdout_fragment) > 0:
162 split[0] = "%s%s" % (stdout_fragment, split[0])
163 if len(split) > 0 and split[-1].endswith("\n"):
167 stdout_fragment = split[-1]
168 testclass.vpp_stdout_deque.extend(split[:limit])
169 if not testclass.cache_vpp_output:
170 for line in split[:limit]:
171 testclass.logger.info(
172 "VPP STDOUT: %s" % line.rstrip("\n"))
173 if testclass.vpp.stderr.fileno() in readable:
174 read = os.read(testclass.vpp.stderr.fileno(), 102400)
176 split = read.decode('ascii',
177 errors='backslashreplace').splitlines(True)
178 if len(stderr_fragment) > 0:
179 split[0] = "%s%s" % (stderr_fragment, split[0])
180 if len(split) > 0 and split[-1].endswith("\n"):
184 stderr_fragment = split[-1]
186 testclass.vpp_stderr_deque.extend(split[:limit])
187 if not testclass.cache_vpp_output:
188 for line in split[:limit]:
189 testclass.logger.error(
190 "VPP STDERR: %s" % line.rstrip("\n"))
191 # ignoring the dummy pipe here intentionally - the
192 # flag will take care of properly terminating the loop
def _is_skip_aarch64_set():
    """Return a BoolEnvironmentVariable for the SKIP_AARCH64 variable."""
    skip_flag = BoolEnvironmentVariable('SKIP_AARCH64')
    return skip_flag


# flag object created once, when the module is loaded
is_skip_aarch64_set = _is_skip_aarch64_set()
def _is_platform_aarch64():
    """Tell whether ``platform.machine()`` reports an aarch64 machine.

    :returns: True when the reported machine type is 'aarch64'
    """
    machine_type = platform.machine()
    return 'aarch64' == machine_type


# evaluated once, when the module is loaded
is_platform_aarch64 = _is_platform_aarch64()
def _running_extended_tests():
    """Return a BoolEnvironmentVariable for the EXTENDED_TESTS variable."""
    extended_flag = BoolEnvironmentVariable("EXTENDED_TESTS")
    return extended_flag


# flag object created once, when the module is loaded
running_extended_tests = _running_extended_tests()
def _running_gcov_tests():
    """Return a BoolEnvironmentVariable for the GCOV_TESTS variable."""
    gcov_flag = BoolEnvironmentVariable("GCOV_TESTS")
    return gcov_flag


# flag object created once, when the module is loaded
running_gcov_tests = _running_gcov_tests()
223 class KeepAliveReporter(object):
225 Singleton object which reports test start to parent process
230 self.__dict__ = self._shared_state
238 def pipe(self, pipe):
239 if self._pipe is not None:
240 raise Exception("Internal error - pipe should only be set once.")
243 def send_keep_alive(self, test, desc=None):
245 Write current test tmpdir & desc to keep-alive pipe to signal liveness
247 if self.pipe is None:
248 # if not running forked..
252 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
256 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
259 class TestCaseTag(Enum):
260 # marks the suites that must run at the end
261 # using only a single test runner
263 # marks the suites broken on VPP multi-worker
264 FIXME_VPP_WORKERS = 2
267 def create_tag_decorator(e):
270 cls.test_tags.append(e)
271 except AttributeError:
277 tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
278 tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
281 class VppTestCase(unittest.TestCase):
282 """This subclass is a base class for VPP test cases that are implemented as
283 classes. It provides methods to create and run test case.
286 extra_vpp_punt_config = []
287 extra_vpp_plugin_config = []
289 vapi_response_timeout = 5
def packet_infos(self):
    """Return the packet infos held in ``_packet_infos``."""
    stored_infos = self._packet_infos
    return stored_infos
297 def get_packet_count_for_if_idx(cls, dst_if_index):
298 """Get the number of packet info for specified destination if index"""
299 if dst_if_index in cls._packet_count_for_dst_if_idx:
300 return cls._packet_count_for_dst_if_idx[dst_if_index]
305 def has_tag(cls, tag):
306 """ if the test case has a given tag - return true """
308 return tag in cls.test_tags
309 except AttributeError:
def is_tagged_run_solo(cls):
    """Report whether the test case class carries the RUN_SOLO tag.

    :returns: whatever ``cls.has_tag`` returns for TestCaseTag.RUN_SOLO
    """
    solo_tag = TestCaseTag.RUN_SOLO
    return cls.has_tag(solo_tag)
320 """Return the instance of this testcase"""
321 return cls.test_instance
324 def set_debug_flags(cls, d):
325 cls.gdbserver_port = 7777
326 cls.debug_core = False
327 cls.debug_gdb = False
328 cls.debug_gdbserver = False
329 cls.debug_all = False
334 cls.debug_core = True
335 elif dl == "gdb" or dl == "gdb-all":
337 elif dl == "gdbserver" or dl == "gdbserver-all":
338 cls.debug_gdbserver = True
340 raise Exception("Unrecognized DEBUG option: '%s'" % d)
341 if dl == "gdb-all" or dl == "gdbserver-all":
345 def get_least_used_cpu():
346 cpu_usage_list = [set(range(psutil.cpu_count()))]
347 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
348 if 'vpp_main' == p.info['name']]
349 for vpp_process in vpp_processes:
350 for cpu_usage_set in cpu_usage_list:
352 cpu_num = vpp_process.cpu_num()
353 if cpu_num in cpu_usage_set:
354 cpu_usage_set_index = cpu_usage_list.index(
356 if cpu_usage_set_index == len(cpu_usage_list) - 1:
357 cpu_usage_list.append({cpu_num})
359 cpu_usage_list[cpu_usage_set_index + 1].add(
361 cpu_usage_set.remove(cpu_num)
363 except psutil.NoSuchProcess:
366 for cpu_usage_set in cpu_usage_list:
367 if len(cpu_usage_set) > 0:
368 min_usage_set = cpu_usage_set
371 return random.choice(tuple(min_usage_set))
374 def setUpConstants(cls):
375 """ Set-up the test case class based on environment variables """
376 cls.step = BoolEnvironmentVariable('STEP')
377 d = os.getenv("DEBUG", None)
378 # inverted case to handle '' == True
379 c = os.getenv("CACHE_OUTPUT", "1")
380 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
381 cls.set_debug_flags(d)
382 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
383 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
384 cls.test_plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
385 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
387 if cls.plugin_path is not None:
388 if cls.extern_plugin_path is not None:
389 plugin_path = "%s:%s" % (
390 cls.plugin_path, cls.extern_plugin_path)
392 plugin_path = cls.plugin_path
393 elif cls.extern_plugin_path is not None:
394 plugin_path = cls.extern_plugin_path
396 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
397 debug_cli = "cli-listen localhost:5002"
399 size = os.getenv("COREDUMP_SIZE")
401 coredump_size = "coredump-size %s" % size
402 if coredump_size is None:
403 coredump_size = "coredump-size unlimited"
405 cpu_core_number = cls.get_least_used_cpu()
406 if not hasattr(cls, "worker_config"):
407 cls.worker_config = os.getenv("VPP_WORKER_CONFIG", "")
408 if cls.worker_config != "":
409 if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
410 cls.worker_config = ""
412 default_variant = os.getenv("VARIANT")
413 if default_variant is not None:
414 default_variant = "defaults { %s 100 }" % default_variant
418 api_fuzzing = os.getenv("API_FUZZ")
419 if api_fuzzing is None:
422 cls.vpp_cmdline = [cls.vpp_bin, "unix",
423 "{", "nodaemon", debug_cli, "full-coredump",
424 coredump_size, "runtime-dir", cls.tempdir, "}",
425 "api-trace", "{", "on", "}", "api-segment", "{",
426 "prefix", cls.shm_prefix, "}", "cpu", "{",
427 "main-core", str(cpu_core_number),
428 cls.worker_config, "}",
429 "physmem", "{", "max-size", "32m", "}",
430 "statseg", "{", "socket-name", cls.stats_sock, "}",
431 "socksvr", "{", "socket-name", cls.api_sock, "}",
432 "node { ", default_variant, "}",
433 "api-fuzz {", api_fuzzing, "}",
435 "{", "plugin", "dpdk_plugin.so", "{", "disable",
436 "}", "plugin", "rdma_plugin.so", "{", "disable",
437 "}", "plugin", "lisp_unittest_plugin.so", "{",
439 "}", "plugin", "unittest_plugin.so", "{", "enable",
440 "}"] + cls.extra_vpp_plugin_config + ["}", ]
442 if cls.extra_vpp_punt_config is not None:
443 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
444 if plugin_path is not None:
445 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
446 if cls.test_plugin_path is not None:
447 cls.vpp_cmdline.extend(["test_plugin_path", cls.test_plugin_path])
449 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
450 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
453 def wait_for_enter(cls):
454 if cls.debug_gdbserver:
455 print(double_line_delim)
456 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
458 print(double_line_delim)
459 print("Spawned VPP with PID: %d" % cls.vpp.pid)
461 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
463 print(single_line_delim)
464 print("You can debug VPP using:")
465 if cls.debug_gdbserver:
466 print("sudo gdb " + cls.vpp_bin +
467 " -ex 'target remote localhost:{port}'"
468 .format(port=cls.gdbserver_port))
469 print("Now is the time to attach gdb by running the above "
470 "command, set up breakpoints etc., then resume VPP from "
471 "within gdb by issuing the 'continue' command")
472 cls.gdbserver_port += 1
474 print("sudo gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
475 print("Now is the time to attach gdb by running the above "
476 "command and set up breakpoints etc., then resume VPP from"
477 " within gdb by issuing the 'continue' command")
478 print(single_line_delim)
479 input("Press ENTER to continue running the testcase...")
483 cmdline = cls.vpp_cmdline
485 if cls.debug_gdbserver:
486 gdbserver = '/usr/bin/gdbserver'
487 if not os.path.isfile(gdbserver) or \
488 not os.access(gdbserver, os.X_OK):
489 raise Exception("gdbserver binary '%s' does not exist or is "
490 "not executable" % gdbserver)
492 cmdline = [gdbserver, 'localhost:{port}'
493 .format(port=cls.gdbserver_port)] + cls.vpp_cmdline
494 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
497 cls.vpp = subprocess.Popen(cmdline,
498 stdout=subprocess.PIPE,
499 stderr=subprocess.PIPE)
500 except subprocess.CalledProcessError as e:
501 cls.logger.critical("Subprocess returned with non-0 return code: ("
505 cls.logger.critical("Subprocess returned with OS error: "
506 "(%s) %s", e.errno, e.strerror)
508 except Exception as e:
509 cls.logger.exception("Subprocess returned unexpected from "
516 def wait_for_coredump(cls):
517 corefile = cls.tempdir + "/core"
518 if os.path.isfile(corefile):
519 cls.logger.error("Waiting for coredump to complete: %s", corefile)
520 curr_size = os.path.getsize(corefile)
521 deadline = time.time() + 60
523 while time.time() < deadline:
526 curr_size = os.path.getsize(corefile)
527 if size == curr_size:
531 cls.logger.error("Timed out waiting for coredump to complete:"
534 cls.logger.error("Coredump complete: %s, size %d",
540 Perform class setup before running the testcase
541 Remove shared memory files, start vpp and connect the vpp-api
543 super(VppTestCase, cls).setUpClass()
544 gc.collect() # run garbage collection first
545 cls.logger = get_logger(cls.__name__)
546 seed = os.environ["RND_SEED"]
548 if hasattr(cls, 'parallel_handler'):
549 cls.logger.addHandler(cls.parallel_handler)
550 cls.logger.propagate = False
552 cls.tempdir = tempfile.mkdtemp(
553 prefix='vpp-unittest-%s-' % cls.__name__)
554 cls.stats_sock = "%s/stats.sock" % cls.tempdir
555 cls.api_sock = "%s/api.sock" % cls.tempdir
556 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
557 cls.file_handler.setFormatter(
558 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
560 cls.file_handler.setLevel(DEBUG)
561 cls.logger.addHandler(cls.file_handler)
562 cls.logger.debug("--- setUpClass() for %s called ---" %
564 cls.shm_prefix = os.path.basename(cls.tempdir)
565 os.chdir(cls.tempdir)
566 cls.logger.info("Temporary dir is %s, shm prefix is %s",
567 cls.tempdir, cls.shm_prefix)
568 cls.logger.debug("Random seed is %s" % seed)
570 cls.reset_packet_infos()
574 cls.registry = VppObjectRegistry()
575 cls.vpp_startup_failed = False
576 cls.reporter = KeepAliveReporter()
577 # need to catch exceptions here because if we raise, then the cleanup
578 # doesn't get called and we might end with a zombie vpp
581 cls.reporter.send_keep_alive(cls, 'setUpClass')
582 VppTestResult.current_test_case_info = TestCaseInfo(
583 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
584 cls.vpp_stdout_deque = deque()
585 cls.vpp_stderr_deque = deque()
586 cls.pump_thread_stop_flag = Event()
587 cls.pump_thread_wakeup_pipe = os.pipe()
588 cls.pump_thread = Thread(target=pump_output, args=(cls,))
589 cls.pump_thread.daemon = True
590 cls.pump_thread.start()
591 if cls.debug_gdb or cls.debug_gdbserver:
592 cls.vapi_response_timeout = 0
593 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
594 cls.vapi_response_timeout)
596 hook = hookmodule.StepHook(cls)
598 hook = hookmodule.PollHook(cls)
599 cls.vapi.register_hook(hook)
600 cls.statistics = VPPStats(socketname=cls.stats_sock)
604 cls.vpp_startup_failed = True
606 "VPP died shortly after startup, check the"
607 " output to standard error for possible cause")
611 except vpp_papi.VPPIOError as e:
612 cls.logger.debug("Exception connecting to vapi: %s" % e)
613 cls.vapi.disconnect()
615 if cls.debug_gdbserver:
616 print(colorize("You're running VPP inside gdbserver but "
617 "VPP-API connection failed, did you forget "
618 "to 'continue' VPP from within gdb?", RED))
620 except vpp_papi.VPPRuntimeError as e:
621 cls.logger.debug("%s" % e)
624 except Exception as e:
625 cls.logger.debug("Exception connecting to VPP: %s" % e)
630 def _debug_quit(cls):
631 if (cls.debug_gdbserver or cls.debug_gdb):
635 if cls.vpp.returncode is None:
637 print(double_line_delim)
638 print("VPP or GDB server is still running")
639 print(single_line_delim)
640 input("When done debugging, press ENTER to kill the "
641 "process and finish running the testcase...")
642 except AttributeError:
648 Disconnect vpp-api, kill vpp and cleanup shared memory files
652 # first signal that we want to stop the pump thread, then wake it up
653 if hasattr(cls, 'pump_thread_stop_flag'):
654 cls.pump_thread_stop_flag.set()
655 if hasattr(cls, 'pump_thread_wakeup_pipe'):
656 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
657 if hasattr(cls, 'pump_thread'):
658 cls.logger.debug("Waiting for pump thread to stop")
659 cls.pump_thread.join()
660 if hasattr(cls, 'vpp_stderr_reader_thread'):
661 cls.logger.debug("Waiting for stderr pump to stop")
662 cls.vpp_stderr_reader_thread.join()
664 if hasattr(cls, 'vpp'):
665 if hasattr(cls, 'vapi'):
666 cls.logger.debug(cls.vapi.vpp.get_stats())
667 cls.logger.debug("Disconnecting class vapi client on %s",
669 cls.vapi.disconnect()
670 cls.logger.debug("Deleting class vapi attribute on %s",
674 if cls.vpp.returncode is None:
675 cls.wait_for_coredump()
676 cls.logger.debug("Sending TERM to vpp")
678 cls.logger.debug("Waiting for vpp to die")
679 cls.vpp.communicate()
680 cls.logger.debug("Deleting class vpp attribute on %s",
682 cls.vpp.stdout.close()
683 cls.vpp.stderr.close()
686 if cls.vpp_startup_failed:
687 stdout_log = cls.logger.info
688 stderr_log = cls.logger.critical
690 stdout_log = cls.logger.info
691 stderr_log = cls.logger.info
693 if hasattr(cls, 'vpp_stdout_deque'):
694 stdout_log(single_line_delim)
695 stdout_log('VPP output to stdout while running %s:', cls.__name__)
696 stdout_log(single_line_delim)
697 vpp_output = "".join(cls.vpp_stdout_deque)
698 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
700 stdout_log('\n%s', vpp_output)
701 stdout_log(single_line_delim)
703 if hasattr(cls, 'vpp_stderr_deque'):
704 stderr_log(single_line_delim)
705 stderr_log('VPP output to stderr while running %s:', cls.__name__)
706 stderr_log(single_line_delim)
707 vpp_output = "".join(cls.vpp_stderr_deque)
708 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
710 stderr_log('\n%s', vpp_output)
711 stderr_log(single_line_delim)
714 def tearDownClass(cls):
715 """ Perform final cleanup after running all tests in this test-case """
716 cls.logger.debug("--- tearDownClass() for %s called ---" %
718 cls.reporter.send_keep_alive(cls, 'tearDownClass')
720 cls.file_handler.close()
721 cls.reset_packet_infos()
723 debug_internal.on_tear_down_class(cls)
def show_commands_at_teardown(self):
    """Hook for subclasses that want extra logging during teardown.

    This implementation only logs that no test specific show commands
    are provided.
    """
    notice = "--- No test specific show commands provided. ---"
    self.logger.info(notice)
730 """ Show various debug prints after each test """
731 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
732 (self.__class__.__name__, self._testMethodName,
733 self._testMethodDoc))
736 if not self.vpp_dead:
737 self.logger.debug(self.vapi.cli("show trace max 1000"))
738 self.logger.info(self.vapi.ppcli("show interface"))
739 self.logger.info(self.vapi.ppcli("show hardware"))
740 self.logger.info(self.statistics.set_errors_str())
741 self.logger.info(self.vapi.ppcli("show run"))
742 self.logger.info(self.vapi.ppcli("show log"))
743 self.logger.info(self.vapi.ppcli("show bihash"))
744 self.logger.info("Logging testcase specific show commands.")
745 self.show_commands_at_teardown()
746 self.registry.remove_vpp_config(self.logger)
747 # Save/Dump VPP api trace log
748 m = self._testMethodName
749 api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
750 tmp_api_trace = "/tmp/%s" % api_trace
751 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
752 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
753 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
755 os.rename(tmp_api_trace, vpp_api_trace_log)
756 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
758 except VppTransportShmemIOError:
759 self.logger.debug("VppTransportShmemIOError: Vpp dead. "
760 "Cannot log show commands.")
763 self.registry.unregister_all(self.logger)
766 """ Clear trace before running each test"""
767 super(VppTestCase, self).setUp()
768 self.reporter.send_keep_alive(self)
771 raise VppDiedError(rv=None, testcase=self.__class__.__name__,
772 method_name=self._testMethodName)
773 self.sleep(.1, "during setUp")
774 self.vpp_stdout_deque.append(
775 "--- test setUp() for %s.%s(%s) starts here ---\n" %
776 (self.__class__.__name__, self._testMethodName,
777 self._testMethodDoc))
778 self.vpp_stderr_deque.append(
779 "--- test setUp() for %s.%s(%s) starts here ---\n" %
780 (self.__class__.__name__, self._testMethodName,
781 self._testMethodDoc))
782 self.vapi.cli("clear trace")
783 # store the test instance inside the test class - so that objects
784 # holding the class can access instance methods (like assertEqual)
785 type(self).test_instance = self
788 def pg_enable_capture(cls, interfaces=None):
790 Enable capture on packet-generator interfaces
792 :param interfaces: iterable interface indexes (if None,
793 use self.pg_interfaces)
796 if interfaces is None:
797 interfaces = cls.pg_interfaces
def register_capture(cls, cap_name):
    """Remember a capture name on the test class.

    :param cap_name: name of the capture to record
    """
    # each entry pairs the registration time with the capture name
    entry = (time.time(), cap_name)
    cls._captures.append(entry)
808 def get_vpp_time(cls):
809 # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
810 # returns float("2.190522")
811 timestr = cls.vapi.cli('show clock')
812 head, sep, tail = timestr.partition(',')
813 head, sep, tail = head.partition('Time now')
817 def sleep_on_vpp_time(cls, sec):
818 """ Sleep according to time in VPP world """
819 # On a busy system with many processes
820 # we might end up with VPP time being slower than real world
821 # So take that into account when waiting for VPP to do something
822 start_time = cls.get_vpp_time()
823 while cls.get_vpp_time() - start_time < sec:
827 def pg_start(cls, trace=True):
828 """ Enable the PG, wait till it is done, then clean up """
830 cls.vapi.cli("clear trace")
831 cls.vapi.cli("trace add pg-input 1000")
832 cls.vapi.cli('packet-generator enable')
833 # PG, when starts, runs to completion -
834 # so let's avoid a race condition,
835 # and wait a little till it's done.
836 # Then clean it up - and then be gone.
837 deadline = time.time() + 300
838 while cls.vapi.cli('show packet-generator').find("Yes") != -1:
839 cls.sleep(0.01) # yield
840 if time.time() > deadline:
841 cls.logger.error("Timeout waiting for pg to stop")
843 for stamp, cap_name in cls._captures:
844 cls.vapi.cli('packet-generator delete %s' % cap_name)
848 def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
850 Create packet-generator interfaces.
852 :param interfaces: iterable indexes of the interfaces.
853 :returns: List of created interfaces.
858 intf = VppPGInterface(cls, i, gso, gso_size)
859 setattr(cls, intf.name, intf)
861 cls.pg_interfaces = result
865 def create_loopback_interfaces(cls, count):
867 Create loopback interfaces.
869 :param count: number of interfaces created.
870 :returns: List of created interfaces.
872 result = [VppLoInterface(cls) for i in range(count)]
874 setattr(cls, intf.name, intf)
875 cls.lo_interfaces = result
879 def create_bvi_interfaces(cls, count):
881 Create BVI interfaces.
883 :param count: number of interfaces created.
884 :returns: List of created interfaces.
886 result = [VppBviInterface(cls) for i in range(count)]
888 setattr(cls, intf.name, intf)
889 cls.bvi_interfaces = result
893 def extend_packet(packet, size, padding=' '):
895 Extend packet to given size by padding with spaces or custom padding
896 NOTE: Currently works only when Raw layer is present.
898 :param packet: packet
899 :param size: target size
900 :param padding: padding used to extend the payload
903 packet_len = len(packet) + 4
904 extend = size - packet_len
906 num = (extend // len(padding)) + 1
907 packet[Raw].load += (padding * num)[:extend].encode("ascii")
def reset_packet_infos(cls):
    """Discard all stored packet infos and per-destination packet counts.

    Both ``_packet_infos`` and ``_packet_count_for_dst_if_idx`` are
    replaced with new empty dictionaries.
    """
    cls._packet_count_for_dst_if_idx = dict()
    cls._packet_infos = dict()
916 def create_packet_info(cls, src_if, dst_if):
918 Create packet info object containing the source and destination indexes
919 and add it to the testcase's packet info list
921 :param VppInterface src_if: source interface
922 :param VppInterface dst_if: destination interface
924 :returns: _PacketInfo object
928 info.index = len(cls._packet_infos)
929 info.src = src_if.sw_if_index
930 info.dst = dst_if.sw_if_index
931 if isinstance(dst_if, VppSubInterface):
932 dst_idx = dst_if.parent.sw_if_index
934 dst_idx = dst_if.sw_if_index
935 if dst_idx in cls._packet_count_for_dst_if_idx:
936 cls._packet_count_for_dst_if_idx[dst_idx] += 1
938 cls._packet_count_for_dst_if_idx[dst_idx] = 1
939 cls._packet_infos[info.index] = info
943 def info_to_payload(info):
945 Convert _PacketInfo object to packet payload
947 :param info: _PacketInfo object
949 :returns: string containing serialized data from packet info
951 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
955 def payload_to_info(payload, payload_field='load'):
957 Convert packet payload to _PacketInfo object
959 :param payload: packet payload
960 :type payload: <class 'scapy.packet.Raw'>
961 :param payload_field: packet fieldname of payload "load" for
962 <class 'scapy.packet.Raw'>
963 :type payload_field: str
964 :returns: _PacketInfo object containing de-serialized data from payload
967 numbers = getattr(payload, payload_field).split()
969 info.index = int(numbers[0])
970 info.src = int(numbers[1])
971 info.dst = int(numbers[2])
972 info.ip = int(numbers[3])
973 info.proto = int(numbers[4])
976 def get_next_packet_info(self, info):
978 Iterate over the packet info list stored in the testcase
979 Start iteration with first element if info is None
980 Continue based on index in info if info is specified
982 :param info: info or None
983 :returns: next info in list or None if no more infos
988 next_index = info.index + 1
989 if next_index == len(self._packet_infos):
992 return self._packet_infos[next_index]
994 def get_next_packet_info_for_interface(self, src_index, info):
996 Search the packet info list for the next packet info with same source
999 :param src_index: source interface index to search for
1000 :param info: packet info - where to start the search
1001 :returns: packet info or None
1005 info = self.get_next_packet_info(info)
1008 if info.src == src_index:
1011 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
1013 Search the packet info list for the next packet info with same source
1014 and destination interface indexes
1016 :param src_index: source interface index to search for
1017 :param dst_index: destination interface index to search for
1018 :param info: packet info - where to start the search
1019 :returns: packet info or None
1023 info = self.get_next_packet_info_for_interface(src_index, info)
1026 if info.dst == dst_index:
1029 def assert_equal(self, real_value, expected_value, name_or_class=None):
1030 if name_or_class is None:
1031 self.assertEqual(real_value, expected_value)
1034 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
1035 msg = msg % (getdoc(name_or_class).strip(),
1036 real_value, str(name_or_class(real_value)),
1037 expected_value, str(name_or_class(expected_value)))
1039 msg = "Invalid %s: %s does not match expected value %s" % (
1040 name_or_class, real_value, expected_value)
1042 self.assertEqual(real_value, expected_value, msg)
1044 def assert_in_range(self,
1052 msg = "Invalid %s: %s out of range <%s,%s>" % (
1053 name, real_value, expected_min, expected_max)
1054 self.assertTrue(expected_min <= real_value <= expected_max, msg)
1056 def assert_packet_checksums_valid(self, packet,
1057 ignore_zero_udp_checksums=True):
1058 received = packet.__class__(scapy.compat.raw(packet))
1059 udp_layers = ['UDP', 'UDPerror']
1060 checksum_fields = ['cksum', 'chksum']
1063 temp = received.__class__(scapy.compat.raw(received))
1065 layer = temp.getlayer(counter)
1067 layer = layer.copy()
1068 layer.remove_payload()
1069 for cf in checksum_fields:
1070 if hasattr(layer, cf):
1071 if ignore_zero_udp_checksums and \
1072 0 == getattr(layer, cf) and \
1073 layer.name in udp_layers:
1075 delattr(temp.getlayer(counter), cf)
1076 checksums.append((counter, cf))
1079 counter = counter + 1
1080 if 0 == len(checksums):
1082 temp = temp.__class__(scapy.compat.raw(temp))
1083 for layer, cf in checksums:
1084 calc_sum = getattr(temp[layer], cf)
1086 getattr(received[layer], cf), calc_sum,
1087 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
1089 "Checksum field `%s` on `%s` layer has correct value `%s`" %
1090 (cf, temp[layer].name, calc_sum))
1092 def assert_checksum_valid(self, received_packet, layer,
1093 field_name='chksum',
1094 ignore_zero_checksum=False):
1095 """ Check checksum of received packet on given layer """
1096 received_packet_checksum = getattr(received_packet[layer], field_name)
1097 if ignore_zero_checksum and 0 == received_packet_checksum:
1099 recalculated = received_packet.__class__(
1100 scapy.compat.raw(received_packet))
1101 delattr(recalculated[layer], field_name)
1102 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1103 self.assert_equal(received_packet_checksum,
1104 getattr(recalculated[layer], field_name),
1105 "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """Check the checksum of the 'IP' layer of a received packet.

    :param received_packet: packet to check
    :param ignore_zero_checksum: forwarded to assert_checksum_valid
    """
    options = {'ignore_zero_checksum': ignore_zero_checksum}
    self.assert_checksum_valid(received_packet, 'IP', **options)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """Check the checksum of the 'TCP' layer of a received packet.

    :param received_packet: packet to check
    :param ignore_zero_checksum: forwarded to assert_checksum_valid
    """
    layer_name = 'TCP'
    self.assert_checksum_valid(
        received_packet,
        layer_name,
        ignore_zero_checksum=ignore_zero_checksum,
    )
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """Check the checksum of the 'UDP' layer of a received packet.

    :param received_packet: packet to check
    :param ignore_zero_checksum: forwarded to assert_checksum_valid;
        defaults to True for this layer
    """
    forwarded = dict(ignore_zero_checksum=ignore_zero_checksum)
    self.assert_checksum_valid(received_packet, 'UDP', **forwarded)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """Check checksums of the error layers embedded in a received packet.

    The IPerror, TCPerror, UDPerror and ICMPerror layers are each checked
    only when the packet has that layer. A zero checksum is ignored for
    UDPerror.

    :param received_packet: packet to inspect
    """
    # (layer class, layer name, extra keyword arguments), in check order
    embedded_layers = (
        (IPerror, 'IPerror', {}),
        (TCPerror, 'TCPerror', {}),
        (UDPerror, 'UDPerror', {'ignore_zero_checksum': True}),
        (ICMPerror, 'ICMPerror', {}),
    )
    for layer_cls, layer_name, extra in embedded_layers:
        if received_packet.haslayer(layer_cls):
            self.assert_checksum_valid(received_packet, layer_name, **extra)
def assert_icmp_checksum_valid(self, received_packet):
    """Check the 'ICMP' layer checksum, then any embedded error layers.

    :param received_packet: packet to check
    """
    verify_layer = self.assert_checksum_valid
    verify_layer(received_packet, 'ICMP')
    verify_embedded = self.assert_embedded_icmp_checksum_valid
    verify_embedded(received_packet)
def assert_icmpv6_checksum_valid(self, pkt):
    """Check the 'cksum' field of the ICMPv6 layers present in a packet.

    For ICMPv6DestUnreach the embedded error layers are checked as well.
    ICMPv6EchoRequest and ICMPv6EchoReply are checked when present.

    :param pkt: packet to inspect
    """
    field = 'cksum'
    if pkt.haslayer(ICMPv6DestUnreach):
        self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', field)
        self.assert_embedded_icmp_checksum_valid(pkt)
    echo_layers = (
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest'),
        (ICMPv6EchoReply, 'ICMPv6EchoReply'),
    )
    for layer_cls, layer_name in echo_layers:
        if pkt.haslayer(layer_cls):
            self.assert_checksum_valid(pkt, layer_name, field)
def get_packet_counter(self, counter):
    """
    Read the current value of a counter.

    Names starting with "/" are fetched from self.statistics via
    get_counter(). Any other name is looked up in the output of the
    "sh errors" CLI command, where the first whitespace-separated field
    of a line is taken as the value and the second one as the name.

    :param counter: counter name
    :returns: result of self.statistics.get_counter() for "/" names,
              otherwise the integer value from the first matching CLI
              line, or 0 when no line matches
    """
    if counter.startswith("/"):
        return self.statistics.get_counter(counter)

    lines = self.vapi.cli("sh errors").split('\n')
    # the first and the last line are not data rows and are skipped
    for line in lines[1:-1]:
        fields = line.split()
        # blank or truncated rows have no name column to compare with
        if len(fields) > 1 and fields[1] == counter:
            return int(fields[0])
    return 0
def assert_packet_counter_equal(self, counter, expected_value):
    """
    Assert that a packet counter has the expected value.

    :param counter: counter name, resolved via get_packet_counter()
    :param expected_value: value the counter is compared against
    """
    actual_value = self.get_packet_counter(counter)
    description = "packet counter `%s'" % counter
    self.assert_equal(actual_value, expected_value, description)
def assert_error_counter_equal(self, counter, expected_value):
    """
    Assert that an error counter has the expected value.

    :param counter: counter name, read with
                    self.statistics.get_err_counter()
    :param expected_value: value the counter is compared against
    """
    actual_value = self.statistics.get_err_counter(counter)
    description = "error counter `%s'" % counter
    self.assert_equal(actual_value, expected_value, description)
def sleep(cls, timeout, remark=None):
    """
    Sleep for the given time and log how long the sleep really took.

    NOTE(review): the first parameter is cls, so this is presumably
    decorated with @classmethod - the decorator line is not visible here.

    :param timeout: time to sleep in seconds; 0 only yields the CPU and
                    returns without logging anything
    :param remark: free-form text included in the log messages
    """
    # /* Allow sleep(0) to maintain win32 semantics, and as decreed
    #  * by Guido, only the main thread can be interrupted.
    # */
    # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892  # noqa
    if timeout == 0:
        # yield quantum
        if hasattr(os, 'sched_yield'):
            os.sched_yield()
        else:
            time.sleep(0)
        return

    cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
    before = time.time()
    time.sleep(timeout)
    after = time.time()
    # oversleeping by more than a factor of two is reported as an error
    if after - before > 2 * timeout:
        cls.logger.error("unexpected self.sleep() result - "
                         "slept for %es instead of ~%es!",
                         after - before, timeout)

    cls.logger.debug(
        "Finished sleep (%s) - slept %es (wanted %es)",
        remark, after - before, timeout)
def pg_send(self, intf, pkts, worker=None, trace=True):
    """
    Queue packets on an interface and start the packet generator.

    :param intf: interface whose add_stream() receives the packets
    :param pkts: packets to send
    :param worker: forwarded to intf.add_stream()
    :param trace: forwarded to self.pg_start()
    """
    intf.add_stream(pkts, worker=worker)
    # capture is enabled on every pg interface, not only on the sender
    capture_interfaces = self.pg_interfaces
    self.pg_enable_capture(capture_interfaces)
    self.pg_start(trace=trace)
def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
    """
    Send packets and assert that no pg interface captures anything.

    :param intf: interface to send the packets on
    :param pkts: packets to send
    :param remark: forwarded to assert_nothing_captured()
    :param timeout: capture timeout used for the first interface; a
                    falsy value selects the default of 1
    """
    self.pg_send(intf, pkts)
    if not timeout:
        timeout = 1
    for i in self.pg_interfaces:
        i.get_capture(0, timeout=timeout)
        i.assert_nothing_captured(remark=remark)
        # the full timeout is spent only once, on the first interface;
        # the remaining interfaces use a short one
        timeout = 0.1
def send_and_expect(self, intf, pkts, output, n_rx=None, worker=None,
                    trace=True):
    """
    Send packets on one interface and capture the replies on another.

    :param intf: interface to send the packets on
    :param pkts: packets to send
    :param output: interface whose get_capture() is used
    :param n_rx: number of packets to capture; a falsy value (None or
                 0) means "as many as were sent"
    :param worker: forwarded to pg_send()
    :param trace: forwarded to pg_send()
    :returns: whatever output.get_capture() returns
    """
    if not n_rx:
        n_rx = len(pkts)
    self.pg_send(intf, pkts, worker=worker, trace=trace)
    rx = output.get_capture(n_rx)
    return rx
def send_and_expect_only(self, intf, pkts, output, timeout=None):
    """
    Send packets, capture them on `output` and assert that no other pg
    interface captures anything.

    :param intf: interface to send the packets on
    :param pkts: packets to send; len(pkts) packets are captured
    :param output: the only interface allowed to capture packets
    :param timeout: capture timeout used for the first of the other
                    interfaces; a falsy value selects the default of 1
    :returns: whatever output.get_capture() returns
    """
    self.pg_send(intf, pkts)
    rx = output.get_capture(len(pkts))
    outputs = [output]
    if not timeout:
        timeout = 1
    for i in self.pg_interfaces:
        if i not in outputs:
            i.get_capture(0, timeout=timeout)
            i.assert_nothing_captured()
            # only the first silent interface gets the full timeout
            timeout = 0.1

    return rx
def get_testcase_doc_name(test):
    """
    Return the first line of the docstring of the test's class.

    :param test: test instance whose class docstring is inspected
    """
    class_doc = getdoc(test.__class__)
    doc_lines = class_doc.splitlines()
    return doc_lines[0]
def get_test_description(descriptions, test):
    """
    Return the text used to describe a test in the output.

    :param descriptions: truthy when short descriptions should be used
    :param test: test object providing shortDescription()
    :returns: the short description when it is requested and available,
              otherwise str(test)
    """
    short_description = test.shortDescription()
    if descriptions and short_description:
        return short_description
    # no short description wanted or available - fall back to the
    # string form of the test
    return str(test)
class TestCaseInfo(object):
    """
    Plain record describing one test case run.

    Stores the logger, the temporary directory, the VPP process id and
    the VPP binary path it is constructed with, plus a core_crash_test
    slot which starts out as None.
    """

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        # filled in later, once a crashing test has been identified
        self.core_crash_test = None
        # values handed over by the creator, stored unchanged
        self.vpp_bin_path = vpp_bin_path
        self.vpp_pid = vpp_pid
        self.tempdir = tempdir
        self.logger = logger
class VppTestResult(unittest.TestResult):
    """
    Test result which prints a line per test, keeps track of failed
    test cases and optionally reports every result through a pipe.

    @property result_string
     String variable to store the test case result string.
    @property errors
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test which
     raised an unexpected exception.
    @property failures
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test where
     a failure was explicitly signalled using the TestCase.assert*()
     methods.
    """

    # Class-level attributes: the two sets are shared by all instances
    # and are only ever added to by add_error().
    failed_test_cases_info = set()
    core_crash_test_cases_info = set()
    # Info object of the test case currently running; None means that
    # per-test-case logging and temp dir handling are skipped.
    current_test_case_info = None

    def __init__(self, stream=None, descriptions=None, verbosity=None,
                 runner=None):
        """
        :param stream File descriptor to store where to report test results.
            Set to the standard error stream by default.
        :param descriptions Boolean variable to store information if to use
            test case descriptions.
        :param verbosity Integer variable to store required verbosity level.
        :param runner Runner object; printErrors() reads its
            print_summary attribute and replaces its stream.
        """
        super(VppTestResult, self).__init__(stream, descriptions, verbosity)
        self.stream = stream
        self.descriptions = descriptions
        self.verbosity = verbosity
        self.result_string = None
        self.runner = runner

    def addSuccess(self, test):
        """
        Record a test succeeded result

        :param test:

        """
        if self.current_test_case_info:
            self.current_test_case_info.logger.debug(
                "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
                                                       test._testMethodName,
                                                       test._testMethodDoc))
        unittest.TestResult.addSuccess(self, test)
        self.result_string = colorize("OK", GREEN)

        self.send_result_through_pipe(test, PASS)

    def addSkip(self, test, reason):
        """
        Record a test skipped.

        :param test:
        :param reason:

        """
        if self.current_test_case_info:
            self.current_test_case_info.logger.debug(
                "--- addSkip() %s.%s(%s) called, reason is %s" %
                (test.__class__.__name__, test._testMethodName,
                 test._testMethodDoc, reason))
        unittest.TestResult.addSkip(self, test, reason)
        self.result_string = colorize("SKIP", YELLOW)

        self.send_result_through_pipe(test, SKIP)

    def symlink_failed(self):
        """
        Create a '<tempdir basename>-FAILED' symlink to the temp dir of
        the current test case inside the directory named by the
        FAILED_DIR environment variable.

        Best effort: any exception is logged through the test case
        logger and not propagated.
        """
        if self.current_test_case_info:
            try:
                failed_dir = os.getenv('FAILED_DIR')
                link_path = os.path.join(
                    failed_dir,
                    '%s-FAILED' %
                    os.path.basename(self.current_test_case_info.tempdir))

                self.current_test_case_info.logger.debug(
                    "creating a link to the failed test")
                self.current_test_case_info.logger.debug(
                    "os.symlink(%s, %s)" %
                    (self.current_test_case_info.tempdir, link_path))
                if os.path.exists(link_path):
                    self.current_test_case_info.logger.debug(
                        'symlink already exists')
                else:
                    os.symlink(self.current_test_case_info.tempdir, link_path)

            except Exception as e:
                self.current_test_case_info.logger.error(e)

    def send_result_through_pipe(self, test, result):
        """
        Send the (test id, result) tuple through the result pipe.

        Nothing is sent when the test_framework_result_pipe attribute
        does not exist or is falsy.

        :param test: test object providing id()
        :param result: result value to report
        """
        if hasattr(self, 'test_framework_result_pipe'):
            pipe = self.test_framework_result_pipe
            if pipe:
                pipe.send((test.id(), result))

    def log_error(self, test, err, fn_name):
        """
        Write an error and its formatted traceback to the debug log of
        the current test case, if there is one.

        :param test: test (or unittest _ErrorHolder) which failed
        :param err: exception triple as accepted by format_exception()
        :param fn_name: name of the reporting function used in the log
        """
        if self.current_test_case_info:
            # _ErrorHolder objects have no test method attributes
            if isinstance(test, unittest.suite._ErrorHolder):
                test_name = test.description
            else:
                test_name = '%s.%s(%s)' % (test.__class__.__name__,
                                           test._testMethodName,
                                           test._testMethodDoc)
            self.current_test_case_info.logger.debug(
                "--- %s() %s called, err is %s" %
                (fn_name, test_name, err))
            self.current_test_case_info.logger.debug(
                "formatted exception is:\n%s" %
                "".join(format_exception(*err)))

    def add_error(self, test, err, unittest_fn, error_type):
        """
        Common implementation of addFailure() and addError().

        Logs the error, records it with unittest, builds the result
        string, links the temp dir of the failed test case, remembers
        the test case as failed (and as crashed when a core file is
        present) and reports the result through the pipe.

        :param test: test which failed
        :param err: exception triple
        :param unittest_fn: unbound unittest.TestResult method used to
                            record the error
        :param error_type: FAIL or ERROR
        :raises Exception: for any other error_type
        """
        if error_type == FAIL:
            self.log_error(test, err, 'addFailure')
            error_type_str = colorize("FAIL", RED)
        elif error_type == ERROR:
            self.log_error(test, err, 'addError')
            error_type_str = colorize("ERROR", RED)
        else:
            raise Exception('Error type %s cannot be used to record an '
                            'error or a failure' % error_type)

        unittest_fn(self, test, err)
        if self.current_test_case_info:
            self.result_string = "%s [ temp dir used by test case: %s ]" % \
                                 (error_type_str,
                                  self.current_test_case_info.tempdir)
            self.symlink_failed()
            self.failed_test_cases_info.add(self.current_test_case_info)
            if is_core_present(self.current_test_case_info.tempdir):
                # only the first failing test is recorded as the one
                # which produced the core
                if not self.current_test_case_info.core_crash_test:
                    if isinstance(test, unittest.suite._ErrorHolder):
                        test_name = str(test)
                    else:
                        test_name = "'{!s}' ({!s})".format(
                            get_testcase_doc_name(test), test.id())
                    self.current_test_case_info.core_crash_test = test_name
                self.core_crash_test_cases_info.add(
                    self.current_test_case_info)
        else:
            self.result_string = '%s [no temp dir]' % error_type_str

        self.send_result_through_pipe(test, error_type)

    def addFailure(self, test, err):
        """
        Record a test failed result

        :param test:
        :param err: error message

        """
        self.add_error(test, err, unittest.TestResult.addFailure, FAIL)

    def addError(self, test, err):
        """
        Record a test error result

        :param test:
        :param err: error message

        """
        self.add_error(test, err, unittest.TestResult.addError, ERROR)

    def getDescription(self, test):
        """
        Get test description

        :param test:
        :returns: test description

        """
        return get_test_description(self.descriptions, test)

    def startTest(self, test):
        """
        Start a test

        :param test:

        """

        def print_header(test):
            # The header is the first docstring line; a missing
            # docstring is treated as an error.
            test_doc = getdoc(test)
            if not test_doc:
                raise Exception("No doc string for test '%s'" % test.id())
            test_title = test_doc.splitlines()[0]
            test_title_colored = colorize(test_title, GREEN)
            if test.is_tagged_run_solo():
                # long live PEP-8 and 80 char width limitation...
                c = YELLOW
                test_title_colored = colorize("SOLO RUN: " + test_title, c)

            # This block may overwrite the colorized title above,
            # but we want this to stand out and be fixed
            if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
                c = RED
                w = "FIXME with VPP workers: "
                test_title_colored = colorize(w + test_title, c)

            # printed once per test class, tracked by a class attribute
            if not hasattr(test.__class__, '_header_printed'):
                print(double_line_delim)
                print(test_title_colored)
                print(double_line_delim)
            test.__class__._header_printed = True

        print_header(test)
        self.start_test = time.time()
        unittest.TestResult.startTest(self, test)
        if self.verbosity > 0:
            self.stream.writeln(
                "Starting " + self.getDescription(test) + " ...")
            self.stream.writeln(single_line_delim)

    def stopTest(self, test):
        """
        Called when the given test has been run

        :param test:

        """
        unittest.TestResult.stopTest(self, test)

        if self.verbosity > 0:
            self.stream.writeln(single_line_delim)
            self.stream.writeln("%-73s%s" % (self.getDescription(test),
                                             self.result_string))
            self.stream.writeln(single_line_delim)
        else:
            # terse form: one line with the elapsed time
            self.stream.writeln("%-68s %4.2f %s" %
                                (self.getDescription(test),
                                 time.time() - self.start_test,
                                 self.result_string))

        self.send_result_through_pipe(test, TEST_RUN)

    def printErrors(self):
        """
        Print errors from running the test case
        """
        if self.errors or self.failures:
            self.stream.writeln()
            self.printErrorList('ERROR', self.errors)
            self.printErrorList('FAIL', self.failures)

        # ^^ that is the last output from unittest before summary
        if not self.runner.print_summary:
            # The summary is suppressed by pointing both streams at
            # os.devnull. The file object stays open because it is
            # still in use as the stream after this method returns.
            devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
            self.stream = devnull
            self.runner.stream = devnull

    def printErrorList(self, flavour, errors):
        """
        Print error list to the output stream together with error type
        and test case description.

        :param flavour: error type
        :param errors: iterable errors

        """
        for test, err in errors:
            self.stream.writeln(double_line_delim)
            self.stream.writeln("%s: %s" %
                                (flavour, self.getDescription(test)))
            self.stream.writeln(single_line_delim)
            self.stream.writeln("%s" % err)
class VppTestRunner(unittest.TextTestRunner):
    """
    A basic test runner implementation which prints results to standard
    output (the stream is hard-coded to sys.stdout, see __init__).
    """

    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
                 result_pipe=None, failfast=False, buffer=False,
                 resultclass=None, print_summary=True, **kwargs):
        """
        :param keep_alive_pipe: stored as KeepAliveReporter.pipe
        :param descriptions: forwarded to unittest.TextTestRunner
        :param verbosity: forwarded to unittest.TextTestRunner
        :param result_pipe: stored on the result class as
            test_framework_result_pipe
        :param failfast: forwarded to unittest.TextTestRunner
        :param buffer: forwarded to unittest.TextTestRunner
        :param resultclass: forwarded to unittest.TextTestRunner
        :param print_summary: when falsy, run() restores the original
            stream on the runner and on the result before returning
        """
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        super(VppTestRunner, self).__init__(sys.stdout, descriptions,
                                            verbosity, failfast, buffer,
                                            resultclass, **kwargs)
        KeepAliveReporter.pipe = keep_alive_pipe

        # remembered so that run() can undo a stream replacement
        self.orig_stream = self.stream
        self.resultclass.test_framework_result_pipe = result_pipe

        self.print_summary = print_summary

    def _makeResult(self):
        """Create the result object, handing over this runner as well."""
        return self.resultclass(self.stream,
                                self.descriptions,
                                self.verbosity,
                                self)

    def run(self, test):
        """
        Run the tests

        :param test:
        :returns: the result object returned by TextTestRunner.run()

        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        result = super(VppTestRunner, self).run(test)
        if not self.print_summary:
            self.stream = self.orig_stream
            result.stream = self.orig_stream
        return result
class Worker(Thread):
    """
    Thread which runs an external executable, waits for it to finish
    and logs its return code, stdout and stderr.

    The optional attributes `testcase`, `role` and `wait_for_gdb` are
    only used when they exist (checked with hasattr); they are
    presumably supplied by subclasses - confirm against the callers.
    After run() the return code is available as self.result.
    """

    def __init__(self, executable_args, logger, env=None, *args, **kwargs):
        """
        :param executable_args: command line as a sequence, executable
                                first
        :param logger: logger used for all messages
        :param env: extra environment variables for the process
                    (deep-copied), None for no extras
        :param args: forwarded to Thread.__init__()
        :param kwargs: forwarded to Thread.__init__()
        """
        super(Worker, self).__init__(*args, **kwargs)
        self.logger = logger
        # Private copy: the debug branches below extend the command
        # line and must not modify the list owned by the caller.
        self.args = list(executable_args)
        if hasattr(self, 'testcase') and self.testcase.debug_all:
            if self.testcase.debug_gdbserver:
                # Prefix the command line with gdbserver. The Thread
                # positional args are unrelated to the command line.
                self.args = ['/usr/bin/gdbserver', 'localhost:{port}'
                             .format(port=self.testcase.gdbserver_port)
                             ] + self.args
            elif self.testcase.debug_gdb and hasattr(self, 'wait_for_gdb'):
                self.args.append(self.wait_for_gdb)
        self.app_bin = executable_args[0]
        self.app_name = os.path.basename(self.app_bin)
        if hasattr(self, 'role'):
            self.app_name += ' {role}'.format(role=self.role)
        self.process = None
        self.result = None
        env = {} if env is None else env
        self.env = copy.deepcopy(env)

    def wait_for_enter(self):
        """
        When gdbserver/gdb debugging is enabled on the test case, print
        instructions for attaching the debugger and block until ENTER
        is pressed. Returns immediately in every other case.
        """
        if not hasattr(self, 'testcase'):
            return
        if self.testcase.debug_all and self.testcase.debug_gdbserver:
            print()
            print(double_line_delim)
            print("Spawned GDB Server for '{app}' with PID: {pid}"
                  .format(app=self.app_name, pid=self.process.pid))
        elif self.testcase.debug_all and self.testcase.debug_gdb:
            print()
            print(double_line_delim)
            print("Spawned '{app}' with PID: {pid}"
                  .format(app=self.app_name, pid=self.process.pid))
        else:
            return
        print(single_line_delim)
        print("You can debug '{app}' using:".format(app=self.app_name))
        if self.testcase.debug_gdbserver:
            print("sudo gdb " + self.app_bin +
                  " -ex 'target remote localhost:{port}'"
                  .format(port=self.testcase.gdbserver_port))
            print("Now is the time to attach gdb by running the above "
                  "command, set up breakpoints etc., then resume from "
                  "within gdb by issuing the 'continue' command")
            # the next gdbserver instance gets the next port
            self.testcase.gdbserver_port += 1
        elif self.testcase.debug_gdb:
            print("sudo gdb " + self.app_bin +
                  " -ex 'attach {pid}'".format(pid=self.process.pid))
            print("Now is the time to attach gdb by running the above "
                  "command and set up breakpoints etc., then resume from"
                  " within gdb by issuing the 'continue' command")
        print(single_line_delim)
        input("Press ENTER to continue running the testcase...")

    def run(self):
        """
        Start the executable, wait for it to finish and log its output.

        :raises EnvironmentError: when the executable does not exist or
                                  is not executable; self.result is set
                                  to os.EX_OSFILE in that case
        """
        executable = self.args[0]
        if not os.path.exists(executable) or not os.access(
                executable, os.F_OK | os.X_OK):
            # Exit code that means some system file did not exist,
            # could not be opened, or had some other kind of error.
            self.result = os.EX_OSFILE
            raise EnvironmentError(
                "executable '%s' is not found or executable." % executable)
        self.logger.debug("Running executable: '{app}'"
                          .format(app=' '.join(self.args)))
        # current environment first, per-worker additions on top
        env = os.environ.copy()
        env.update(self.env)
        env["CK_LOG_FILE_NAME"] = "-"
        self.process = subprocess.Popen(
            self.args, shell=False, env=env, preexec_fn=os.setpgrp,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self.wait_for_enter()
        out, err = self.process.communicate()
        self.logger.debug("Finished running `{app}'".format(app=self.app_name))
        self.logger.info("Return code is `%s'" % self.process.returncode)
        self.logger.info(single_line_delim)
        self.logger.info("Executable `{app}' wrote to stdout:"
                         .format(app=self.app_name))
        self.logger.info(single_line_delim)
        self.logger.info(out.decode('utf-8'))
        self.logger.info(single_line_delim)
        self.logger.info("Executable `{app}' wrote to stderr:"
                         .format(app=self.app_name))
        self.logger.info(single_line_delim)
        self.logger.info(err.decode('utf-8'))
        self.logger.info(single_line_delim)
        self.result = self.process.returncode
1662 if __name__ == '__main__':