3 from __future__ import print_function
19 from collections import deque
20 from threading import Thread, Event
21 from inspect import getdoc, isclass
22 from traceback import format_exception
23 from logging import FileHandler, DEBUG, Formatter
27 from scapy.packet import Raw
28 import hook as hookmodule
29 from vpp_pg_interface import VppPGInterface
30 from vpp_sub_interface import VppSubInterface
31 from vpp_lo_interface import VppLoInterface
32 from vpp_bvi_interface import VppBviInterface
33 from vpp_papi_provider import VppPapiProvider
35 from vpp_papi.vpp_stats import VPPStats
36 from vpp_papi.vpp_transport_shmem import VppTransportShmemIOError
37 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
39 from vpp_object import VppObjectRegistry
40 from util import ppp, is_core_present
41 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
42 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
43 from scapy.layers.inet6 import ICMPv6EchoReply
# Module-level logger, named after this module.
45 logger = logging.getLogger(__name__)
47 # Set up an empty logger for the testcase that can be overridden as necessary
48 null_logger = logging.getLogger('VppTestCase')
# A NullHandler is attached so the 'VppTestCase' logger has a handler that
# emits nothing.
49 null_logger.addHandler(logging.NullHandler())
# Boolean view of an environment variable.
# The visible return expression reads os.getenv(self.name, self.default),
# lower-cases it and tests membership in self.true_values, so the
# environment is consulted each time the object is evaluated rather than
# once at construction.
# NOTE(review): several lines of this class are not visible in this view
# (the fallback for true_values and the method headers) - check the complete
# file before changing anything here.
58 class BoolEnvironmentVariable(object):
60 def __init__(self, env_var_name, default='n', true_values=None):
61 self.name = env_var_name
62 self.default = default
63 self.true_values = true_values if true_values is not None else \
67 return os.getenv(self.name, self.default).lower() in self.true_values
# Python 2 uses __nonzero__ for truth testing, hence the alias.
69 if sys.version_info[0] == 2:
70 __nonzero__ = __bool__
73 return 'BoolEnvironmentVariable(%r, default=%r, true_values=%r)' % \
74 (self.name, self.default, self.true_values)
# Boolean wrapper around the TEST_DEBUG environment variable.
77 debug_framework = BoolEnvironmentVariable('TEST_DEBUG')
82 Test framework module.
84 The module provides a set of tools for constructing and running tests and
85 representing the results.
# Exception reporting that the VPP subprocess has died.
# signals_by_value maps signal numbers to their 'SIG*' names (names starting
# with 'SIG_' are excluded), built from the signal module's namespace.
# __init__ stores testcase and method_name, looks up signals_by_value[-rv]
# to name the signal (KeyError/TypeError are caught) and passes the
# formatted message to Exception.__init__.
# NOTE(review): parts of __init__ are not visible in this view. The message
# formats the return code with %d; setUp in this file raises
# VppDiedError(rv=None, ...) - if the formatted value is rv itself this
# would raise TypeError instead of producing the message. Please verify.
89 class VppDiedError(Exception):
90 """ exception for reporting that the subprocess has died."""
92 signals_by_value = {v: k for k, v in signal.__dict__.items() if
93 k.startswith('SIG') and not k.startswith('SIG_')}
95 def __init__(self, rv=None, testcase=None, method_name=None):
97 self.signal_name = None
98 self.testcase = testcase
99 self.method_name = method_name
102 self.signal_name = VppDiedError.signals_by_value[-rv]
103 except (KeyError, TypeError):
106 if testcase is None and method_name is None:
109 in_msg = 'running %s.%s ' % (testcase, method_name)
111 msg = "VPP subprocess died %sunexpectedly with return code: %d%s." % (
114 ' [%s]' % (self.signal_name if
115 self.signal_name is not None else ''))
116 super(VppDiedError, self).__init__(msg)
# Record describing one generated packet. The equality method of this class
# compares the index, src, dst and data attributes.
# NOTE(review): the attribute assignments that the '#:' comments below
# describe are not visible in this view; their default values cannot be
# confirmed from here.
119 class _PacketInfo(object):
120 """Private class to create packet info object.
122 Help process information about the next packet.
123 Set variables to default values.
125 #: Store the index of the packet.
127 #: Store the index of the source packet generator interface of the packet.
129 #: Store the index of the destination packet generator interface
132 #: Store expected ip version
134 #: Store expected upper protocol
136 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare two packet infos by index, src, dst and data.

    :param other: object to compare with
    :returns: True when all four fields are equal, False when any
              differs, NotImplemented when `other` does not expose the
              compared fields (so Python falls back to its default
              comparison instead of raising AttributeError)
    """
    compared_fields = ('index', 'src', 'dst', 'data')
    # a foreign object (None, a plain packet, ...) is simply "not equal";
    # let the interpreter decide rather than crash on a missing attribute
    if not all(hasattr(other, name) for name in compared_fields):
        return NotImplemented
    # all() stops at the first differing field
    return all(getattr(self, name) == getattr(other, name)
               for name in compared_fields)
