3 from __future__ import print_function
19 from collections import deque
20 from threading import Thread, Event
21 from inspect import getdoc, isclass
22 from traceback import format_exception
23 from logging import FileHandler, DEBUG, Formatter
27 from scapy.packet import Raw
28 import hook as hookmodule
29 from vpp_pg_interface import VppPGInterface
30 from vpp_sub_interface import VppSubInterface
31 from vpp_lo_interface import VppLoInterface
32 from vpp_bvi_interface import VppBviInterface
33 from vpp_papi_provider import VppPapiProvider
35 from vpp_papi.vpp_stats import VPPStats
36 from vpp_papi.vpp_transport_shmem import VppTransportShmemIOError
37 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
39 from vpp_object import VppObjectRegistry
40 from util import ppp, is_core_present
41 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
42 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
43 from scapy.layers.inet6 import ICMPv6EchoReply
45 logger = logging.getLogger(__name__)
47 # Set up an empty logger for the testcase that can be overridden as necessary
48 null_logger = logging.getLogger('VppTestCase')
49 null_logger.addHandler(logging.NullHandler())
58 class BoolEnvironmentVariable(object):
60 def __init__(self, env_var_name, default='n', true_values=None):
61 self.name = env_var_name
62 self.default = default
63 self.true_values = true_values if true_values is not None else \
67 return os.getenv(self.name, self.default).lower() in self.true_values
69 if sys.version_info[0] == 2:
70 __nonzero__ = __bool__
73 return 'BoolEnvironmentVariable(%r, default=%r, true_values=%r)' % \
74 (self.name, self.default, self.true_values)
77 debug_framework = BoolEnvironmentVariable('TEST_DEBUG')
82 Test framework module.
84 The module provides a set of tools for constructing and running tests and
85 representing the results.
89 class VppDiedError(Exception):
90 """ exception for reporting that the subprocess has died."""
92 signals_by_value = {v: k for k, v in signal.__dict__.items() if
93 k.startswith('SIG') and not k.startswith('SIG_')}
95 def __init__(self, rv=None, testcase=None, method_name=None):
97 self.signal_name = None
98 self.testcase = testcase
99 self.method_name = method_name
102 self.signal_name = VppDiedError.signals_by_value[-rv]
103 except (KeyError, TypeError):
106 if testcase is None and method_name is None:
109 in_msg = 'running %s.%s ' % (testcase, method_name)
111 msg = "VPP subprocess died %sunexpectedly with return code: %d%s." % (
114 ' [%s]' % (self.signal_name if
115 self.signal_name is not None else ''))
116 super(VppDiedError, self).__init__(msg)
119 class _PacketInfo(object):
120 """Private class to create packet info object.
122 Help process information about the next packet.
123 Set variables to default values.
125 #: Store the index of the packet.
127 #: Store the index of the source packet generator interface of the packet.
129 #: Store the index of the destination packet generator interface
132 #: Store expected ip version
134 #: Store expected upper protocol
136 #: Store the copy of the former packet.
def __eq__(self, other):
    """ Compare two packet infos by index, src, dst and data.

    :param other: object to compare with
    :returns: True when all four fields are equal, False when any of
              them differs, NotImplemented when ``other`` does not
              expose the compared fields (e.g. None)
    """
    fields = ('index', 'src', 'dst', 'data')
    try:
        theirs = [getattr(other, name) for name in fields]
    except AttributeError:
        # not a packet-info-like object - let Python try the reflected
        # comparison instead of failing with AttributeError
        return NotImplemented
    return all(getattr(self, name) == value
               for name, value in zip(fields, theirs))
147 def pump_output(testclass):
148 """ pump output from vpp stdout/stderr to proper queues """
151 while not testclass.pump_thread_stop_flag.is_set():
152 readable = select.select([testclass.vpp.stdout.fileno(),
153 testclass.vpp.stderr.fileno(),
154 testclass.pump_thread_wakeup_pipe[0]],
156 if testclass.vpp.stdout.fileno() in readable:
157 read = os.read(testclass.vpp.stdout.fileno(), 102400)
159 split = read.decode('ascii',
160 errors='backslashreplace').splitlines(True)
161 if len(stdout_fragment) > 0:
162 split[0] = "%s%s" % (stdout_fragment, split[0])
163 if len(split) > 0 and split[-1].endswith("\n"):
167 stdout_fragment = split[-1]
168 testclass.vpp_stdout_deque.extend(split[:limit])
169 if not testclass.cache_vpp_output:
170 for line in split[:limit]:
171 testclass.logger.info(
172 "VPP STDOUT: %s" % line.rstrip("\n"))
173 if testclass.vpp.stderr.fileno() in readable:
174 read = os.read(testclass.vpp.stderr.fileno(), 102400)
176 split = read.decode('ascii',
177 errors='backslashreplace').splitlines(True)
178 if len(stderr_fragment) > 0:
179 split[0] = "%s%s" % (stderr_fragment, split[0])
180 if len(split) > 0 and split[-1].endswith("\n"):
184 stderr_fragment = split[-1]
186 testclass.vpp_stderr_deque.extend(split[:limit])
187 if not testclass.cache_vpp_output:
188 for line in split[:limit]:
189 testclass.logger.error(
190 "VPP STDERR: %s" % line.rstrip("\n"))
191 # ignoring the dummy pipe here intentionally - the
192 # flag will take care of properly terminating the loop
def _is_skip_aarch64_set():
    """ Build the flag object bound to the SKIP_AARCH64 env variable """
    skip_flag = BoolEnvironmentVariable('SKIP_AARCH64')
    return skip_flag


# module-level flag object for SKIP_AARCH64
is_skip_aarch64_set = _is_skip_aarch64_set()
def _is_platform_aarch64():
    """ Tell whether platform.machine() reports 'aarch64' """
    machine = platform.machine()
    return machine == 'aarch64'


# computed once, when the module is imported
is_platform_aarch64 = _is_platform_aarch64()
def _running_extended_tests():
    """ Build the flag object bound to the EXTENDED_TESTS env variable """
    extended_flag = BoolEnvironmentVariable("EXTENDED_TESTS")
    return extended_flag


# module-level flag object for EXTENDED_TESTS
running_extended_tests = _running_extended_tests()
def _running_gcov_tests():
    """ Build the flag object bound to the GCOV_TESTS env variable """
    gcov_flag = BoolEnvironmentVariable("GCOV_TESTS")
    return gcov_flag


# module-level flag object for GCOV_TESTS
running_gcov_tests = _running_gcov_tests()
223 class KeepAliveReporter(object):
225 Singleton object which reports test start to parent process
230 self.__dict__ = self._shared_state
238 def pipe(self, pipe):
239 if self._pipe is not None:
240 raise Exception("Internal error - pipe should only be set once.")
243 def send_keep_alive(self, test, desc=None):
245 Write current test tmpdir & desc to keep-alive pipe to signal liveness
247 if self.pipe is None:
248 # if not running forked..
252 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
256 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
259 class TestCaseTag(Enum):
260 # marks the suites that must run at the end
261 # using only a single test runner
263 # marks the suites broken on VPP multi-worker
264 FIXME_VPP_WORKERS = 2
267 def create_tag_decorator(e):
270 cls.test_tags.append(e)
271 except AttributeError:
277 tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
278 tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
281 class VppTestCase(unittest.TestCase):
282 """This subclass is a base class for VPP test cases that are implemented as
283 classes. It provides methods to create and run test case.
286 extra_vpp_punt_config = []
287 extra_vpp_plugin_config = []
289 vapi_response_timeout = 5
def packet_infos(self):
    """ Return the packet infos currently stored in ``_packet_infos`` """
    infos = self._packet_infos
    return infos