# Thread body that drains VPP's stdout and stderr pipes until
# testclass.pump_thread_stop_flag is set.
# For each readable stream it reads up to 102400 bytes, decodes them as
# ASCII (undecodable bytes are backslash-escaped), splits into lines keeping
# the line endings, prepends the fragment left over from the previous read,
# and appends the lines to testclass.vpp_stdout_deque / vpp_stderr_deque.
# When testclass.cache_vpp_output is false the lines are also logged
# (stdout at info level, stderr at error level).
# NOTE(review): several lines are not visible in this view (initialisation
# of the fragments, the remaining select() arguments, the computation of
# 'limit'); the description covers only what is shown.
147 def pump_output(testclass):
148 """ pump output from vpp stdout/stderr to proper queues """
151 while not testclass.pump_thread_stop_flag.is_set():
152 readable = select.select([testclass.vpp.stdout.fileno(),
153 testclass.vpp.stderr.fileno(),
154 testclass.pump_thread_wakeup_pipe[0]],
156 if testclass.vpp.stdout.fileno() in readable:
157 read = os.read(testclass.vpp.stdout.fileno(), 102400)
159 split = read.decode('ascii',
160 errors='backslashreplace').splitlines(True)
161 if len(stdout_fragment) > 0:
162 split[0] = "%s%s" % (stdout_fragment, split[0])
163 if len(split) > 0 and split[-1].endswith("\n"):
167 stdout_fragment = split[-1]
168 testclass.vpp_stdout_deque.extend(split[:limit])
169 if not testclass.cache_vpp_output:
170 for line in split[:limit]:
171 testclass.logger.info(
172 "VPP STDOUT: %s" % line.rstrip("\n"))
# stderr is handled the same way as stdout, but logged at error level
173 if testclass.vpp.stderr.fileno() in readable:
174 read = os.read(testclass.vpp.stderr.fileno(), 102400)
176 split = read.decode('ascii',
177 errors='backslashreplace').splitlines(True)
178 if len(stderr_fragment) > 0:
179 split[0] = "%s%s" % (stderr_fragment, split[0])
180 if len(split) > 0 and split[-1].endswith("\n"):
184 stderr_fragment = split[-1]
186 testclass.vpp_stderr_deque.extend(split[:limit])
187 if not testclass.cache_vpp_output:
188 for line in split[:limit]:
189 testclass.logger.error(
190 "VPP STDERR: %s" % line.rstrip("\n"))
191 # ignoring the dummy pipe here intentionally - the
192 # flag will take care of properly terminating the loop
def _is_skip_aarch64_set():
    """Build the boolean wrapper for the SKIP_AARCH64 environment variable."""
    skip_flag = BoolEnvironmentVariable('SKIP_AARCH64')
    return skip_flag


# evaluated as a boolean by users; the environment is read at that point
is_skip_aarch64_set = _is_skip_aarch64_set()
def _is_platform_aarch64():
    """Tell whether the machine type reported by the platform is aarch64."""
    machine = platform.machine()
    return 'aarch64' == machine


# computed once, at import time
is_platform_aarch64 = _is_platform_aarch64()
def _running_extended_tests():
    """Build the boolean wrapper for the EXTENDED_TESTS environment variable."""
    extended_flag = BoolEnvironmentVariable("EXTENDED_TESTS")
    return extended_flag


# evaluated as a boolean by users; the environment is read at that point
running_extended_tests = _running_extended_tests()
def _running_gcov_tests():
    """Build the boolean wrapper for the GCOV_TESTS environment variable."""
    gcov_flag = BoolEnvironmentVariable("GCOV_TESTS")
    return gcov_flag


# evaluated as a boolean by users; the environment is read at that point
running_gcov_tests = _running_gcov_tests()
# Reports test progress to the parent process over a pipe.
# All instances share one attribute dictionary (self.__dict__ is rebound to
# self._shared_state), so a pipe set through one instance is seen by every
# other instance.
# The pipe setter refuses to replace a pipe that is already set.
# send_keep_alive() sends the tuple (desc, test.vpp_bin, test.tempdir,
# test.vpp.pid) through the pipe; when a description is built from the test
# it has the form '<desc> (<class of test>)'.
# NOTE(review): several lines are not visible in this view (property
# decorators, the early return when no pipe is set, the desc handling).
223 class KeepAliveReporter(object):
225 Singleton object which reports test start to parent process
230 self.__dict__ = self._shared_state
238 def pipe(self, pipe):
239 if self._pipe is not None:
240 raise Exception("Internal error - pipe should only be set once.")
243 def send_keep_alive(self, test, desc=None):
245 Write current test tmpdir & desc to keep-alive pipe to signal liveness
247 if self.pipe is None:
248 # if not running forked..
252 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
256 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
# Tags that can be attached to test case classes (see create_tag_decorator
# and VppTestCase.has_tag in this file).
# NOTE(review): the member described by the first comment pair is not
# visible in this view; the file refers to it as TestCaseTag.RUN_SOLO.
259 class TestCaseTag(Enum):
260 # marks the suites that must run at the end
261 # using only a single test runner
263 # marks the suites broken on VPP multi-worker
264 FIXME_VPP_WORKERS = 2
# Factory for class decorators that tag a test case class with the
# TestCaseTag member `e`: the visible body appends `e` to cls.test_tags and
# catches AttributeError.
# NOTE(review): the inner decorator header and the AttributeError handler
# are not visible in this view - presumably the handler creates the list for
# classes that have none yet; confirm against the complete file.
# tag_run_solo and tag_fixme_vpp_workers are the two ready-made decorators.
267 def create_tag_decorator(e):
270 cls.test_tags.append(e)
271 except AttributeError:
277 tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
278 tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
# Base class for VPP test cases (header and class-level defaults only; the
# methods follow).
# extra_vpp_punt_config and extra_vpp_plugin_config are appended to the VPP
# command line by setUpConstants().
# vapi_response_timeout is handed to VppPapiProvider during class setup and
# is set to 0 there when a gdb/gdbserver debug mode is active - presumably
# the unit is seconds; confirm against VppPapiProvider.
281 class VppTestCase(unittest.TestCase):
282 """This subclass is a base class for VPP test cases that are implemented as
283 classes. It provides methods to create and run test case.
286 extra_vpp_punt_config = []
287 extra_vpp_plugin_config = []
289 vapi_response_timeout = 5
def packet_infos(self):
    """Packet infos recorded for this test case (the _packet_infos container)"""
    infos = self._packet_infos
    return infos
# Looks up dst_if_index in cls._packet_count_for_dst_if_idx, the per
# destination counter maintained by create_packet_info().
# NOTE(review): the branch taken when the index is absent is not visible in
# this view.
297 def get_packet_count_for_if_idx(cls, dst_if_index):
298 """Get the number of packet info for specified destination if index"""
299 if dst_if_index in cls._packet_count_for_dst_if_idx:
300 return cls._packet_count_for_dst_if_idx[dst_if_index]
# Membership test of `tag` in cls.test_tags. AttributeError is caught, which
# covers classes that never had test_tags assigned.
# NOTE(review): the handler body is not visible in this view.
305 def has_tag(cls, tag):
306 """ if the test case has a given tag - return true """
308 return tag in cls.test_tags
309 except AttributeError:
def is_tagged_run_solo(cls):
    """ return true if the test case class carries the RUN_SOLO tag """
    solo_tag = TestCaseTag.RUN_SOLO
    return cls.has_tag(solo_tag)
320 """Return the instance of this testcase"""
321 return cls.test_instance
# Resets the class debug settings (gdbserver_port 7777, all debug_* flags
# False) and then enables flags according to the DEBUG option `d`.
# Values visible here: "gdb", "gdb-all", "gdbserver", "gdbserver-all"; any
# unrecognised value raises Exception.
# NOTE(review): the normalisation of `d` into `dl`, the first branch's
# condition and some branch bodies are not visible in this view.
324 def set_debug_flags(cls, d):
325 cls.gdbserver_port = 7777
326 cls.debug_core = False
327 cls.debug_gdb = False
328 cls.debug_gdbserver = False
329 cls.debug_all = False
334 cls.debug_core = True
335 elif dl == "gdb" or dl == "gdb-all":
337 elif dl == "gdbserver" or dl == "gdbserver-all":
338 cls.debug_gdbserver = True
340 raise Exception("Unrecognized DEBUG option: '%s'" % d)
341 if dl == "gdb-all" or dl == "gdbserver-all":
# Chooses a CPU number at random from a set of candidate CPUs.
# cpu_usage_list starts as a single set holding every CPU index reported by
# psutil. For each running process named 'vpp_main', the CPU it is on is
# moved from the set that contains it into the next set in the list (a new
# set is appended when needed), so set N appears to hold the CPUs hosting N
# vpp_main processes. A non-empty set is then selected and one of its
# members is returned via random.choice.
# Processes that vanish while being inspected (psutil.NoSuchProcess) are
# tolerated.
# NOTE(review): several lines are not visible in this view (the try header,
# loop exits, the exception handler body); confirm the selection order
# against the complete file.
345 def get_least_used_cpu():
346 cpu_usage_list = [set(range(psutil.cpu_count()))]
347 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
348 if 'vpp_main' == p.info['name']]
349 for vpp_process in vpp_processes:
350 for cpu_usage_set in cpu_usage_list:
352 cpu_num = vpp_process.cpu_num()
353 if cpu_num in cpu_usage_set:
354 cpu_usage_set_index = cpu_usage_list.index(
356 if cpu_usage_set_index == len(cpu_usage_list) - 1:
357 cpu_usage_list.append({cpu_num})
359 cpu_usage_list[cpu_usage_set_index + 1].add(
361 cpu_usage_set.remove(cpu_num)
363 except psutil.NoSuchProcess:
366 for cpu_usage_set in cpu_usage_list:
367 if len(cpu_usage_set) > 0:
368 min_usage_set = cpu_usage_set
371 return random.choice(tuple(min_usage_set))
# Derives class-level settings from the environment and assembles
# cls.vpp_cmdline, the argument list used to start VPP.
# Environment variables read here: STEP, DEBUG, CACHE_OUTPUT, VPP_BIN,
# VPP_PLUGIN_PATH, VPP_TEST_PLUGIN_PATH, EXTERN_PLUGINS, COREDUMP_SIZE,
# VPP_WORKER_CONFIG, VARIANT, API_FUZZ.
# Class attributes read: tempdir, shm_prefix, stats_sock, api_sock,
# extra_vpp_plugin_config, extra_vpp_punt_config (so they must be set before
# this runs).
# NOTE(review): several lines are not visible in this view (initial values
# of plugin_path / debug_cli / coredump_size, some else branches and part of
# the command line literal).
374 def setUpConstants(cls):
375 """ Set-up the test case class based on environment variables """
376 cls.step = BoolEnvironmentVariable('STEP')
377 d = os.getenv("DEBUG", None)
378 # inverted case to handle '' == True
379 c = os.getenv("CACHE_OUTPUT", "1")
380 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
381 cls.set_debug_flags(d)
382 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
383 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
384 cls.test_plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
385 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
# plugin search path: VPP_PLUGIN_PATH and EXTERN_PLUGINS joined with ':'
# when both are set, otherwise whichever one is set
387 if cls.plugin_path is not None:
388 if cls.extern_plugin_path is not None:
389 plugin_path = "%s:%s" % (
390 cls.plugin_path, cls.extern_plugin_path)
392 plugin_path = cls.plugin_path
393 elif cls.extern_plugin_path is not None:
394 plugin_path = cls.extern_plugin_path
# the CLI listener is configured when stepping or debugging with gdb
396 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
397 debug_cli = "cli-listen localhost:5002"
399 size = os.getenv("COREDUMP_SIZE")
401 coredump_size = "coredump-size %s" % size
402 if coredump_size is None:
403 coredump_size = "coredump-size unlimited"
405 cpu_core_number = cls.get_least_used_cpu()
# a worker_config set on the class takes precedence over VPP_WORKER_CONFIG
406 if not hasattr(cls, "worker_config"):
407 cls.worker_config = os.getenv("VPP_WORKER_CONFIG", "")
408 if cls.worker_config != "":
# test classes tagged FIXME_VPP_WORKERS run without a worker config
409 if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
410 cls.worker_config = ""
412 default_variant = os.getenv("VARIANT")
413 if default_variant is not None:
414 default_variant = "defaults { %s 100 }" % default_variant
418 api_fuzzing = os.getenv("API_FUZZ")
419 if api_fuzzing is None:
422 cls.vpp_cmdline = [cls.vpp_bin, "unix",
423 "{", "nodaemon", debug_cli, "full-coredump",
424 coredump_size, "runtime-dir", cls.tempdir, "}",
425 "api-trace", "{", "on", "}", "api-segment", "{",
426 "prefix", cls.shm_prefix, "}", "cpu", "{",
427 "main-core", str(cpu_core_number),
428 cls.worker_config, "}",
429 "physmem", "{", "max-size", "32m", "}",
430 "statseg", "{", "socket-name", cls.stats_sock, "}",
431 "socksvr", "{", "socket-name", cls.api_sock, "}",
432 "node { ", default_variant, "}",
433 "api-fuzz {", api_fuzzing, "}",
435 "{", "plugin", "dpdk_plugin.so", "{", "disable",
436 "}", "plugin", "rdma_plugin.so", "{", "disable",
437 "}", "plugin", "lisp_unittest_plugin.so", "{",
439 "}", "plugin", "unittest_plugin.so", "{", "enable",
440 "}"] + cls.extra_vpp_plugin_config + ["}", ]
# optional extras are appended after the fixed part of the command line
442 if cls.extra_vpp_punt_config is not None:
443 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
444 if plugin_path is not None:
445 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
446 if cls.test_plugin_path is not None:
447 cls.vpp_cmdline.extend(["test_plugin_path", cls.test_plugin_path])
449 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
450 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
# Prints the PID of the spawned process and the gdb command line to attach
# to it, then blocks on input() until the user presses ENTER.
# With gdbserver the printed command uses 'target remote' on
# cls.gdbserver_port, and the port is incremented afterwards; otherwise the
# printed command attaches to cls.vpp.pid.
# NOTE(review): the else branches and an apparent early-exit path (the
# logger.debug call) are not fully visible in this view.
453 def wait_for_enter(cls):
454 if cls.debug_gdbserver:
455 print(double_line_delim)
456 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
458 print(double_line_delim)
459 print("Spawned VPP with PID: %d" % cls.vpp.pid)
461 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
463 print(single_line_delim)
464 print("You can debug VPP using:")
465 if cls.debug_gdbserver:
466 print("sudo gdb " + cls.vpp_bin +
467 " -ex 'target remote localhost:{port}'"
468 .format(port=cls.gdbserver_port))
469 print("Now is the time to attach gdb by running the above "
470 "command, set up breakpoints etc., then resume VPP from "
471 "within gdb by issuing the 'continue' command")
472 cls.gdbserver_port += 1
474 print("sudo gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
475 print("Now is the time to attach gdb by running the above "
476 "command and set up breakpoints etc., then resume VPP from"
477 " within gdb by issuing the 'continue' command")
478 print(single_line_delim)
479 input("Press ENTER to continue running the testcase...")
483 cmdline = cls.vpp_cmdline
485 if cls.debug_gdbserver:
486 gdbserver = '/usr/bin/gdbserver'
487 if not os.path.isfile(gdbserver) or \
488 not os.access(gdbserver, os.X_OK):
489 raise Exception("gdbserver binary '%s' does not exist or is "
490 "not executable" % gdbserver)
492 cmdline = [gdbserver, 'localhost:{port}'
493 .format(port=cls.gdbserver_port)] + cls.vpp_cmdline
494 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
497 cls.vpp = subprocess.Popen(cmdline,
498 stdout=subprocess.PIPE,
499 stderr=subprocess.PIPE)
500 except subprocess.CalledProcessError as e:
501 cls.logger.critical("Subprocess returned with non-0 return code: ("
505 cls.logger.critical("Subprocess returned with OS error: "
506 "(%s) %s", e.errno, e.strerror)
508 except Exception as e:
509 cls.logger.exception("Subprocess returned unexpected from "
# If a file named 'core' exists in cls.tempdir, polls its size for up to 60
# seconds, comparing the size between consecutive polls (size == curr_size),
# and logs either a timeout or completion.
# NOTE(review): the loop body is only partly visible in this view (the
# assignment of `size`, the delay between polls, and the exit statements).
516 def wait_for_coredump(cls):
517 corefile = cls.tempdir + "/core"
518 if os.path.isfile(corefile):
519 cls.logger.error("Waiting for coredump to complete: %s", corefile)
520 curr_size = os.path.getsize(corefile)
521 deadline = time.time() + 60
523 while time.time() < deadline:
526 curr_size = os.path.getsize(corefile)
527 if size == curr_size:
531 cls.logger.error("Timed out waiting for coredump to complete:"
534 cls.logger.error("Coredump complete: %s, size %d",
540 Perform class setup before running the testcase
541 Remove shared memory files, start vpp and connect the vpp-api
543 super(VppTestCase, cls).setUpClass()
544 gc.collect() # run garbage collection first
545 cls.logger = get_logger(cls.__name__)
546 seed = os.environ["RND_SEED"]
548 if hasattr(cls, 'parallel_handler'):
549 cls.logger.addHandler(cls.parallel_handler)
550 cls.logger.propagate = False
552 cls.tempdir = tempfile.mkdtemp(
553 prefix='vpp-unittest-%s-' % cls.__name__)
554 cls.stats_sock = "%s/stats.sock" % cls.tempdir
555 cls.api_sock = "%s/api.sock" % cls.tempdir
556 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
557 cls.file_handler.setFormatter(
558 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
560 cls.file_handler.setLevel(DEBUG)
561 cls.logger.addHandler(cls.file_handler)
562 cls.logger.debug("--- setUpClass() for %s called ---" %
564 cls.shm_prefix = os.path.basename(cls.tempdir)
565 os.chdir(cls.tempdir)
566 cls.logger.info("Temporary dir is %s, shm prefix is %s",
567 cls.tempdir, cls.shm_prefix)
568 cls.logger.debug("Random seed is %s" % seed)
570 cls.reset_packet_infos()
574 cls.registry = VppObjectRegistry()
575 cls.vpp_startup_failed = False
576 cls.reporter = KeepAliveReporter()
577 # need to catch exceptions here because if we raise, then the cleanup
578 # doesn't get called and we might end with a zombie vpp
581 cls.reporter.send_keep_alive(cls, 'setUpClass')
582 VppTestResult.current_test_case_info = TestCaseInfo(
583 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
584 cls.vpp_stdout_deque = deque()
585 cls.vpp_stderr_deque = deque()
586 cls.pump_thread_stop_flag = Event()
587 cls.pump_thread_wakeup_pipe = os.pipe()
588 cls.pump_thread = Thread(target=pump_output, args=(cls,))
589 cls.pump_thread.daemon = True
590 cls.pump_thread.start()
591 if cls.debug_gdb or cls.debug_gdbserver:
592 cls.vapi_response_timeout = 0
593 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
594 cls.vapi_response_timeout)
596 hook = hookmodule.StepHook(cls)
598 hook = hookmodule.PollHook(cls)
599 cls.vapi.register_hook(hook)
600 cls.statistics = VPPStats(socketname=cls.stats_sock)
604 cls.vpp_startup_failed = True
606 "VPP died shortly after startup, check the"
607 " output to standard error for possible cause")
611 except vpp_papi.VPPIOError as e:
612 cls.logger.debug("Exception connecting to vapi: %s" % e)
613 cls.vapi.disconnect()
615 if cls.debug_gdbserver:
616 print(colorize("You're running VPP inside gdbserver but "
617 "VPP-API connection failed, did you forget "
618 "to 'continue' VPP from within gdb?", RED))
620 except vpp_papi.VPPRuntimeError as e:
621 cls.logger.debug("%s" % e)
624 except Exception as e:
625 cls.logger.debug("Exception connecting to VPP: %s" % e)
# In gdb/gdbserver debug modes, when the VPP process has not exited yet
# (returncode is None), tells the user it is still running and waits for
# ENTER before the caller proceeds to kill it.
# AttributeError is caught - presumably for the case where cls.vpp was never
# created; the handler body is not visible in this view.
630 def _debug_quit(cls):
631 if (cls.debug_gdbserver or cls.debug_gdb):
635 if cls.vpp.returncode is None:
637 print(double_line_delim)
638 print("VPP or GDB server is still running")
639 print(single_line_delim)
640 input("When done debugging, press ENTER to kill the "
641 "process and finish running the testcase...")
642 except AttributeError:
648 Disconnect vpp-api, kill vpp and cleanup shared memory files
652 # first signal that we want to stop the pump thread, then wake it up
653 if hasattr(cls, 'pump_thread_stop_flag'):
654 cls.pump_thread_stop_flag.set()
655 if hasattr(cls, 'pump_thread_wakeup_pipe'):
656 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
657 if hasattr(cls, 'pump_thread'):
658 cls.logger.debug("Waiting for pump thread to stop")
659 cls.pump_thread.join()
660 if hasattr(cls, 'vpp_stderr_reader_thread'):
661 cls.logger.debug("Waiting for stderr pump to stop")
662 cls.vpp_stderr_reader_thread.join()
664 if hasattr(cls, 'vpp'):
665 if hasattr(cls, 'vapi'):
666 cls.logger.debug(cls.vapi.vpp.get_stats())
667 cls.logger.debug("Disconnecting class vapi client on %s",
669 cls.vapi.disconnect()
670 cls.logger.debug("Deleting class vapi attribute on %s",
674 if cls.vpp.returncode is None:
675 cls.wait_for_coredump()
676 cls.logger.debug("Sending TERM to vpp")
678 cls.logger.debug("Waiting for vpp to die")
679 cls.vpp.communicate()
680 cls.logger.debug("Deleting class vpp attribute on %s",
684 if cls.vpp_startup_failed:
685 stdout_log = cls.logger.info
686 stderr_log = cls.logger.critical
688 stdout_log = cls.logger.info
689 stderr_log = cls.logger.info
691 if hasattr(cls, 'vpp_stdout_deque'):
692 stdout_log(single_line_delim)
693 stdout_log('VPP output to stdout while running %s:', cls.__name__)
694 stdout_log(single_line_delim)
695 vpp_output = "".join(cls.vpp_stdout_deque)
696 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
698 stdout_log('\n%s', vpp_output)
699 stdout_log(single_line_delim)
701 if hasattr(cls, 'vpp_stderr_deque'):
702 stderr_log(single_line_delim)
703 stderr_log('VPP output to stderr while running %s:', cls.__name__)
704 stderr_log(single_line_delim)
705 vpp_output = "".join(cls.vpp_stderr_deque)
706 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
708 stderr_log('\n%s', vpp_output)
709 stderr_log(single_line_delim)
# Class-level teardown: logs the call, sends a keep-alive for
# 'tearDownClass', closes the per-class log file handler, clears the packet
# infos and notifies debug_internal.
# NOTE(review): some statements between the visible ones are not shown in
# this view.
712 def tearDownClass(cls):
713 """ Perform final cleanup after running all tests in this test-case """
714 cls.logger.debug("--- tearDownClass() for %s called ---" %
716 cls.reporter.send_keep_alive(cls, 'tearDownClass')
718 cls.file_handler.close()
719 cls.reset_packet_infos()
721 debug_internal.on_tear_down_class(cls)
def show_commands_at_teardown(self):
    """ Hook for subclasses to log extra show commands during teardown.

    The base implementation only logs that nothing was provided.
    """
    notice = "--- No test specific show commands provided. ---"
    self.logger.info(notice)
728 """ Show various debug prints after each test """
729 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
730 (self.__class__.__name__, self._testMethodName,
731 self._testMethodDoc))
734 if not self.vpp_dead:
735 self.logger.debug(self.vapi.cli("show trace max 1000"))
736 self.logger.info(self.vapi.ppcli("show interface"))
737 self.logger.info(self.vapi.ppcli("show hardware"))
738 self.logger.info(self.statistics.set_errors_str())
739 self.logger.info(self.vapi.ppcli("show run"))
740 self.logger.info(self.vapi.ppcli("show log"))
741 self.logger.info(self.vapi.ppcli("show bihash"))
742 self.logger.info("Logging testcase specific show commands.")
743 self.show_commands_at_teardown()
744 self.registry.remove_vpp_config(self.logger)
745 # Save/Dump VPP api trace log
746 m = self._testMethodName
747 api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
748 tmp_api_trace = "/tmp/%s" % api_trace
749 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
750 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
751 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
753 os.rename(tmp_api_trace, vpp_api_trace_log)
754 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
756 except VppTransportShmemIOError:
757 self.logger.debug("VppTransportShmemIOError: Vpp dead. "
758 "Cannot log show commands.")
761 self.registry.unregister_all(self.logger)
764 """ Clear trace before running each test"""
765 super(VppTestCase, self).setUp()
766 self.reporter.send_keep_alive(self)
769 raise VppDiedError(rv=None, testcase=self.__class__.__name__,
770 method_name=self._testMethodName)
771 self.sleep(.1, "during setUp")
772 self.vpp_stdout_deque.append(
773 "--- test setUp() for %s.%s(%s) starts here ---\n" %
774 (self.__class__.__name__, self._testMethodName,
775 self._testMethodDoc))
776 self.vpp_stderr_deque.append(
777 "--- test setUp() for %s.%s(%s) starts here ---\n" %
778 (self.__class__.__name__, self._testMethodName,
779 self._testMethodDoc))
780 self.vapi.cli("clear trace")
781 # store the test instance inside the test class - so that objects
782 # holding the class can access instance methods (like assertEqual)
783 type(self).test_instance = self
# Enables capture on the given interfaces, defaulting to cls.pg_interfaces
# when `interfaces` is None.
# NOTE(review): the statements that act on the interfaces are not visible
# in this view.
786 def pg_enable_capture(cls, interfaces=None):
788 Enable capture on packet-generator interfaces
790 :param interfaces: iterable interface indexes (if None,
791 use self.pg_interfaces)
794 if interfaces is None:
795 interfaces = cls.pg_interfaces
def register_capture(cls, cap_name):
    """ Remember a capture name on the testclass """
    # each entry is a (timestamp, capture name) pair
    entry = (time.time(), cap_name)
    cls._captures.append(entry)
# Runs 'show clock' through cls.vapi.cli and isolates the text between
# 'Time now' and the first comma of the reply.
# NOTE(review): the return statement is not visible in this view; per the
# comment below the result is the float value of that text.
806 def get_vpp_time(cls):
807 # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
808 # returns float("2.190522")
809 timestr = cls.vapi.cli('show clock')
810 head, sep, tail = timestr.partition(',')
811 head, sep, tail = head.partition('Time now')
# Loops until the clock reported by get_vpp_time() has advanced by at least
# `sec` since the call started.
# NOTE(review): the loop body is not visible in this view.
815 def sleep_on_vpp_time(cls, sec):
816 """ Sleep according to time in VPP world """
817 # On a busy system with many processes
818 # we might end up with VPP time being slower than real world
819 # So take that into account when waiting for VPP to do something
820 start_time = cls.get_vpp_time()
821 while cls.get_vpp_time() - start_time < sec:
826 """ Enable the PG, wait till it is done, then clean up """
827 cls.vapi.cli("trace add pg-input 1000")
828 cls.vapi.cli('packet-generator enable')
829 # PG, when starts, runs to completion -
830 # so let's avoid a race condition,
831 # and wait a little till it's done.
832 # Then clean it up - and then be gone.
833 deadline = time.time() + 300
834 while cls.vapi.cli('show packet-generator').find("Yes") != -1:
835 cls.sleep(0.01) # yield
836 if time.time() > deadline:
837 cls.logger.error("Timeout waiting for pg to stop")
839 for stamp, cap_name in cls._captures:
840 cls.vapi.cli('packet-generator delete %s' % cap_name)
# Builds a VppPGInterface for each requested index (passing gso and
# gso_size through), publishes each one as a class attribute named after
# the interface, and stores the resulting collection in cls.pg_interfaces.
# NOTE(review): the loop header, the list handling and the return statement
# are not visible in this view.
844 def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
846 Create packet-generator interfaces.
848 :param interfaces: iterable indexes of the interfaces.
849 :returns: List of created interfaces.
854 intf = VppPGInterface(cls, i, gso, gso_size)
855 setattr(cls, intf.name, intf)
857 cls.pg_interfaces = result
# Builds `count` VppLoInterface objects, publishes them as class attributes
# named after each interface, and stores the list in cls.lo_interfaces.
# NOTE(review): the loop header around setattr and the return statement are
# not visible in this view.
861 def create_loopback_interfaces(cls, count):
863 Create loopback interfaces.
865 :param count: number of interfaces created.
866 :returns: List of created interfaces.
868 result = [VppLoInterface(cls) for i in range(count)]
870 setattr(cls, intf.name, intf)
871 cls.lo_interfaces = result
# Builds `count` VppBviInterface objects, publishes them as class attributes
# named after each interface, and stores the list in cls.bvi_interfaces.
# NOTE(review): the loop header around setattr and the return statement are
# not visible in this view.
875 def create_bvi_interfaces(cls, count):
877 Create BVI interfaces.
879 :param count: number of interfaces created.
880 :returns: List of created interfaces.
882 result = [VppBviInterface(cls) for i in range(count)]
884 setattr(cls, intf.name, intf)
885 cls.bvi_interfaces = result
# Appends padding to the packet's Raw payload so the packet reaches `size`.
# The current length is taken as len(packet) + 4 - presumably the 4 extra
# bytes account for a frame check sequence; TODO confirm.
# The padding string is repeated often enough to cover the gap, cut to
# exactly `extend` characters and ASCII-encoded before being appended.
# NOTE(review): the guard around the padding statements is not visible in
# this view.
889 def extend_packet(packet, size, padding=' '):
891 Extend packet to given size by padding with spaces or custom padding
892 NOTE: Currently works only when Raw layer is present.
894 :param packet: packet
895 :param size: target size
896 :param padding: padding used to extend the payload
899 packet_len = len(packet) + 4
900 extend = size - packet_len
902 num = (extend // len(padding)) + 1
903 packet[Raw].load += (padding * num)[:extend].encode("ascii")
def reset_packet_infos(cls):
    """ Forget all packet info objects and per-destination packet counts """
    # each attribute gets its own fresh, empty dict
    for attr_name in ('_packet_infos', '_packet_count_for_dst_if_idx'):
        setattr(cls, attr_name, {})
# Fills in a packet info with the next free index (the current size of
# cls._packet_infos) and the sw_if_index of the source and destination
# interfaces, increments the per-destination packet counter, and stores the
# info in cls._packet_infos under its index.
# For a VppSubInterface destination the counter is keyed by the parent
# interface's sw_if_index, while info.dst keeps the sub-interface's own.
# NOTE(review): creation of `info`, the else branches and the return
# statement are not visible in this view.
912 def create_packet_info(cls, src_if, dst_if):
914 Create packet info object containing the source and destination indexes
915 and add it to the testcase's packet info list
917 :param VppInterface src_if: source interface
918 :param VppInterface dst_if: destination interface
920 :returns: _PacketInfo object
924 info.index = len(cls._packet_infos)
925 info.src = src_if.sw_if_index
926 info.dst = dst_if.sw_if_index
927 if isinstance(dst_if, VppSubInterface):
928 dst_idx = dst_if.parent.sw_if_index
930 dst_idx = dst_if.sw_if_index
931 if dst_idx in cls._packet_count_for_dst_if_idx:
932 cls._packet_count_for_dst_if_idx[dst_idx] += 1
934 cls._packet_count_for_dst_if_idx[dst_idx] = 1
935 cls._packet_infos[info.index] = info
# Serialises a packet info as five space-separated decimal integers,
# starting with index, src and dst.
# NOTE(review): the last two formatted values are not visible in this view;
# payload_to_info() reads fields 3 and 4 back as ip and proto.
939 def info_to_payload(info):
941 Convert _PacketInfo object to packet payload
943 :param info: _PacketInfo object
945 :returns: string containing serialized data from packet info
947 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
# Inverse of info_to_payload(): splits the payload field on whitespace and
# converts the first five items to int, in the order index, src, dst, ip,
# proto.
# NOTE(review): creation of `info` and the return statement are not visible
# in this view.
951 def payload_to_info(payload, payload_field='load'):
953 Convert packet payload to _PacketInfo object
955 :param payload: packet payload
956 :type payload: <class 'scapy.packet.Raw'>
957 :param payload_field: packet fieldname of payload "load" for
958 <class 'scapy.packet.Raw'>
959 :type payload_field: str
960 :returns: _PacketInfo object containing de-serialized data from payload
963 numbers = getattr(payload, payload_field).split()
965 info.index = int(numbers[0])
966 info.src = int(numbers[1])
967 info.dst = int(numbers[2])
968 info.ip = int(numbers[3])
969 info.proto = int(numbers[4])
# Steps through self._packet_infos by index: the successor of `info` is the
# entry stored under info.index + 1.
# NOTE(review): the handling of info=None and of the end of the sequence is
# not visible in this view.
972 def get_next_packet_info(self, info):
974 Iterate over the packet info list stored in the testcase
975 Start iteration with first element if info is None
976 Continue based on index in info if info is specified
978 :param info: info or None
979 :returns: next info in list or None if no more infos
984 next_index = info.index + 1
985 if next_index == len(self._packet_infos):
988 return self._packet_infos[next_index]
# Advances with get_next_packet_info() and checks each result's src against
# src_index.
# NOTE(review): the loop header, the end-of-sequence handling and the return
# statements are not visible in this view.
990 def get_next_packet_info_for_interface(self, src_index, info):
992 Search the packet info list for the next packet info with same source
995 :param src_index: source interface index to search for
996 :param info: packet info - where to start the search
997 :returns: packet info or None
1001 info = self.get_next_packet_info(info)
1004 if info.src == src_index:
# Advances with get_next_packet_info_for_interface() (source match) and
# additionally checks each result's dst against dst_index.
# NOTE(review): the loop header, the end-of-sequence handling and the return
# statements are not visible in this view.
1007 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
1009 Search the packet info list for the next packet info with same source
1010 and destination interface indexes
1012 :param src_index: source interface index to search for
1013 :param dst_index: destination interface index to search for
1014 :param info: packet info - where to start the search
1015 :returns: packet info or None
1019 info = self.get_next_packet_info_for_interface(src_index, info)
1022 if info.dst == dst_index:
# assertEqual wrapper that builds a descriptive failure message.
# Without name_or_class it is a plain assertEqual. Otherwise two message
# forms are visible: one that treats name_or_class as a callable (its
# docstring names the value and name_or_class(value) gives a symbolic
# rendering of each integer), and a simpler one that uses name_or_class as
# the name directly.
# NOTE(review): the control flow selecting between the two message forms is
# not visible in this view.
1025 def assert_equal(self, real_value, expected_value, name_or_class=None):
1026 if name_or_class is None:
1027 self.assertEqual(real_value, expected_value)
1030 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
1031 msg = msg % (getdoc(name_or_class).strip(),
1032 real_value, str(name_or_class(real_value)),
1033 expected_value, str(name_or_class(expected_value)))
1035 msg = "Invalid %s: %s does not match expected value %s" % (
1036 name_or_class, real_value, expected_value)
1038 self.assertEqual(real_value, expected_value, msg)
# Asserts expected_min <= real_value <= expected_max (both bounds
# inclusive), with a message naming the value.
# NOTE(review): most of the signature is not visible in this view.
1040 def assert_in_range(self,
1048 msg = "Invalid %s: %s out of range <%s,%s>" % (
1049 name, real_value, expected_min, expected_max)
1050 self.assertTrue(expected_min <= real_value <= expected_max, msg)
# Verifies the checksum fields ('cksum' / 'chksum') found on the packet's
# layers.
# The packet is re-parsed from its raw bytes; on a working copy the checksum
# fields are deleted layer by layer and remembered as (layer index, field
# name) pairs. The copy is then rebuilt from raw bytes - which makes scapy
# recompute the deleted fields - and each recomputed value is compared with
# the received one.
# Zero checksums on layers named 'UDP' or 'UDPerror' are treated specially
# when ignore_zero_udp_checksums is true.
# NOTE(review): several lines are not visible in this view (initialisation
# of counter/checksums, the layer loop header and exits, the early return
# when nothing was collected, the calls that receive the trailing
# arguments).
1052 def assert_packet_checksums_valid(self, packet,
1053 ignore_zero_udp_checksums=True):
1054 received = packet.__class__(scapy.compat.raw(packet))
1055 udp_layers = ['UDP', 'UDPerror']
1056 checksum_fields = ['cksum', 'chksum']
1059 temp = received.__class__(scapy.compat.raw(received))
1061 layer = temp.getlayer(counter)
1063 layer = layer.copy()
1064 layer.remove_payload()
1065 for cf in checksum_fields:
1066 if hasattr(layer, cf):
1067 if ignore_zero_udp_checksums and \
1068 0 == getattr(layer, cf) and \
1069 layer.name in udp_layers:
1071 delattr(temp.getlayer(counter), cf)
1072 checksums.append((counter, cf))
1075 counter = counter + 1
1076 if 0 == len(checksums):
1078 temp = temp.__class__(scapy.compat.raw(temp))
1079 for layer, cf in checksums:
1080 calc_sum = getattr(temp[layer], cf)
1082 getattr(received[layer], cf), calc_sum,
1083 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
1085 "Checksum field `%s` on `%s` layer has correct value `%s`" %
1086 (cf, temp[layer].name, calc_sum))
# Compares the checksum field received on `layer` with a recomputed value.
# The packet is rebuilt from its raw bytes, the field is deleted on the
# copy, and the copy is rebuilt again so scapy fills the field in afresh;
# the two values are then compared through assert_equal.
# A received checksum of zero is treated specially when ignore_zero_checksum
# is true.
# NOTE(review): the statement under the zero-checksum condition is not
# visible in this view.
1088 def assert_checksum_valid(self, received_packet, layer,
1089 field_name='chksum',
1090 ignore_zero_checksum=False):
1091 """ Check checksum of received packet on given layer """
1092 received_packet_checksum = getattr(received_packet[layer], field_name)
1093 if ignore_zero_checksum and 0 == received_packet_checksum:
1095 recalculated = received_packet.__class__(
1096 scapy.compat.raw(received_packet))
1097 delattr(recalculated[layer], field_name)
1098 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1099 self.assert_equal(received_packet_checksum,
1100 getattr(recalculated[layer], field_name),
1101 "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """ Check the checksum of the IP layer of the received packet """
    layer_name = 'IP'
    self.assert_checksum_valid(
        received_packet, layer_name,
        ignore_zero_checksum=ignore_zero_checksum)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """ Check the checksum of the TCP layer of the received packet """
    layer_name = 'TCP'
    self.assert_checksum_valid(
        received_packet, layer_name,
        ignore_zero_checksum=ignore_zero_checksum)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """ Check the checksum of the UDP layer of the received packet

    Unlike the IP/TCP variants, zero checksums are ignored by default.
    """
    layer_name = 'UDP'
    self.assert_checksum_valid(
        received_packet, layer_name,
        ignore_zero_checksum=ignore_zero_checksum)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """ Check checksums of the layers quoted inside an ICMP error

    Each of the IPerror, TCPerror, UDPerror and ICMPerror layers is
    checked only when present; a zero UDPerror checksum is ignored.
    """
    # (layer class, layer name, extra keyword arguments), in check order
    embedded_layers = (
        (IPerror, 'IPerror', {}),
        (TCPerror, 'TCPerror', {}),
        (UDPerror, 'UDPerror', {'ignore_zero_checksum': True}),
        (ICMPerror, 'ICMPerror', {}),
    )
    for layer_cls, layer_name, extra in embedded_layers:
        if received_packet.haslayer(layer_cls):
            self.assert_checksum_valid(received_packet, layer_name, **extra)
def assert_icmp_checksum_valid(self, received_packet):
    """ Check the ICMP checksum, then the checksums of any embedded layers """
    outer_layer = 'ICMP'
    self.assert_checksum_valid(received_packet, outer_layer)
    self.assert_embedded_icmp_checksum_valid(received_packet)
def assert_icmpv6_checksum_valid(self, pkt):
    """ Check the 'cksum' field of the ICMPv6 layers present in pkt

    For a destination-unreachable message the embedded layers are
    checked as well.
    """
    if pkt.haslayer(ICMPv6DestUnreach):
        self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
        self.assert_embedded_icmp_checksum_valid(pkt)
    # echo request/reply carry no embedded packet - only their own checksum
    echo_layers = (
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest'),
        (ICMPv6EchoReply, 'ICMPv6EchoReply'),
    )
    for layer_cls, layer_name in echo_layers:
        if pkt.haslayer(layer_cls):
            self.assert_checksum_valid(pkt, layer_name, 'cksum')
# Returns the value of a counter.
# Names starting with "/" are read from self.statistics.get_counter().
# Other names are looked up in the output of the 'sh errors' CLI command:
# the first and last output lines are skipped, each remaining line is split
# on whitespace, the second column is compared with the name and the first
# column supplies the integer value.
# NOTE(review): the else header, the value used when no line matches and
# the loop exit are not visible in this view. Counter names that contain
# whitespace could not match a single column - verify against real output.
1142 def get_packet_counter(self, counter):
1143 if counter.startswith("/"):
1144 counter_value = self.statistics.get_counter(counter)
1146 counters = self.vapi.cli("sh errors").split('\n')
1148 for i in range(1, len(counters) - 1):
1149 results = counters[i].split()
1150 if results[1] == counter:
1151 counter_value = int(results[0])
1153 return counter_value
def assert_packet_counter_equal(self, counter, expected_value):
    """Assert that a packet counter holds the expected value.

    :param counter: counter name, resolved via get_packet_counter()
    :param expected_value: value the counter is compared against
    """
    description = "packet counter `%s'" % counter
    actual_value = self.get_packet_counter(counter)
    self.assert_equal(actual_value, expected_value, description)
def assert_error_counter_equal(self, counter, expected_value):
    """Assert that an error counter holds the expected value.

    :param counter: counter name, resolved via
        self.statistics.get_err_counter()
    :param expected_value: value the counter is compared against
    """
    description = "error counter `%s'" % counter
    actual_value = self.statistics.get_err_counter(counter)
    self.assert_equal(actual_value, expected_value, description)
@classmethod
def sleep(cls, timeout, remark=None):
    """Sleep for `timeout` seconds and log how long the sleep really took.

    A zero timeout only yields the CPU and returns without logging.

    :param timeout: number of seconds to sleep
    :param remark: free-form text included in the log messages
    """
    # /* Allow sleep(0) to maintain win32 semantics, and as decreed
    #  * by Guido, only the main thread can be interrupted.
    # */
    # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892  # noqa
    if timeout == 0:
        # yield the scheduling quantum; fall back to sleep(0) on platforms
        # without sched_yield()
        if hasattr(os, 'sched_yield'):
            os.sched_yield()
        else:
            time.sleep(0)
        return

    cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
    before = time.time()
    time.sleep(timeout)
    after = time.time()
    elapsed = after - before
    # oversleeping by more than a factor of two is reported as an error
    if elapsed > 2 * timeout:
        cls.logger.error("unexpected self.sleep() result - "
                         "slept for %es instead of ~%es!",
                         elapsed, timeout)

    cls.logger.debug(
        "Finished sleep (%s) - slept %es (wanted %es)",
        remark, elapsed, timeout)
def pg_send(self, intf, pkts, worker=None):
    """Send packets through a packet-generator interface.

    Clears the packet trace, queues `pkts` as a stream on `intf`, enables
    capture on all of self.pg_interfaces and starts the packet generator.

    :param intf: interface the stream is added to
    :param pkts: packets to send
    :param worker: passed through unchanged to intf.add_stream()
    """
    self.vapi.cli("clear trace")
    intf.add_stream(pkts, worker=worker)
    self.pg_enable_capture(self.pg_interfaces)
    # without starting the generator the queued stream is never transmitted
    # NOTE(review): pg_start() is expected to be provided by the enclosing
    # test case class - confirm
    self.pg_start()
def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
    """Send packets and assert that no interface captures anything.

    :param intf: interface used for sending
    :param pkts: packets to send
    :param remark: passed through to assert_nothing_captured()
    :param timeout: capture timeout used for the first interface; a falsy
        value selects the default of 1.  All following interfaces are
        polled with a timeout of 0.1.
    """
    self.pg_send(intf, pkts)
    if not timeout:
        timeout = 1
    for i in self.pg_interfaces:
        i.get_capture(0, timeout=timeout)
        i.assert_nothing_captured(remark=remark)
        # the full wait has been paid once already - keep the remaining
        # checks short so the total time does not scale with the number
        # of interfaces
        timeout = 0.1
def send_and_expect(self, intf, pkts, output, n_rx=None, worker=None):
    """Send packets and return what was captured on `output`.

    :param intf: interface used for sending
    :param pkts: packets to send
    :param output: interface the capture is read from
    :param n_rx: number of packets requested from output.get_capture();
        a falsy value selects len(pkts)
    :param worker: passed through unchanged to pg_send()
    :returns: the result of output.get_capture()
    """
    if not n_rx:
        n_rx = len(pkts)
    self.pg_send(intf, pkts, worker=worker)
    return output.get_capture(n_rx)
def send_and_expect_only(self, intf, pkts, output, timeout=None):
    """Send packets, capture them on `output` and nowhere else.

    :param intf: interface used for sending
    :param pkts: packets to send; len(pkts) packets are requested from
        output.get_capture()
    :param output: the only interface allowed to capture packets
    :param timeout: capture timeout used for the first of the other
        interfaces; a falsy value selects the default of 1.  The remaining
        interfaces are polled with a timeout of 0.1.
    :returns: the result of output.get_capture()
    """
    self.pg_send(intf, pkts)
    rx = output.get_capture(len(pkts))
    outputs = [output]
    if not timeout:
        timeout = 1
    for i in self.pg_interfaces:
        if i in outputs:
            continue
        i.get_capture(0, timeout=timeout)
        i.assert_nothing_captured()
        # the full wait has been paid once already - keep the rest short
        timeout = 0.1

    return rx
def get_testcase_doc_name(test):
    """Return a one-line name for the test case class of `test`.

    :param test: test case instance
    :returns: first line of the class docstring (as returned by
        inspect.getdoc()); the class name when there is no docstring or
        the docstring is empty
    """
    test_class = test.__class__
    doc = getdoc(test_class)
    # getdoc() yields None for an undocumented class and '' for an empty
    # docstring - neither has a first line to return
    doc_lines = doc.splitlines() if doc else []
    return doc_lines[0] if doc_lines else test_class.__name__
def get_test_description(descriptions, test):
    """Return the text used to describe `test` in the test output.

    :param descriptions: when true, the short description of the test is
        preferred
    :param test: test case instance
    :returns: test.shortDescription() when descriptions are enabled and
        the test has one, str(test) otherwise
    """
    short_description = test.shortDescription()
    if descriptions and short_description:
        return short_description
    # always hand back a string - callers concatenate and format the result
    return str(test)
class TestCaseInfo(object):
    """Plain record of per-test-case data.

    Stores the logger, temporary directory, VPP pid and VPP binary path
    given to the constructor, plus `core_crash_test`, which starts as None.
    """

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        """
        :param logger: stored as self.logger
        :param tempdir: stored as self.tempdir
        :param vpp_pid: stored as self.vpp_pid
        :param vpp_bin_path: stored as self.vpp_bin_path
        """
        self.logger, self.tempdir = logger, tempdir
        self.vpp_pid, self.vpp_bin_path = vpp_pid, vpp_bin_path
        # nothing recorded yet
        self.core_crash_test = None
class VppTestResult(unittest.TestResult):
    """
    Test result collector which also prints per-test status lines.

    @property result_string
     String variable to store the test case result string.
    @property errors
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test which
     raised an unexpected exception.
    @property failures
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test where
     a failure was explicitly signalled using the TestCase.assert*()
     methods.
    """

    # class-level state, shared by every instance of the result class
    failed_test_cases_info = set()
    core_crash_test_cases_info = set()
    current_test_case_info = None

    def __init__(self, stream=None, descriptions=None, verbosity=None,
                 runner=None):
        """
        :param stream File descriptor to store where to report test results.
            Set to the standard error stream by default.
        :param descriptions Boolean variable to store information if to use
            test case descriptions.
        :param verbosity Integer variable to store required verbosity level.
        :param runner Runner owning this result; its `print_summary` and
            `stream` attributes are used by printErrors().
        """
        super(VppTestResult, self).__init__(stream, descriptions, verbosity)
        self.stream = stream
        self.descriptions = descriptions
        self.verbosity = verbosity
        self.result_string = None
        self.runner = runner

    def addSuccess(self, test):
        """
        Record a test succeeded result

        :param test: test which passed
        """
        if self.current_test_case_info:
            self.current_test_case_info.logger.debug(
                "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
                                                       test._testMethodName,
                                                       test._testMethodDoc))
        unittest.TestResult.addSuccess(self, test)
        self.result_string = colorize("OK", GREEN)

        self.send_result_through_pipe(test, PASS)

    def addSkip(self, test, reason):
        """
        Record a test skipped.

        :param test: test which was skipped
        :param reason: reason given for skipping
        """
        if self.current_test_case_info:
            self.current_test_case_info.logger.debug(
                "--- addSkip() %s.%s(%s) called, reason is %s" %
                (test.__class__.__name__, test._testMethodName,
                 test._testMethodDoc, reason))
        unittest.TestResult.addSkip(self, test, reason)
        self.result_string = colorize("SKIP", YELLOW)

        self.send_result_through_pipe(test, SKIP)

    def symlink_failed(self):
        """
        Link the temp dir of the current test case into $FAILED_DIR.

        Best effort: any exception is logged through the test case logger
        and not propagated.
        """
        info = self.current_test_case_info
        if not info:
            return
        try:
            failed_dir = os.getenv('FAILED_DIR')
            link_path = os.path.join(
                failed_dir,
                '%s-FAILED' %
                os.path.basename(info.tempdir))

            info.logger.debug("creating a link to the failed test")
            info.logger.debug(
                "os.symlink(%s, %s)" % (info.tempdir, link_path))
            if os.path.exists(link_path):
                info.logger.debug('symlink already exists')
            else:
                os.symlink(info.tempdir, link_path)

        except Exception as e:
            # deliberately broad - a missing FAILED_DIR or a filesystem
            # error must not affect how the result is recorded
            info.logger.error(e)

    def send_result_through_pipe(self, test, result):
        """
        Send a (test id, result) tuple through the result pipe, if any.

        :param test: test the result belongs to
        :param result: result value to send
        """
        # the attribute may be missing altogether, or present but None
        pipe = getattr(self, 'test_framework_result_pipe', None)
        if pipe:
            pipe.send((test.id(), result))

    def log_error(self, test, err, fn_name):
        """
        Log an error or failure through the current test case logger.

        :param test: test (or unittest error holder) which failed
        :param err: exc_info style tuple describing the error
        :param fn_name: name of the reporting method, used in the log text
        """
        if not self.current_test_case_info:
            return
        if isinstance(test, unittest.suite._ErrorHolder):
            # class/module level fixture failures carry no test method
            test_name = test.description
        else:
            test_name = '%s.%s(%s)' % (test.__class__.__name__,
                                       test._testMethodName,
                                       test._testMethodDoc)
        self.current_test_case_info.logger.debug(
            "--- %s() %s called, err is %s" %
            (fn_name, test_name, err))
        self.current_test_case_info.logger.debug(
            "formatted exception is:\n%s" %
            "".join(format_exception(*err)))

    def add_error(self, test, err, unittest_fn, error_type):
        """
        Common implementation of addFailure() and addError().

        :param test: test (or unittest error holder) which failed
        :param err: exc_info style tuple describing the error
        :param unittest_fn: unbound unittest.TestResult method used to
            record the result
        :param error_type: FAIL or ERROR
        :raises Exception: when error_type is neither FAIL nor ERROR
        """
        if error_type == FAIL:
            self.log_error(test, err, 'addFailure')
            error_type_str = colorize("FAIL", RED)
        elif error_type == ERROR:
            self.log_error(test, err, 'addError')
            error_type_str = colorize("ERROR", RED)
        else:
            raise Exception('Error type %s cannot be used to record an '
                            'error or a failure' % error_type)

        unittest_fn(self, test, err)
        info = self.current_test_case_info
        if info:
            self.result_string = "%s [ temp dir used by test case: %s ]" % \
                                 (error_type_str, info.tempdir)
            self.symlink_failed()
            self.failed_test_cases_info.add(info)
            if is_core_present(info.tempdir):
                # only the first failing test is recorded as the one
                # associated with the core file
                if not info.core_crash_test:
                    if isinstance(test, unittest.suite._ErrorHolder):
                        test_name = str(test)
                    else:
                        test_name = "'{!s}' ({!s})".format(
                            get_testcase_doc_name(test), test.id())
                    info.core_crash_test = test_name
                self.core_crash_test_cases_info.add(info)
        else:
            self.result_string = '%s [no temp dir]' % error_type_str

        self.send_result_through_pipe(test, error_type)

    def addFailure(self, test, err):
        """
        Record a test failed result

        :param test: test which failed
        :param err: error message
        """
        self.add_error(test, err, unittest.TestResult.addFailure, FAIL)

    def addError(self, test, err):
        """
        Record a test error result

        :param test: test which raised an error
        :param err: error message
        """
        self.add_error(test, err, unittest.TestResult.addError, ERROR)

    def getDescription(self, test):
        """
        Get test description

        :param test: test to describe
        :returns: test description
        """
        return get_test_description(self.descriptions, test)

    def startTest(self, test):
        """
        Start a test

        :param test: test about to be run
        """

        def print_header(test):
            """Print the test case title once per test case class."""
            test_doc = getdoc(test)
            if not test_doc:
                raise Exception("No doc string for test '%s'" % test.id())
            test_title = test_doc.splitlines()[0]
            test_title_colored = colorize(test_title, GREEN)
            if test.is_tagged_run_solo():
                # long live PEP-8 and 80 char width limitation...
                c = YELLOW
                test_title_colored = colorize("SOLO RUN: " + test_title, c)

            # This block may overwrite the colorized title above,
            # but we want this to stand out and be fixed
            if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
                c = RED
                w = "FIXME with VPP workers: "
                test_title_colored = colorize(w + test_title, c)

            if not hasattr(test.__class__, '_header_printed'):
                print(double_line_delim)
                print(test_title_colored)
                print(double_line_delim)
                test.__class__._header_printed = True

        print_header(test)
        self.start_test = time.time()
        unittest.TestResult.startTest(self, test)
        if self.verbosity > 0:
            self.stream.writeln(
                "Starting " + self.getDescription(test) + " ...")
            self.stream.writeln(single_line_delim)

    def stopTest(self, test):
        """
        Called when the given test has been run

        :param test: test which has finished
        """
        unittest.TestResult.stopTest(self, test)

        if self.verbosity > 0:
            self.stream.writeln(single_line_delim)
            self.stream.writeln("%-73s%s" % (self.getDescription(test),
                                             self.result_string))
            self.stream.writeln(single_line_delim)
        else:
            # terse mode: a single line including the elapsed time
            self.stream.writeln("%-68s %4.2f %s" %
                                (self.getDescription(test),
                                 time.time() - self.start_test,
                                 self.result_string))

        self.send_result_through_pipe(test, TEST_RUN)

    def printErrors(self):
        """
        Print errors from running the test case
        """
        if self.errors or self.failures:
            self.stream.writeln()
            self.printErrorList('ERROR', self.errors)
            self.printErrorList('FAIL', self.failures)

        # ^^ that is the last output from unittest before summary
        if not self.runner.print_summary:
            # redirect everything written from now on to the null device;
            # the handle is not closed here
            devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
            self.stream = devnull
            self.runner.stream = devnull

    def printErrorList(self, flavour, errors):
        """
        Print error list to the output stream together with error type
        and test case description.

        :param flavour: error type
        :param errors: iterable errors
        """
        for test, err in errors:
            self.stream.writeln(double_line_delim)
            self.stream.writeln("%s: %s" %
                                (flavour, self.getDescription(test)))
            self.stream.writeln(single_line_delim)
            self.stream.writeln("%s" % err)
class VppTestRunner(unittest.TextTestRunner):
    """
    A basic test runner implementation which prints results to standard error.
    """

    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
                 result_pipe=None, failfast=False, buffer=False,
                 resultclass=None, print_summary=True, **kwargs):
        """
        :param keep_alive_pipe: stored as KeepAliveReporter.pipe
        :param descriptions: forwarded to unittest.TextTestRunner
        :param verbosity: forwarded to unittest.TextTestRunner
        :param result_pipe: stored on the result class as
            `test_framework_result_pipe`
        :param failfast: forwarded to unittest.TextTestRunner
        :param buffer: forwarded to unittest.TextTestRunner
        :param resultclass: forwarded to unittest.TextTestRunner
            NOTE(review): `resultclass` is a read-only property on this
            class, so a non-None value presumably cannot be applied by the
            base class - confirm
        :param print_summary: when false, printErrors() of the result
            silences the stream and run() restores it afterwards
        :param kwargs: forwarded to unittest.TextTestRunner
        """
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        super(VppTestRunner, self).__init__(sys.stdout, descriptions,
                                            verbosity, failfast, buffer,
                                            resultclass, **kwargs)
        KeepAliveReporter.pipe = keep_alive_pipe

        # remembered so that run() can undo the stream swap done by the
        # result object
        self.orig_stream = self.stream
        self.resultclass.test_framework_result_pipe = result_pipe

        self.print_summary = print_summary

    def _makeResult(self):
        """Create the result object, handing it a reference to the runner."""
        return self.resultclass(self.stream,
                                self.descriptions,
                                self.verbosity,
                                self)

    def run(self, test):
        """
        Run the tests

        :param test: test case or suite to run
        :returns: the result object produced by the base class run()
        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        result = super(VppTestRunner, self).run(test)
        if not self.print_summary:
            self.stream = self.orig_stream
            result.stream = self.orig_stream
        return result
class Worker(Thread):
    """
    Thread which runs an external executable and records its outcome.

    The attributes `testcase`, `role` and `wait_for_gdb` are optional and
    only looked up with hasattr().
    NOTE(review): they are read before __init__ assigns anything, so they
    are presumably class attributes of subclasses - confirm.
    """

    def __init__(self, executable_args, logger, env=None, *args, **kwargs):
        """
        :param executable_args: argv of the program; element 0 is the
            executable path
        :param logger: logger used to report the run
        :param env: extra environment variables for the child process
            (deep-copied); None means no extra variables
        :param args: positional arguments forwarded to Thread
        :param kwargs: keyword arguments forwarded to Thread
        """
        super(Worker, self).__init__(*args, **kwargs)
        self.logger = logger
        # private copy - the caller's list must not be changed when debug
        # arguments are appended below
        self.args = list(executable_args)
        if hasattr(self, 'testcase') and self.testcase.debug_all:
            if self.testcase.debug_gdbserver:
                # wrap the original command line with gdbserver
                self.args = ['/usr/bin/gdbserver', 'localhost:{port}'
                             .format(port=self.testcase.gdbserver_port)] + \
                    list(executable_args)
            elif self.testcase.debug_gdb and hasattr(self, 'wait_for_gdb'):
                self.args.append(self.wait_for_gdb)
        self.app_bin = executable_args[0]
        self.app_name = os.path.basename(self.app_bin)
        if hasattr(self, 'role'):
            self.app_name += ' {role}'.format(role=self.role)
        # populated by run()
        self.process = None
        self.result = None
        env = {} if env is None else env
        self.env = copy.deepcopy(env)

    def wait_for_enter(self):
        """
        Print debugger attach instructions and wait for the user.

        Does nothing unless a `testcase` attribute exists and it enables
        gdb or gdbserver debugging.
        """
        if not hasattr(self, 'testcase'):
            return
        if self.testcase.debug_all and self.testcase.debug_gdbserver:
            print()
            print(double_line_delim)
            print("Spawned GDB Server for '{app}' with PID: {pid}"
                  .format(app=self.app_name, pid=self.process.pid))
        elif self.testcase.debug_all and self.testcase.debug_gdb:
            print()
            print(double_line_delim)
            print("Spawned '{app}' with PID: {pid}"
                  .format(app=self.app_name, pid=self.process.pid))
        else:
            return
        print(single_line_delim)
        print("You can debug '{app}' using:".format(app=self.app_name))
        if self.testcase.debug_gdbserver:
            print("sudo gdb " + self.app_bin +
                  " -ex 'target remote localhost:{port}'"
                  .format(port=self.testcase.gdbserver_port))
            print("Now is the time to attach gdb by running the above "
                  "command, set up breakpoints etc., then resume from "
                  "within gdb by issuing the 'continue' command")
            # the port just announced is taken - advance the counter
            self.testcase.gdbserver_port += 1
        elif self.testcase.debug_gdb:
            print("sudo gdb " + self.app_bin +
                  " -ex 'attach {pid}'".format(pid=self.process.pid))
            print("Now is the time to attach gdb by running the above "
                  "command and set up breakpoints etc., then resume from"
                  " within gdb by issuing the 'continue' command")
        print(single_line_delim)
        input("Press ENTER to continue running the testcase...")

    def run(self):
        """
        Run the executable, log its output and store its return code.

        :raises EnvironmentError: when the executable does not exist or is
            not executable; self.result is set to os.EX_OSFILE first
        """
        executable = self.args[0]
        if not os.path.exists(executable) or not os.access(
                executable, os.F_OK | os.X_OK):
            # Exit code that means some system file did not exist,
            # could not be opened, or had some other kind of error.
            self.result = os.EX_OSFILE
            raise EnvironmentError(
                "executable '%s' is not found or not executable."
                % executable)
        self.logger.debug("Running executable: '{app}'"
                          .format(app=' '.join(self.args)))
        # the child sees the current environment plus the extra variables
        env = os.environ.copy()
        env.update(self.env)
        env["CK_LOG_FILE_NAME"] = "-"
        self.process = subprocess.Popen(
            self.args, shell=False, env=env, preexec_fn=os.setpgrp,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self.wait_for_enter()
        out, err = self.process.communicate()
        self.logger.debug("Finished running `{app}'".format(app=self.app_name))
        self.logger.info("Return code is `%s'" % self.process.returncode)
        self.logger.info(single_line_delim)
        self.logger.info("Executable `{app}' wrote to stdout:"
                         .format(app=self.app_name))
        self.logger.info(single_line_delim)
        self.logger.info(out.decode('utf-8'))
        self.logger.info(single_line_delim)
        self.logger.info("Executable `{app}' wrote to stderr:"
                         .format(app=self.app_name))
        self.logger.info(single_line_delim)
        self.logger.info(err.decode('utf-8'))
        self.logger.info(single_line_delim)
        self.result = self.process.returncode
1658 if __name__ == '__main__':