297 def get_packet_count_for_if_idx(cls, dst_if_index):
298 """Get the number of packet info for specified destination if index"""
299 if dst_if_index in cls._packet_count_for_dst_if_idx:
300 return cls._packet_count_for_dst_if_idx[dst_if_index]
305 def has_tag(cls, tag):
306 """ if the test case has a given tag - return true """
308 return tag in cls.test_tags
309 except AttributeError:
def is_tagged_run_solo(cls):
    """ Return whether the test case class carries the RUN_SOLO tag

    Thin wrapper around has_tag(TestCaseTag.RUN_SOLO).
    """
    solo = cls.has_tag(TestCaseTag.RUN_SOLO)
    return solo
320 """Return the instance of this testcase"""
321 return cls.test_instance
324 def set_debug_flags(cls, d):
325 cls.gdbserver_port = 7777
326 cls.debug_core = False
327 cls.debug_gdb = False
328 cls.debug_gdbserver = False
329 cls.debug_all = False
334 cls.debug_core = True
335 elif dl == "gdb" or dl == "gdb-all":
337 elif dl == "gdbserver" or dl == "gdbserver-all":
338 cls.debug_gdbserver = True
340 raise Exception("Unrecognized DEBUG option: '%s'" % d)
341 if dl == "gdb-all" or dl == "gdbserver-all":
345 def get_least_used_cpu():
346 cpu_usage_list = [set(range(psutil.cpu_count()))]
347 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
348 if 'vpp_main' == p.info['name']]
349 for vpp_process in vpp_processes:
350 for cpu_usage_set in cpu_usage_list:
352 cpu_num = vpp_process.cpu_num()
353 if cpu_num in cpu_usage_set:
354 cpu_usage_set_index = cpu_usage_list.index(
356 if cpu_usage_set_index == len(cpu_usage_list) - 1:
357 cpu_usage_list.append({cpu_num})
359 cpu_usage_list[cpu_usage_set_index + 1].add(
361 cpu_usage_set.remove(cpu_num)
363 except psutil.NoSuchProcess:
366 for cpu_usage_set in cpu_usage_list:
367 if len(cpu_usage_set) > 0:
368 min_usage_set = cpu_usage_set
371 return random.choice(tuple(min_usage_set))
374 def setUpConstants(cls):
375 """ Set-up the test case class based on environment variables """
376 cls.step = BoolEnvironmentVariable('STEP')
377 d = os.getenv("DEBUG", None)
378 # inverted case to handle '' == True
379 c = os.getenv("CACHE_OUTPUT", "1")
380 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
381 cls.set_debug_flags(d)
382 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
383 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
384 cls.test_plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
385 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
387 if cls.plugin_path is not None:
388 if cls.extern_plugin_path is not None:
389 plugin_path = "%s:%s" % (
390 cls.plugin_path, cls.extern_plugin_path)
392 plugin_path = cls.plugin_path
393 elif cls.extern_plugin_path is not None:
394 plugin_path = cls.extern_plugin_path
396 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
397 debug_cli = "cli-listen localhost:5002"
399 size = os.getenv("COREDUMP_SIZE")
401 coredump_size = "coredump-size %s" % size
402 if coredump_size is None:
403 coredump_size = "coredump-size unlimited"
405 cpu_core_number = cls.get_least_used_cpu()
406 if not hasattr(cls, "worker_config"):
407 cls.worker_config = os.getenv("VPP_WORKER_CONFIG", "")
408 if cls.worker_config != "":
409 if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
410 cls.worker_config = ""
412 default_variant = os.getenv("VARIANT")
413 if default_variant is not None:
414 default_variant = "defaults { %s 100 }" % default_variant
418 api_fuzzing = os.getenv("API_FUZZ")
419 if api_fuzzing is None:
422 cls.vpp_cmdline = [cls.vpp_bin, "unix",
423 "{", "nodaemon", debug_cli, "full-coredump",
424 coredump_size, "runtime-dir", cls.tempdir, "}",
425 "api-trace", "{", "on", "}", "api-segment", "{",
426 "prefix", cls.shm_prefix, "}", "cpu", "{",
427 "main-core", str(cpu_core_number),
428 cls.worker_config, "}",
429 "physmem", "{", "max-size", "32m", "}",
430 "statseg", "{", "socket-name", cls.stats_sock, "}",
431 "socksvr", "{", "socket-name", cls.api_sock, "}",
432 "node { ", default_variant, "}",
433 "api-fuzz {", api_fuzzing, "}",
435 "{", "plugin", "dpdk_plugin.so", "{", "disable",
436 "}", "plugin", "rdma_plugin.so", "{", "disable",
437 "}", "plugin", "lisp_unittest_plugin.so", "{",
439 "}", "plugin", "unittest_plugin.so", "{", "enable",
440 "}"] + cls.extra_vpp_plugin_config + ["}", ]
442 if cls.extra_vpp_punt_config is not None:
443 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
444 if plugin_path is not None:
445 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
446 if cls.test_plugin_path is not None:
447 cls.vpp_cmdline.extend(["test_plugin_path", cls.test_plugin_path])
449 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
450 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
453 def wait_for_enter(cls):
454 if cls.debug_gdbserver:
455 print(double_line_delim)
456 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
458 print(double_line_delim)
459 print("Spawned VPP with PID: %d" % cls.vpp.pid)
461 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
463 print(single_line_delim)
464 print("You can debug VPP using:")
465 if cls.debug_gdbserver:
466 print("sudo gdb " + cls.vpp_bin +
467 " -ex 'target remote localhost:{port}'"
468 .format(port=cls.gdbserver_port))
469 print("Now is the time to attach gdb by running the above "
470 "command, set up breakpoints etc., then resume VPP from "
471 "within gdb by issuing the 'continue' command")
472 cls.gdbserver_port += 1
474 print("sudo gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
475 print("Now is the time to attach gdb by running the above "
476 "command and set up breakpoints etc., then resume VPP from"
477 " within gdb by issuing the 'continue' command")
478 print(single_line_delim)
479 input("Press ENTER to continue running the testcase...")
483 cmdline = cls.vpp_cmdline
485 if cls.debug_gdbserver:
486 gdbserver = '/usr/bin/gdbserver'
487 if not os.path.isfile(gdbserver) or \
488 not os.access(gdbserver, os.X_OK):
489 raise Exception("gdbserver binary '%s' does not exist or is "
490 "not executable" % gdbserver)
492 cmdline = [gdbserver, 'localhost:{port}'
493 .format(port=cls.gdbserver_port)] + cls.vpp_cmdline
494 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
497 cls.vpp = subprocess.Popen(cmdline,
498 stdout=subprocess.PIPE,
499 stderr=subprocess.PIPE)
500 except subprocess.CalledProcessError as e:
501 cls.logger.critical("Subprocess returned with non-0 return code: ("
505 cls.logger.critical("Subprocess returned with OS error: "
506 "(%s) %s", e.errno, e.strerror)
508 except Exception as e:
509 cls.logger.exception("Subprocess returned unexpected from "
516 def wait_for_coredump(cls):
517 corefile = cls.tempdir + "/core"
518 if os.path.isfile(corefile):
519 cls.logger.error("Waiting for coredump to complete: %s", corefile)
520 curr_size = os.path.getsize(corefile)
521 deadline = time.time() + 60
523 while time.time() < deadline:
526 curr_size = os.path.getsize(corefile)
527 if size == curr_size:
531 cls.logger.error("Timed out waiting for coredump to complete:"
534 cls.logger.error("Coredump complete: %s, size %d",
540 Perform class setup before running the testcase
541 Remove shared memory files, start vpp and connect the vpp-api
543 super(VppTestCase, cls).setUpClass()
544 gc.collect() # run garbage collection first
545 cls.logger = get_logger(cls.__name__)
546 seed = os.environ["RND_SEED"]
548 if hasattr(cls, 'parallel_handler'):
549 cls.logger.addHandler(cls.parallel_handler)
550 cls.logger.propagate = False
552 cls.tempdir = tempfile.mkdtemp(
553 prefix='vpp-unittest-%s-' % cls.__name__)
554 cls.stats_sock = "%s/stats.sock" % cls.tempdir
555 cls.api_sock = "%s/api.sock" % cls.tempdir
556 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
557 cls.file_handler.setFormatter(
558 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
560 cls.file_handler.setLevel(DEBUG)
561 cls.logger.addHandler(cls.file_handler)
562 cls.logger.debug("--- setUpClass() for %s called ---" %
564 cls.shm_prefix = os.path.basename(cls.tempdir)
565 os.chdir(cls.tempdir)
566 cls.logger.info("Temporary dir is %s, shm prefix is %s",
567 cls.tempdir, cls.shm_prefix)
568 cls.logger.debug("Random seed is %s" % seed)
570 cls.reset_packet_infos()
574 cls.registry = VppObjectRegistry()
575 cls.vpp_startup_failed = False
576 cls.reporter = KeepAliveReporter()
577 # need to catch exceptions here because if we raise, then the cleanup
578 # doesn't get called and we might end with a zombie vpp
581 cls.reporter.send_keep_alive(cls, 'setUpClass')
582 VppTestResult.current_test_case_info = TestCaseInfo(
583 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
584 cls.vpp_stdout_deque = deque()
585 cls.vpp_stderr_deque = deque()
586 cls.pump_thread_stop_flag = Event()
587 cls.pump_thread_wakeup_pipe = os.pipe()
588 cls.pump_thread = Thread(target=pump_output, args=(cls,))
589 cls.pump_thread.daemon = True
590 cls.pump_thread.start()
591 if cls.debug_gdb or cls.debug_gdbserver:
592 cls.vapi_response_timeout = 0
593 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
594 cls.vapi_response_timeout)
596 hook = hookmodule.StepHook(cls)
598 hook = hookmodule.PollHook(cls)
599 cls.vapi.register_hook(hook)
600 cls.statistics = VPPStats(socketname=cls.stats_sock)
604 cls.vpp_startup_failed = True
606 "VPP died shortly after startup, check the"
607 " output to standard error for possible cause")
611 except vpp_papi.VPPIOError as e:
612 cls.logger.debug("Exception connecting to vapi: %s" % e)
613 cls.vapi.disconnect()
615 if cls.debug_gdbserver:
616 print(colorize("You're running VPP inside gdbserver but "
617 "VPP-API connection failed, did you forget "
618 "to 'continue' VPP from within gdb?", RED))
620 except vpp_papi.VPPRuntimeError as e:
621 cls.logger.debug("%s" % e)
624 except Exception as e:
625 cls.logger.debug("Exception connecting to VPP: %s" % e)
630 def _debug_quit(cls):
631 if (cls.debug_gdbserver or cls.debug_gdb):
635 if cls.vpp.returncode is None:
637 print(double_line_delim)
638 print("VPP or GDB server is still running")
639 print(single_line_delim)
640 input("When done debugging, press ENTER to kill the "
641 "process and finish running the testcase...")
642 except AttributeError:
648 Disconnect vpp-api, kill vpp and cleanup shared memory files
652 # first signal that we want to stop the pump thread, then wake it up
653 if hasattr(cls, 'pump_thread_stop_flag'):
654 cls.pump_thread_stop_flag.set()
655 if hasattr(cls, 'pump_thread_wakeup_pipe'):
656 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
657 if hasattr(cls, 'pump_thread'):
658 cls.logger.debug("Waiting for pump thread to stop")
659 cls.pump_thread.join()
660 if hasattr(cls, 'vpp_stderr_reader_thread'):
661 cls.logger.debug("Waiting for stderr pump to stop")
662 cls.vpp_stderr_reader_thread.join()
664 if hasattr(cls, 'vpp'):
665 if hasattr(cls, 'vapi'):
666 cls.logger.debug(cls.vapi.vpp.get_stats())
667 cls.logger.debug("Disconnecting class vapi client on %s",
669 cls.vapi.disconnect()
670 cls.logger.debug("Deleting class vapi attribute on %s",
674 if cls.vpp.returncode is None:
675 cls.wait_for_coredump()
676 cls.logger.debug("Sending TERM to vpp")
678 cls.logger.debug("Waiting for vpp to die")
679 cls.vpp.communicate()
680 cls.logger.debug("Deleting class vpp attribute on %s",
682 cls.vpp.stdout.close()
683 cls.vpp.stderr.close()
686 if cls.vpp_startup_failed:
687 stdout_log = cls.logger.info
688 stderr_log = cls.logger.critical
690 stdout_log = cls.logger.info
691 stderr_log = cls.logger.info
693 if hasattr(cls, 'vpp_stdout_deque'):
694 stdout_log(single_line_delim)
695 stdout_log('VPP output to stdout while running %s:', cls.__name__)
696 stdout_log(single_line_delim)
697 vpp_output = "".join(cls.vpp_stdout_deque)
698 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
700 stdout_log('\n%s', vpp_output)
701 stdout_log(single_line_delim)
703 if hasattr(cls, 'vpp_stderr_deque'):
704 stderr_log(single_line_delim)
705 stderr_log('VPP output to stderr while running %s:', cls.__name__)
706 stderr_log(single_line_delim)
707 vpp_output = "".join(cls.vpp_stderr_deque)
708 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
710 stderr_log('\n%s', vpp_output)
711 stderr_log(single_line_delim)
714 def tearDownClass(cls):
715 """ Perform final cleanup after running all tests in this test-case """
716 cls.logger.debug("--- tearDownClass() for %s called ---" %
718 cls.reporter.send_keep_alive(cls, 'tearDownClass')
720 cls.file_handler.close()
721 cls.reset_packet_infos()
723 debug_internal.on_tear_down_class(cls)
def show_commands_at_teardown(self):
    """ Hook for subclasses to log extra show commands at teardown.

    The base implementation only logs that nothing was provided.
    """
    notice = "--- No test specific show commands provided. ---"
    self.logger.info(notice)
730 """ Show various debug prints after each test """
731 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
732 (self.__class__.__name__, self._testMethodName,
733 self._testMethodDoc))
736 if not self.vpp_dead:
737 self.logger.debug(self.vapi.cli("show trace max 1000"))
738 self.logger.info(self.vapi.ppcli("show interface"))
739 self.logger.info(self.vapi.ppcli("show hardware"))
740 self.logger.info(self.statistics.set_errors_str())
741 self.logger.info(self.vapi.ppcli("show run"))
742 self.logger.info(self.vapi.ppcli("show log"))
743 self.logger.info(self.vapi.ppcli("show bihash"))
744 self.logger.info("Logging testcase specific show commands.")
745 self.show_commands_at_teardown()
746 self.registry.remove_vpp_config(self.logger)
747 # Save/Dump VPP api trace log
748 m = self._testMethodName
749 api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
750 tmp_api_trace = "/tmp/%s" % api_trace
751 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
752 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
753 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
755 os.rename(tmp_api_trace, vpp_api_trace_log)
756 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
758 except VppTransportShmemIOError:
759 self.logger.debug("VppTransportShmemIOError: Vpp dead. "
760 "Cannot log show commands.")
763 self.registry.unregister_all(self.logger)
766 """ Clear trace before running each test"""
767 super(VppTestCase, self).setUp()
768 self.reporter.send_keep_alive(self)
771 raise VppDiedError(rv=None, testcase=self.__class__.__name__,
772 method_name=self._testMethodName)
773 self.sleep(.1, "during setUp")
774 self.vpp_stdout_deque.append(
775 "--- test setUp() for %s.%s(%s) starts here ---\n" %
776 (self.__class__.__name__, self._testMethodName,
777 self._testMethodDoc))
778 self.vpp_stderr_deque.append(
779 "--- test setUp() for %s.%s(%s) starts here ---\n" %
780 (self.__class__.__name__, self._testMethodName,
781 self._testMethodDoc))
782 self.vapi.cli("clear trace")
783 # store the test instance inside the test class - so that objects
784 # holding the class can access instance methods (like assertEqual)
785 type(self).test_instance = self
788 def pg_enable_capture(cls, interfaces=None):
790 Enable capture on packet-generator interfaces
792 :param interfaces: iterable interface indexes (if None,
793 use self.pg_interfaces)
796 if interfaces is None:
797 interfaces = cls.pg_interfaces
def register_capture(cls, cap_name):
    """ Remember a capture name in the testclass

    :param cap_name: name of the capture to record
    """
    # each entry pairs the registration timestamp with the capture name
    entry = (time.time(), cap_name)
    cls._captures.append(entry)
808 def get_vpp_time(cls):
809 # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
810 # returns float("2.190522")
811 timestr = cls.vapi.cli('show clock')
812 head, sep, tail = timestr.partition(',')
813 head, sep, tail = head.partition('Time now')
817 def sleep_on_vpp_time(cls, sec):
818 """ Sleep according to time in VPP world """
819 # On a busy system with many processes
820 # we might end up with VPP time being slower than real world
821 # So take that into account when waiting for VPP to do something
822 start_time = cls.get_vpp_time()
823 while cls.get_vpp_time() - start_time < sec:
828 """ Enable the PG, wait till it is done, then clean up """
829 cls.vapi.cli("trace add pg-input 1000")
830 cls.vapi.cli('packet-generator enable')
831 # PG, when starts, runs to completion -
832 # so let's avoid a race condition,
833 # and wait a little till it's done.
834 # Then clean it up - and then be gone.
835 deadline = time.time() + 300
836 while cls.vapi.cli('show packet-generator').find("Yes") != -1:
837 cls.sleep(0.01) # yield
838 if time.time() > deadline:
839 cls.logger.error("Timeout waiting for pg to stop")
841 for stamp, cap_name in cls._captures:
842 cls.vapi.cli('packet-generator delete %s' % cap_name)
846 def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
848 Create packet-generator interfaces.
850 :param interfaces: iterable indexes of the interfaces.
851 :returns: List of created interfaces.
856 intf = VppPGInterface(cls, i, gso, gso_size)
857 setattr(cls, intf.name, intf)
859 cls.pg_interfaces = result
863 def create_loopback_interfaces(cls, count):
865 Create loopback interfaces.
867 :param count: number of interfaces created.
868 :returns: List of created interfaces.
870 result = [VppLoInterface(cls) for i in range(count)]
872 setattr(cls, intf.name, intf)
873 cls.lo_interfaces = result
877 def create_bvi_interfaces(cls, count):
879 Create BVI interfaces.
881 :param count: number of interfaces created.
882 :returns: List of created interfaces.
884 result = [VppBviInterface(cls) for i in range(count)]
886 setattr(cls, intf.name, intf)
887 cls.bvi_interfaces = result
891 def extend_packet(packet, size, padding=' '):
893 Extend packet to given size by padding with spaces or custom padding
894 NOTE: Currently works only when Raw layer is present.
896 :param packet: packet
897 :param size: target size
898 :param padding: padding used to extend the payload
901 packet_len = len(packet) + 4
902 extend = size - packet_len
904 num = (extend // len(padding)) + 1
905 packet[Raw].load += (padding * num)[:extend].encode("ascii")
def reset_packet_infos(cls):
    """ Drop all stored packet infos and per-destination packet counts

    Both class-level containers are replaced by fresh empty dicts.
    """
    cls._packet_infos, cls._packet_count_for_dst_if_idx = {}, {}
914 def create_packet_info(cls, src_if, dst_if):
916 Create packet info object containing the source and destination indexes
917 and add it to the testcase's packet info list
919 :param VppInterface src_if: source interface
920 :param VppInterface dst_if: destination interface
922 :returns: _PacketInfo object
926 info.index = len(cls._packet_infos)
927 info.src = src_if.sw_if_index
928 info.dst = dst_if.sw_if_index
929 if isinstance(dst_if, VppSubInterface):
930 dst_idx = dst_if.parent.sw_if_index
932 dst_idx = dst_if.sw_if_index
933 if dst_idx in cls._packet_count_for_dst_if_idx:
934 cls._packet_count_for_dst_if_idx[dst_idx] += 1
936 cls._packet_count_for_dst_if_idx[dst_idx] = 1
937 cls._packet_infos[info.index] = info
941 def info_to_payload(info):
943 Convert _PacketInfo object to packet payload
945 :param info: _PacketInfo object
947 :returns: string containing serialized data from packet info
949 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
953 def payload_to_info(payload, payload_field='load'):
955 Convert packet payload to _PacketInfo object
957 :param payload: packet payload
958 :type payload: <class 'scapy.packet.Raw'>
959 :param payload_field: packet fieldname of payload "load" for
960 <class 'scapy.packet.Raw'>
961 :type payload_field: str
962 :returns: _PacketInfo object containing de-serialized data from payload
965 numbers = getattr(payload, payload_field).split()
967 info.index = int(numbers[0])
968 info.src = int(numbers[1])
969 info.dst = int(numbers[2])
970 info.ip = int(numbers[3])
971 info.proto = int(numbers[4])
974 def get_next_packet_info(self, info):
976 Iterate over the packet info list stored in the testcase
977 Start iteration with first element if info is None
978 Continue based on index in info if info is specified
980 :param info: info or None
981 :returns: next info in list or None if no more infos
986 next_index = info.index + 1
987 if next_index == len(self._packet_infos):
990 return self._packet_infos[next_index]
992 def get_next_packet_info_for_interface(self, src_index, info):
994 Search the packet info list for the next packet info with same source
997 :param src_index: source interface index to search for
998 :param info: packet info - where to start the search
999 :returns: packet info or None
1003 info = self.get_next_packet_info(info)
1006 if info.src == src_index:
1009 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
1011 Search the packet info list for the next packet info with same source
1012 and destination interface indexes
1014 :param src_index: source interface index to search for
1015 :param dst_index: destination interface index to search for
1016 :param info: packet info - where to start the search
1017 :returns: packet info or None
1021 info = self.get_next_packet_info_for_interface(src_index, info)
1024 if info.dst == dst_index:
1027 def assert_equal(self, real_value, expected_value, name_or_class=None):
1028 if name_or_class is None:
1029 self.assertEqual(real_value, expected_value)
1032 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
1033 msg = msg % (getdoc(name_or_class).strip(),
1034 real_value, str(name_or_class(real_value)),
1035 expected_value, str(name_or_class(expected_value)))
1037 msg = "Invalid %s: %s does not match expected value %s" % (
1038 name_or_class, real_value, expected_value)
1040 self.assertEqual(real_value, expected_value, msg)
1042 def assert_in_range(self,
1050 msg = "Invalid %s: %s out of range <%s,%s>" % (
1051 name, real_value, expected_min, expected_max)
1052 self.assertTrue(expected_min <= real_value <= expected_max, msg)
1054 def assert_packet_checksums_valid(self, packet,
1055 ignore_zero_udp_checksums=True):
1056 received = packet.__class__(scapy.compat.raw(packet))
1057 udp_layers = ['UDP', 'UDPerror']
1058 checksum_fields = ['cksum', 'chksum']
1061 temp = received.__class__(scapy.compat.raw(received))
1063 layer = temp.getlayer(counter)
1065 layer = layer.copy()
1066 layer.remove_payload()
1067 for cf in checksum_fields:
1068 if hasattr(layer, cf):
1069 if ignore_zero_udp_checksums and \
1070 0 == getattr(layer, cf) and \
1071 layer.name in udp_layers:
1073 delattr(temp.getlayer(counter), cf)
1074 checksums.append((counter, cf))
1077 counter = counter + 1
1078 if 0 == len(checksums):
1080 temp = temp.__class__(scapy.compat.raw(temp))
1081 for layer, cf in checksums:
1082 calc_sum = getattr(temp[layer], cf)
1084 getattr(received[layer], cf), calc_sum,
1085 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
1087 "Checksum field `%s` on `%s` layer has correct value `%s`" %
1088 (cf, temp[layer].name, calc_sum))
1090 def assert_checksum_valid(self, received_packet, layer,
1091 field_name='chksum',
1092 ignore_zero_checksum=False):
1093 """ Check checksum of received packet on given layer """
1094 received_packet_checksum = getattr(received_packet[layer], field_name)
1095 if ignore_zero_checksum and 0 == received_packet_checksum:
1097 recalculated = received_packet.__class__(
1098 scapy.compat.raw(received_packet))
1099 delattr(recalculated[layer], field_name)
1100 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1101 self.assert_equal(received_packet_checksum,
1102 getattr(recalculated[layer], field_name),
1103 "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """ Check the checksum of the 'IP' layer of the received packet

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded to assert_checksum_valid
    """
    layer = 'IP'
    self.assert_checksum_valid(
        received_packet, layer,
        ignore_zero_checksum=ignore_zero_checksum)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """ Check the checksum of the 'TCP' layer of the received packet

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded to assert_checksum_valid
    """
    layer = 'TCP'
    self.assert_checksum_valid(
        received_packet, layer,
        ignore_zero_checksum=ignore_zero_checksum)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """ Check the checksum of the 'UDP' layer of the received packet

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded to assert_checksum_valid;
                                 unlike the IP/TCP variants it
                                 defaults to True here
    """
    layer = 'UDP'
    self.assert_checksum_valid(
        received_packet, layer,
        ignore_zero_checksum=ignore_zero_checksum)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """ Check checksums of the *error layers present in the packet

    Each of IPerror, TCPerror, UDPerror and ICMPerror is verified only
    when the packet has that layer; for UDPerror a zero checksum is
    ignored.

    :param received_packet: packet to verify
    """
    # (layer class, layer name, extra keyword arguments) - checked in order
    embedded_layers = (
        (IPerror, 'IPerror', {}),
        (TCPerror, 'TCPerror', {}),
        (UDPerror, 'UDPerror', {'ignore_zero_checksum': True}),
        (ICMPerror, 'ICMPerror', {}),
    )
    for layer_cls, layer_name, extra in embedded_layers:
        if received_packet.haslayer(layer_cls):
            self.assert_checksum_valid(received_packet, layer_name,
                                       **extra)
def assert_icmp_checksum_valid(self, received_packet):
    """ Check the 'ICMP' layer checksum, then any embedded error layers

    :param received_packet: packet to verify
    """
    outer_layer = 'ICMP'
    self.assert_checksum_valid(received_packet, outer_layer)
    # the embedded layers are verified only after the outer check
    self.assert_embedded_icmp_checksum_valid(received_packet)
def assert_icmpv6_checksum_valid(self, pkt):
    """ Check the 'cksum' field of the ICMPv6 layers present in the packet

    Handles ICMPv6DestUnreach (followed by a check of the embedded
    error layers), ICMPv6EchoRequest and ICMPv6EchoReply; layers the
    packet does not have are skipped.

    :param pkt: packet to verify
    """
    if pkt.haslayer(ICMPv6DestUnreach):
        self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
        self.assert_embedded_icmp_checksum_valid(pkt)
    echo_layers = (
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest'),
        (ICMPv6EchoReply, 'ICMPv6EchoReply'),
    )
    for layer_cls, layer_name in echo_layers:
        if pkt.haslayer(layer_cls):
            self.assert_checksum_valid(pkt, layer_name, 'cksum')
def get_packet_counter(self, counter):
    """
    Return the current value of a packet counter.

    :param counter: counter name; a name starting with "/" is read through
        self.statistics.get_counter(), any other name is searched for in
        the output of the "sh errors" CLI command
    :returns: the counter value; 0 when the name is not found in the CLI
        output
    """
    if counter.startswith("/"):
        return self.statistics.get_counter(counter)

    rows = self.vapi.cli("sh errors").split('\n')
    # The first and the last row are never inspected (presumably the table
    # header and the empty string after the final newline - TODO confirm).
    for row in rows[1:-1]:
        fields = row.split()
        # blank or short rows cannot name a counter, skip them instead of
        # failing with an IndexError
        if len(fields) > 1 and fields[1] == counter:
            return int(fields[0])
    return 0
def assert_packet_counter_equal(self, counter, expected_value):
    """
    Assert that a packet counter holds the expected value.

    :param counter: counter name as accepted by get_packet_counter()
    :param expected_value: value the counter is compared against
    """
    description = "packet counter `%s'" % counter
    actual_value = self.get_packet_counter(counter)
    self.assert_equal(actual_value, expected_value, description)
def assert_error_counter_equal(self, counter, expected_value):
    """
    Assert that an error counter holds the expected value.

    The value is read through self.statistics.get_err_counter().

    :param counter: error counter name
    :param expected_value: value the counter is compared against
    """
    description = "error counter `%s'" % counter
    actual_value = self.statistics.get_err_counter(counter)
    self.assert_equal(actual_value, expected_value, description)
@classmethod
def sleep(cls, timeout, remark=None):
    """
    Sleep for the given time and log how long the sleep really took.

    An error is logged when the sleep took more than twice the requested
    time.

    :param timeout: time to sleep in seconds; 0 only yields the processor
    :param remark: free-form text included in the log messages
    """
    # /* Allow sleep(0) to maintain win32 semantics, and as decreed
    #  * by Guido, only the main thread can be interrupted.
    # */
    # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892  # noqa
    if timeout == 0:
        # yield quantum
        if hasattr(os, 'sched_yield'):
            os.sched_yield()
        else:
            time.sleep(0)
        return

    cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
    before = time.time()
    time.sleep(timeout)
    after = time.time()
    if after - before > 2 * timeout:
        cls.logger.error("unexpected self.sleep() result - "
                         "slept for %es instead of ~%es!",
                         after - before, timeout)

    cls.logger.debug(
        "Finished sleep (%s) - slept %es (wanted %es)",
        remark, after - before, timeout)
def pg_send(self, intf, pkts, worker=None):
    """
    Send packets through a packet-generator interface.

    The trace is cleared, the packets are added as a stream to the
    interface, capture is enabled on all of self.pg_interfaces and the
    packet generator is started.

    :param intf: interface the stream is added to
    :param pkts: packets to send
    :param worker: forwarded to intf.add_stream()
    """
    self.vapi.cli("clear trace")
    intf.add_stream(pkts, worker=worker)
    self.pg_enable_capture(self.pg_interfaces)
    # without this the stream is only queued and nothing is transmitted
    self.pg_start()
def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
    """
    Send packets and assert that no interface captured anything.

    :param intf: interface used for sending
    :param pkts: packets to send
    :param remark: forwarded to assert_nothing_captured()
    :param timeout: capture timeout used for the first interface; a falsy
        value selects 1
    """
    self.pg_send(intf, pkts)
    if not timeout:
        timeout = 1
    for i in self.pg_interfaces:
        i.get_capture(0, timeout=timeout)
        i.assert_nothing_captured(remark=remark)
        # the full timeout was already spent on the first interface, the
        # remaining ones only get a short one
        timeout = 0.1
def send_and_expect(self, intf, pkts, output, n_rx=None, worker=None):
    """
    Send packets and return what was captured on the output interface.

    :param intf: interface used for sending
    :param pkts: packets to send
    :param output: interface the capture is read from
    :param n_rx: number of packets expected; a falsy value selects
        len(pkts)
    :param worker: forwarded to pg_send()
    :returns: result of output.get_capture()
    """
    if not n_rx:
        n_rx = len(pkts)
    self.pg_send(intf, pkts, worker=worker)
    rx = output.get_capture(n_rx)
    return rx
def send_and_expect_only(self, intf, pkts, output, timeout=None):
    """
    Send packets, expect them on one interface and nothing anywhere else.

    :param intf: interface used for sending
    :param pkts: packets to send; len(pkts) packets are expected on output
    :param output: the only interface allowed to capture packets
    :param timeout: capture timeout used for the first of the other
        interfaces; a falsy value selects 1
    :returns: result of output.get_capture()
    """
    self.pg_send(intf, pkts)
    rx = output.get_capture(len(pkts))
    outputs = [output]
    if not timeout:
        timeout = 1
    for i in self.pg_interfaces:
        if i not in outputs:
            i.get_capture(0, timeout=timeout)
            i.assert_nothing_captured()
            # only the first silent interface waits for the full timeout
            timeout = 0.1

    return rx
def get_testcase_doc_name(test):
    """
    Return the first line of the docstring of the test's class.

    :param test: test case instance
    :returns: first docstring line as cleaned up by inspect.getdoc()
    """
    testcase_class = test.__class__
    doc_lines = getdoc(testcase_class).splitlines()
    return doc_lines[0]
def get_test_description(descriptions, test):
    """
    Return the text used to describe a test in the output.

    :param descriptions: when truthy, the test's short description is
        preferred
    :param test: test case instance
    :returns: test.shortDescription() when descriptions is truthy and a
        short description exists, str(test) otherwise
    """
    short_description = test.shortDescription()
    if descriptions and short_description:
        return short_description
    # never fall through with None - callers concatenate the result
    return str(test)
class TestCaseInfo(object):
    """Plain record of per-test-case data handed over at construction."""

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        """
        :param logger: stored as self.logger
        :param tempdir: stored as self.tempdir
        :param vpp_pid: stored as self.vpp_pid
        :param vpp_bin_path: stored as self.vpp_bin_path
        """
        # nothing is recorded as a core-crash test until someone sets it
        self.core_crash_test = None
        self.vpp_bin_path = vpp_bin_path
        self.vpp_pid = vpp_pid
        self.tempdir = tempdir
        self.logger = logger
class VppTestResult(unittest.TestResult):
    """
    @property result_string
     String variable to store the test case result string.
    @property errors
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test which
     raised an unexpected exception.
    @property failures
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test where
     a failure was explicitly signalled using the TestCase.assert*()
     methods.
    """

    # class-level attributes, i.e. shared by every result instance
    failed_test_cases_info = set()
    core_crash_test_cases_info = set()
    current_test_case_info = None

    def __init__(self, stream=None, descriptions=None, verbosity=None,
                 runner=None):
        """
        :param stream File descriptor to store where to report test results.
            Set to the standard error stream by default.
        :param descriptions Boolean variable to store information if to use
            test case descriptions.
        :param verbosity Integer variable to store required verbosity level.
        :param runner Object whose print_summary and stream attributes are
            used by printErrors().
        """
        super(VppTestResult, self).__init__(stream, descriptions, verbosity)
        self.stream = stream
        self.descriptions = descriptions
        self.verbosity = verbosity
        self.result_string = None
        self.runner = runner

    def addSuccess(self, test):
        """
        Record a test succeeded result

        :param test:

        """
        if self.current_test_case_info:
            self.current_test_case_info.logger.debug(
                "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
                                                       test._testMethodName,
                                                       test._testMethodDoc))
        unittest.TestResult.addSuccess(self, test)
        self.result_string = colorize("OK", GREEN)

        self.send_result_through_pipe(test, PASS)

    def addSkip(self, test, reason):
        """
        Record a test skipped.

        :param test:
        :param reason:

        """
        if self.current_test_case_info:
            self.current_test_case_info.logger.debug(
                "--- addSkip() %s.%s(%s) called, reason is %s" %
                (test.__class__.__name__, test._testMethodName,
                 test._testMethodDoc, reason))
        unittest.TestResult.addSkip(self, test, reason)
        self.result_string = colorize("SKIP", YELLOW)

        self.send_result_through_pipe(test, SKIP)

    def symlink_failed(self):
        """
        Link the temp dir of the current test case into $FAILED_DIR.

        The link is named '<basename of temp dir>-FAILED'. This is a
        best-effort operation: any exception is logged as an error and
        swallowed. Nothing happens without current test case info.
        """
        if self.current_test_case_info:
            try:
                failed_dir = os.getenv('FAILED_DIR')
                link_path = os.path.join(
                    failed_dir,
                    '%s-FAILED' %
                    os.path.basename(self.current_test_case_info.tempdir))

                self.current_test_case_info.logger.debug(
                    "creating a link to the failed test")
                self.current_test_case_info.logger.debug(
                    "os.symlink(%s, %s)" %
                    (self.current_test_case_info.tempdir, link_path))
                # lexists() also reports a link whose target is gone;
                # exists() would say False and os.symlink() would then fail
                if os.path.lexists(link_path):
                    self.current_test_case_info.logger.debug(
                        'symlink already exists')
                else:
                    os.symlink(self.current_test_case_info.tempdir, link_path)

            except Exception as e:
                self.current_test_case_info.logger.error(e)

    def send_result_through_pipe(self, test, result):
        """
        Send a (test id, result) tuple through the result pipe, if any.

        :param test: test whose id() is sent
        :param result: result value to send
        """
        if hasattr(self, 'test_framework_result_pipe'):
            pipe = self.test_framework_result_pipe
            if pipe:
                pipe.send((test.id(), result))

    def log_error(self, test, err, fn_name):
        """
        Write an error and its formatted traceback to the test case log.

        :param test: test case or unittest.suite._ErrorHolder
        :param err: exc_info style tuple, unpacked into format_exception()
        :param fn_name: name of the reporting function, used in the message
        """
        if self.current_test_case_info:
            if isinstance(test, unittest.suite._ErrorHolder):
                test_name = test.description
            else:
                test_name = '%s.%s(%s)' % (test.__class__.__name__,
                                           test._testMethodName,
                                           test._testMethodDoc)
            self.current_test_case_info.logger.debug(
                "--- %s() %s called, err is %s" %
                (fn_name, test_name, err))
            self.current_test_case_info.logger.debug(
                "formatted exception is:\n%s" %
                "".join(format_exception(*err)))

    def add_error(self, test, err, unittest_fn, error_type):
        """
        Common implementation of addFailure() and addError().

        :param test: test the error belongs to
        :param err: exc_info style tuple
        :param unittest_fn: unbound unittest.TestResult method doing the
            actual recording
        :param error_type: FAIL or ERROR
        :raises Exception: for any other error_type
        """
        if error_type == FAIL:
            self.log_error(test, err, 'addFailure')
            error_type_str = colorize("FAIL", RED)
        elif error_type == ERROR:
            self.log_error(test, err, 'addError')
            error_type_str = colorize("ERROR", RED)
        else:
            raise Exception('Error type %s cannot be used to record an '
                            'error or a failure' % error_type)

        unittest_fn(self, test, err)
        if self.current_test_case_info:
            self.result_string = "%s [ temp dir used by test case: %s ]" % \
                                 (error_type_str,
                                  self.current_test_case_info.tempdir)
            self.symlink_failed()
            self.failed_test_cases_info.add(self.current_test_case_info)
            if is_core_present(self.current_test_case_info.tempdir):
                # only the first test seen with a core file is remembered
                if not self.current_test_case_info.core_crash_test:
                    if isinstance(test, unittest.suite._ErrorHolder):
                        test_name = str(test)
                    else:
                        test_name = "'{!s}' ({!s})".format(
                            get_testcase_doc_name(test), test.id())
                    self.current_test_case_info.core_crash_test = test_name
                self.core_crash_test_cases_info.add(
                    self.current_test_case_info)
        else:
            self.result_string = '%s [no temp dir]' % error_type_str

        self.send_result_through_pipe(test, error_type)

    def addFailure(self, test, err):
        """
        Record a test failed result

        :param test:
        :param err: error message

        """
        self.add_error(test, err, unittest.TestResult.addFailure, FAIL)

    def addError(self, test, err):
        """
        Record a test error result

        :param test:
        :param err: error message

        """
        self.add_error(test, err, unittest.TestResult.addError, ERROR)

    def getDescription(self, test):
        """
        Get test description

        :param test:
        :returns: test description

        """
        return get_test_description(self.descriptions, test)

    def startTest(self, test):
        """
        Start a test

        :param test:

        """

        def print_header(test):
            test_doc = getdoc(test)
            if not test_doc:
                raise Exception("No doc string for test '%s'" % test.id())
            test_title = test_doc.splitlines()[0]
            test_title_colored = colorize(test_title, GREEN)
            if test.is_tagged_run_solo():
                # long live PEP-8 and 80 char width limitation...
                c = YELLOW
                test_title_colored = colorize("SOLO RUN: " + test_title, c)

            # This block may overwrite the colorized title above,
            # but we want this to stand out and be fixed
            if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
                c = RED
                w = "FIXME with VPP workers: "
                test_title_colored = colorize(w + test_title, c)

            # the header is printed once per test class
            if not hasattr(test.__class__, '_header_printed'):
                print(double_line_delim)
                print(test_title_colored)
                print(double_line_delim)
            test.__class__._header_printed = True

        print_header(test)
        self.start_test = time.time()
        unittest.TestResult.startTest(self, test)
        if self.verbosity > 0:
            self.stream.writeln(
                "Starting " + self.getDescription(test) + " ...")
            self.stream.writeln(single_line_delim)

    def stopTest(self, test):
        """
        Called when the given test has been run

        :param test:

        """
        unittest.TestResult.stopTest(self, test)

        if self.verbosity > 0:
            self.stream.writeln(single_line_delim)
            self.stream.writeln("%-73s%s" % (self.getDescription(test),
                                             self.result_string))
            self.stream.writeln(single_line_delim)
        else:
            self.stream.writeln("%-68s %4.2f %s" %
                                (self.getDescription(test),
                                 time.time() - self.start_test,
                                 self.result_string))

        self.send_result_through_pipe(test, TEST_RUN)

    def printErrors(self):
        """
        Print errors from running the test case
        """
        if len(self.errors) > 0 or len(self.failures) > 0:
            self.stream.writeln()
            self.printErrorList('ERROR', self.errors)
            self.printErrorList('FAIL', self.failures)

        # ^^ that is the last output from unittest before summary
        if not self.runner.print_summary:
            # NOTE(review): the devnull file object is never closed here
            devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
            self.stream = devnull
            self.runner.stream = devnull

    def printErrorList(self, flavour, errors):
        """
        Print error list to the output stream together with error type
        and test case description.

        :param flavour: error type
        :param errors: iterable errors

        """
        for test, err in errors:
            self.stream.writeln(double_line_delim)
            self.stream.writeln("%s: %s" %
                                (flavour, self.getDescription(test)))
            self.stream.writeln(single_line_delim)
            self.stream.writeln("%s" % err)
class VppTestRunner(unittest.TextTestRunner):
    """
    A basic test runner implementation which prints results to standard error.
    """

    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
                 result_pipe=None, failfast=False, buffer=False,
                 resultclass=None, print_summary=True, **kwargs):
        """
        :param keep_alive_pipe: stored in KeepAliveReporter.pipe
        :param descriptions: forwarded to unittest.TextTestRunner
        :param verbosity: forwarded to unittest.TextTestRunner
        :param result_pipe: stored on the result class as
            test_framework_result_pipe
        :param failfast: forwarded to unittest.TextTestRunner
        :param buffer: forwarded to unittest.TextTestRunner
        :param resultclass: forwarded to unittest.TextTestRunner
        :param print_summary: stored as self.print_summary
        :param kwargs: forwarded to unittest.TextTestRunner
        """
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        base = super(VppTestRunner, self)
        base.__init__(sys.stdout, descriptions, verbosity, failfast, buffer,
                      resultclass, **kwargs)
        KeepAliveReporter.pipe = keep_alive_pipe

        self.print_summary = print_summary
        # remembered so that run() can put the stream back
        self.orig_stream = self.stream
        self.resultclass.test_framework_result_pipe = result_pipe

    def _makeResult(self):
        """Create the result object, handing it this runner as well."""
        result_factory = self.resultclass
        return result_factory(self.stream, self.descriptions,
                              self.verbosity, self)

    def run(self, test):
        """
        Run the tests

        :param test:

        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        result = super(VppTestRunner, self).run(test)
        if self.print_summary:
            return result
        # restore the stream on both the runner and the result
        self.stream = self.orig_stream
        result.stream = self.orig_stream
        return result
class Worker(Thread):
    """
    Thread running an external executable and collecting its result.

    Subclasses may provide 'testcase', 'role' and 'wait_for_gdb'
    attributes; each one is only used when present.
    """

    def __init__(self, executable_args, logger, env=None, *args, **kwargs):
        """
        :param executable_args: command line; the first item is the
            executable
        :param logger: logger used by run()
        :param env: extra environment variables for the process (deep
            copied); None means no extra variables
        :param args: positional arguments for threading.Thread
        :param kwargs: keyword arguments for threading.Thread
        """
        super(Worker, self).__init__(*args, **kwargs)
        self.logger = logger
        # private copy - debugger arguments must not leak into the
        # caller's list
        self.args = list(executable_args)
        if hasattr(self, 'testcase') and self.testcase.debug_all:
            if self.testcase.debug_gdbserver:
                # prepend gdbserver to the command line of the executable
                # (not to the Thread's positional arguments)
                self.args = ['/usr/bin/gdbserver', 'localhost:{port}'
                             .format(port=self.testcase.gdbserver_port)
                             ] + self.args
            elif self.testcase.debug_gdb and hasattr(self, 'wait_for_gdb'):
                self.args.append(self.wait_for_gdb)
        self.app_bin = executable_args[0]
        self.app_name = os.path.basename(self.app_bin)
        if hasattr(self, 'role'):
            self.app_name += ' {role}'.format(role=self.role)
        self.process = None
        self.result = None
        env = {} if env is None else env
        self.env = copy.deepcopy(env)

    def wait_for_enter(self):
        """
        Print debugger attach instructions and wait for ENTER.

        Returns immediately unless a 'testcase' attribute exists and it
        has debug_all set together with debug_gdbserver or debug_gdb.
        """
        if not hasattr(self, 'testcase'):
            return
        if self.testcase.debug_all and self.testcase.debug_gdbserver:
            print()
            print(double_line_delim)
            print("Spawned GDB Server for '{app}' with PID: {pid}"
                  .format(app=self.app_name, pid=self.process.pid))
        elif self.testcase.debug_all and self.testcase.debug_gdb:
            print()
            print(double_line_delim)
            print("Spawned '{app}' with PID: {pid}"
                  .format(app=self.app_name, pid=self.process.pid))
        else:
            return
        print(single_line_delim)
        print("You can debug '{app}' using:".format(app=self.app_name))
        if self.testcase.debug_gdbserver:
            print("sudo gdb " + self.app_bin +
                  " -ex 'target remote localhost:{port}'"
                  .format(port=self.testcase.gdbserver_port))
            print("Now is the time to attach gdb by running the above "
                  "command, set up breakpoints etc., then resume from "
                  "within gdb by issuing the 'continue' command")
            # the next worker gets its own gdbserver port
            self.testcase.gdbserver_port += 1
        elif self.testcase.debug_gdb:
            print("sudo gdb " + self.app_bin +
                  " -ex 'attach {pid}'".format(pid=self.process.pid))
            print("Now is the time to attach gdb by running the above "
                  "command and set up breakpoints etc., then resume from"
                  " within gdb by issuing the 'continue' command")
        print(single_line_delim)
        input("Press ENTER to continue running the testcase...")

    def run(self):
        """
        Run the executable, log its output and store its return code.

        The return code ends up in self.result.

        :raises EnvironmentError: when the executable does not exist or is
            not executable; self.result is set to os.EX_OSFILE first
        """
        executable = self.args[0]
        if not os.path.exists(executable) or not os.access(
                executable, os.F_OK | os.X_OK):
            # Exit code that means some system file did not exist,
            # could not be opened, or had some other kind of error.
            self.result = os.EX_OSFILE
            raise EnvironmentError(
                "executable '%s' is not found or executable." % executable)
        self.logger.debug("Running executable: '{app}'"
                          .format(app=' '.join(self.args)))
        env = os.environ.copy()
        env.update(self.env)
        env["CK_LOG_FILE_NAME"] = "-"
        self.process = subprocess.Popen(
            self.args, shell=False, env=env, preexec_fn=os.setpgrp,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self.wait_for_enter()
        out, err = self.process.communicate()
        self.logger.debug("Finished running `{app}'".format(app=self.app_name))
        self.logger.info("Return code is `%s'" % self.process.returncode)
        self.logger.info(single_line_delim)
        self.logger.info("Executable `{app}' wrote to stdout:"
                         .format(app=self.app_name))
        self.logger.info(single_line_delim)
        # 'replace' keeps non-UTF-8 output from killing the thread before
        # the result is stored
        self.logger.info(out.decode('utf-8', 'replace'))
        self.logger.info(single_line_delim)
        self.logger.info("Executable `{app}' wrote to stderr:"
                         .format(app=self.app_name))
        self.logger.info(single_line_delim)
        self.logger.info(err.decode('utf-8', 'replace'))
        self.logger.info(single_line_delim)
        self.result = self.process.returncode
1660 if __name__ == '__main__':