3 from __future__ import print_function
18 from collections import deque
19 from threading import Thread, Event
20 from inspect import getdoc, isclass
21 from traceback import format_exception
22 from logging import FileHandler, DEBUG, Formatter
25 from scapy.packet import Raw
26 import hook as hookmodule
27 from vpp_pg_interface import VppPGInterface
28 from vpp_sub_interface import VppSubInterface
29 from vpp_lo_interface import VppLoInterface
30 from vpp_bvi_interface import VppBviInterface
31 from vpp_papi_provider import VppPapiProvider
33 from vpp_papi.vpp_stats import VPPStats
34 from vpp_papi.vpp_transport_shmem import VppTransportShmemIOError
35 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
37 from vpp_object import VppObjectRegistry
38 from util import ppp, is_core_present
39 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
40 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
41 from scapy.layers.inet6 import ICMPv6EchoReply
43 if os.name == 'posix' and sys.version_info[0] < 3:
44 # using subprocess32 is recommended by python official documentation
45 # @ https://docs.python.org/2/library/subprocess.html
46 import subprocess32 as subprocess
50 # Python2/3 compatible
56 logger = logging.getLogger(__name__)
58 # Set up an empty logger for the testcase that can be overridden as necessary
59 null_logger = logging.getLogger('VppTestCase')
60 null_logger.addHandler(logging.NullHandler())
69 class BoolEnvironmentVariable(object):
71 def __init__(self, env_var_name, default='n', true_values=None):
72 self.name = env_var_name
73 self.default = default
74 self.true_values = true_values if true_values is not None else \
78 return os.getenv(self.name, self.default).lower() in self.true_values
80 if sys.version_info[0] == 2:
81 __nonzero__ = __bool__
84 return 'BoolEnvironmentVariable(%r, default=%r, true_values=%r)' % \
85 (self.name, self.default, self.true_values)
88 debug_framework = BoolEnvironmentVariable('TEST_DEBUG')
93 Test framework module.
95 The module provides a set of tools for constructing and running tests and
96 representing the results.
100 class VppDiedError(Exception):
101 """ exception for reporting that the subprocess has died."""
103 signals_by_value = {v: k for k, v in signal.__dict__.items() if
104 k.startswith('SIG') and not k.startswith('SIG_')}
106 def __init__(self, rv=None, testcase=None, method_name=None):
108 self.signal_name = None
109 self.testcase = testcase
110 self.method_name = method_name
113 self.signal_name = VppDiedError.signals_by_value[-rv]
114 except (KeyError, TypeError):
117 if testcase is None and method_name is None:
120 in_msg = 'running %s.%s ' % (testcase, method_name)
122 msg = "VPP subprocess died %sunexpectedly with return code: %d%s." % (
125 ' [%s]' % (self.signal_name if
126 self.signal_name is not None else ''))
127 super(VppDiedError, self).__init__(msg)
130 class _PacketInfo(object):
131 """Private class to create packet info object.
133 Help process information about the next packet.
134 Set variables to default values.
136 #: Store the index of the packet.
138 #: Store the index of the source packet generator interface of the packet.
140 #: Store the index of the destination packet generator interface
143 #: Store expected ip version
145 #: Store expected upper protocol
147 #: Store the copy of the former packet.
150 def __eq__(self, other):
151 index = self.index == other.index
152 src = self.src == other.src
153 dst = self.dst == other.dst
154 data = self.data == other.data
155 return index and src and dst and data
158 def pump_output(testclass):
159 """ pump output from vpp stdout/stderr to proper queues """
162 while not testclass.pump_thread_stop_flag.is_set():
163 readable = select.select([testclass.vpp.stdout.fileno(),
164 testclass.vpp.stderr.fileno(),
165 testclass.pump_thread_wakeup_pipe[0]],
167 if testclass.vpp.stdout.fileno() in readable:
168 read = os.read(testclass.vpp.stdout.fileno(), 102400)
170 split = read.decode('ascii',
171 errors='backslashreplace').splitlines(True)
172 if len(stdout_fragment) > 0:
173 split[0] = "%s%s" % (stdout_fragment, split[0])
174 if len(split) > 0 and split[-1].endswith("\n"):
178 stdout_fragment = split[-1]
179 testclass.vpp_stdout_deque.extend(split[:limit])
180 if not testclass.cache_vpp_output:
181 for line in split[:limit]:
182 testclass.logger.info(
183 "VPP STDOUT: %s" % line.rstrip("\n"))
184 if testclass.vpp.stderr.fileno() in readable:
185 read = os.read(testclass.vpp.stderr.fileno(), 102400)
187 split = read.decode('ascii',
188 errors='backslashreplace').splitlines(True)
189 if len(stderr_fragment) > 0:
190 split[0] = "%s%s" % (stderr_fragment, split[0])
191 if len(split) > 0 and split[-1].endswith("\n"):
195 stderr_fragment = split[-1]
197 testclass.vpp_stderr_deque.extend(split[:limit])
198 if not testclass.cache_vpp_output:
199 for line in split[:limit]:
200 testclass.logger.error(
201 "VPP STDERR: %s" % line.rstrip("\n"))
202 # ignoring the dummy pipe here intentionally - the
203 # flag will take care of properly terminating the loop
206 def _is_skip_aarch64_set():
207 return BoolEnvironmentVariable('SKIP_AARCH64')
210 is_skip_aarch64_set = _is_skip_aarch64_set()
213 def _is_platform_aarch64():
214 return platform.machine() == 'aarch64'
217 is_platform_aarch64 = _is_platform_aarch64()
220 def _running_extended_tests():
221 return BoolEnvironmentVariable("EXTENDED_TESTS")
224 running_extended_tests = _running_extended_tests()
227 def _running_gcov_tests():
228 return BoolEnvironmentVariable("GCOV_TESTS")
231 running_gcov_tests = _running_gcov_tests()
234 def _running_on_centos():
235 os_id = os.getenv("OS_ID", "")
236 return True if "centos" in os_id.lower() else False
239 running_on_centos = _running_on_centos()
242 class KeepAliveReporter(object):
244 Singleton object which reports test start to parent process
249 self.__dict__ = self._shared_state
257 def pipe(self, pipe):
258 if self._pipe is not None:
259 raise Exception("Internal error - pipe should only be set once.")
262 def send_keep_alive(self, test, desc=None):
264 Write current test tmpdir & desc to keep-alive pipe to signal liveness
266 if self.pipe is None:
267 # if not running forked..
271 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
275 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
278 class VppTestCase(unittest.TestCase):
279 """This subclass is a base class for VPP test cases that are implemented as
280 classes. It provides methods to create and run test case.
283 extra_vpp_punt_config = []
284 extra_vpp_plugin_config = []
286 vapi_response_timeout = 5
289 def packet_infos(self):
290 """List of packet infos"""
291 return self._packet_infos
294 def get_packet_count_for_if_idx(cls, dst_if_index):
295 """Get the number of packet info for specified destination if index"""
296 if dst_if_index in cls._packet_count_for_dst_if_idx:
297 return cls._packet_count_for_dst_if_idx[dst_if_index]
303 """ if the test case class is timing-sensitive - return true """
308 """Return the instance of this testcase"""
309 return cls.test_instance
312 def set_debug_flags(cls, d):
313 cls.gdbserver_port = 7777
314 cls.debug_core = False
315 cls.debug_gdb = False
316 cls.debug_gdbserver = False
317 cls.debug_all = False
322 cls.debug_core = True
323 elif dl == "gdb" or dl == "gdb-all":
325 elif dl == "gdbserver" or dl == "gdbserver-all":
326 cls.debug_gdbserver = True
328 raise Exception("Unrecognized DEBUG option: '%s'" % d)
329 if dl == "gdb-all" or dl == "gdbserver-all":
333 def get_least_used_cpu():
334 cpu_usage_list = [set(range(psutil.cpu_count()))]
335 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
336 if 'vpp_main' == p.info['name']]
337 for vpp_process in vpp_processes:
338 for cpu_usage_set in cpu_usage_list:
340 cpu_num = vpp_process.cpu_num()
341 if cpu_num in cpu_usage_set:
342 cpu_usage_set_index = cpu_usage_list.index(
344 if cpu_usage_set_index == len(cpu_usage_list) - 1:
345 cpu_usage_list.append({cpu_num})
347 cpu_usage_list[cpu_usage_set_index + 1].add(
349 cpu_usage_set.remove(cpu_num)
351 except psutil.NoSuchProcess:
354 for cpu_usage_set in cpu_usage_list:
355 if len(cpu_usage_set) > 0:
356 min_usage_set = cpu_usage_set
359 return random.choice(tuple(min_usage_set))
362 def setUpConstants(cls):
363 """ Set-up the test case class based on environment variables """
364 cls.step = BoolEnvironmentVariable('STEP')
365 d = os.getenv("DEBUG", None)
366 # inverted case to handle '' == True
367 c = os.getenv("CACHE_OUTPUT", "1")
368 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
369 cls.set_debug_flags(d)
370 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
371 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
372 cls.test_plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
373 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
375 if cls.plugin_path is not None:
376 if cls.extern_plugin_path is not None:
377 plugin_path = "%s:%s" % (
378 cls.plugin_path, cls.extern_plugin_path)
380 plugin_path = cls.plugin_path
381 elif cls.extern_plugin_path is not None:
382 plugin_path = cls.extern_plugin_path
384 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
385 debug_cli = "cli-listen localhost:5002"
387 size = os.getenv("COREDUMP_SIZE")
389 coredump_size = "coredump-size %s" % size
390 if coredump_size is None:
391 coredump_size = "coredump-size unlimited"
393 cpu_core_number = cls.get_least_used_cpu()
394 if not hasattr(cls, "worker_config"):
395 cls.worker_config = ""
397 default_variant = os.getenv("VARIANT")
398 if default_variant is not None:
399 default_variant = "defaults { %s 100 }" % default_variant
403 api_fuzzing = os.getenv("API_FUZZ")
404 if api_fuzzing is None:
407 cls.vpp_cmdline = [cls.vpp_bin, "unix",
408 "{", "nodaemon", debug_cli, "full-coredump",
409 coredump_size, "runtime-dir", cls.tempdir, "}",
410 "api-trace", "{", "on", "}", "api-segment", "{",
411 "prefix", cls.shm_prefix, "}", "cpu", "{",
412 "main-core", str(cpu_core_number),
413 cls.worker_config, "}",
414 "physmem", "{", "max-size", "32m", "}",
415 "statseg", "{", "socket-name", cls.stats_sock, "}",
416 "socksvr", "{", "socket-name", cls.api_sock, "}",
417 "node { ", default_variant, "}",
418 "api-fuzz {", api_fuzzing, "}",
420 "{", "plugin", "dpdk_plugin.so", "{", "disable",
421 "}", "plugin", "rdma_plugin.so", "{", "disable",
422 "}", "plugin", "lisp_unittest_plugin.so", "{",
424 "}", "plugin", "unittest_plugin.so", "{", "enable",
425 "}"] + cls.extra_vpp_plugin_config + ["}", ]
427 if cls.extra_vpp_punt_config is not None:
428 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
429 if plugin_path is not None:
430 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
431 if cls.test_plugin_path is not None:
432 cls.vpp_cmdline.extend(["test_plugin_path", cls.test_plugin_path])
434 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
435 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
438 def wait_for_enter(cls):
439 if cls.debug_gdbserver:
440 print(double_line_delim)
441 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
443 print(double_line_delim)
444 print("Spawned VPP with PID: %d" % cls.vpp.pid)
446 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
448 print(single_line_delim)
449 print("You can debug VPP using:")
450 if cls.debug_gdbserver:
451 print("sudo gdb " + cls.vpp_bin +
452 " -ex 'target remote localhost:{port}'"
453 .format(port=cls.gdbserver_port))
454 print("Now is the time to attach gdb by running the above "
455 "command, set up breakpoints etc., then resume VPP from "
456 "within gdb by issuing the 'continue' command")
457 cls.gdbserver_port += 1
459 print("sudo gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
460 print("Now is the time to attach gdb by running the above "
461 "command and set up breakpoints etc., then resume VPP from"
462 " within gdb by issuing the 'continue' command")
463 print(single_line_delim)
464 input("Press ENTER to continue running the testcase...")
468 cmdline = cls.vpp_cmdline
470 if cls.debug_gdbserver:
471 gdbserver = '/usr/bin/gdbserver'
472 if not os.path.isfile(gdbserver) or \
473 not os.access(gdbserver, os.X_OK):
474 raise Exception("gdbserver binary '%s' does not exist or is "
475 "not executable" % gdbserver)
477 cmdline = [gdbserver, 'localhost:{port}'
478 .format(port=cls.gdbserver_port)] + cls.vpp_cmdline
479 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
482 cls.vpp = subprocess.Popen(cmdline,
483 stdout=subprocess.PIPE,
484 stderr=subprocess.PIPE)
485 except subprocess.CalledProcessError as e:
486 cls.logger.critical("Subprocess returned with non-0 return code: ("
490 cls.logger.critical("Subprocess returned with OS error: "
491 "(%s) %s", e.errno, e.strerror)
493 except Exception as e:
494 cls.logger.exception("Subprocess returned unexpected from "
501 def wait_for_coredump(cls):
502 corefile = cls.tempdir + "/core"
503 if os.path.isfile(corefile):
504 cls.logger.error("Waiting for coredump to complete: %s", corefile)
505 curr_size = os.path.getsize(corefile)
506 deadline = time.time() + 60
508 while time.time() < deadline:
511 curr_size = os.path.getsize(corefile)
512 if size == curr_size:
516 cls.logger.error("Timed out waiting for coredump to complete:"
519 cls.logger.error("Coredump complete: %s, size %d",
525 Perform class setup before running the testcase
526 Remove shared memory files, start vpp and connect the vpp-api
528 super(VppTestCase, cls).setUpClass()
529 gc.collect() # run garbage collection first
530 cls.logger = get_logger(cls.__name__)
531 seed = os.environ["RND_SEED"]
533 if hasattr(cls, 'parallel_handler'):
534 cls.logger.addHandler(cls.parallel_handler)
535 cls.logger.propagate = False
537 cls.tempdir = tempfile.mkdtemp(
538 prefix='vpp-unittest-%s-' % cls.__name__)
539 cls.stats_sock = "%s/stats.sock" % cls.tempdir
540 cls.api_sock = "%s/api.sock" % cls.tempdir
541 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
542 cls.file_handler.setFormatter(
543 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
545 cls.file_handler.setLevel(DEBUG)
546 cls.logger.addHandler(cls.file_handler)
547 cls.logger.debug("--- setUpClass() for %s called ---" %
549 cls.shm_prefix = os.path.basename(cls.tempdir)
550 os.chdir(cls.tempdir)
551 cls.logger.info("Temporary dir is %s, shm prefix is %s",
552 cls.tempdir, cls.shm_prefix)
553 cls.logger.debug("Random seed is %s" % seed)
555 cls.reset_packet_infos()
559 cls.registry = VppObjectRegistry()
560 cls.vpp_startup_failed = False
561 cls.reporter = KeepAliveReporter()
562 # need to catch exceptions here because if we raise, then the cleanup
563 # doesn't get called and we might end with a zombie vpp
566 cls.reporter.send_keep_alive(cls, 'setUpClass')
567 VppTestResult.current_test_case_info = TestCaseInfo(
568 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
569 cls.vpp_stdout_deque = deque()
570 cls.vpp_stderr_deque = deque()
571 cls.pump_thread_stop_flag = Event()
572 cls.pump_thread_wakeup_pipe = os.pipe()
573 cls.pump_thread = Thread(target=pump_output, args=(cls,))
574 cls.pump_thread.daemon = True
575 cls.pump_thread.start()
576 if cls.debug_gdb or cls.debug_gdbserver:
577 cls.vapi_response_timeout = 0
578 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
579 cls.vapi_response_timeout)
581 hook = hookmodule.StepHook(cls)
583 hook = hookmodule.PollHook(cls)
584 cls.vapi.register_hook(hook)
585 cls.statistics = VPPStats(socketname=cls.stats_sock)
589 cls.vpp_startup_failed = True
591 "VPP died shortly after startup, check the"
592 " output to standard error for possible cause")
596 except vpp_papi.VPPIOError as e:
597 cls.logger.debug("Exception connecting to vapi: %s" % e)
598 cls.vapi.disconnect()
600 if cls.debug_gdbserver:
601 print(colorize("You're running VPP inside gdbserver but "
602 "VPP-API connection failed, did you forget "
603 "to 'continue' VPP from within gdb?", RED))
605 except vpp_papi.VPPRuntimeError as e:
606 cls.logger.debug("%s" % e)
609 except Exception as e:
610 cls.logger.debug("Exception connecting to VPP: %s" % e)
615 def _debug_quit(cls):
616 if (cls.debug_gdbserver or cls.debug_gdb):
620 if cls.vpp.returncode is None:
622 print(double_line_delim)
623 print("VPP or GDB server is still running")
624 print(single_line_delim)
625 input("When done debugging, press ENTER to kill the "
626 "process and finish running the testcase...")
627 except AttributeError:
633 Disconnect vpp-api, kill vpp and cleanup shared memory files
637 # first signal that we want to stop the pump thread, then wake it up
638 if hasattr(cls, 'pump_thread_stop_flag'):
639 cls.pump_thread_stop_flag.set()
640 if hasattr(cls, 'pump_thread_wakeup_pipe'):
641 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
642 if hasattr(cls, 'pump_thread'):
643 cls.logger.debug("Waiting for pump thread to stop")
644 cls.pump_thread.join()
645 if hasattr(cls, 'vpp_stderr_reader_thread'):
646 cls.logger.debug("Waiting for stderr pump to stop")
647 cls.vpp_stderr_reader_thread.join()
649 if hasattr(cls, 'vpp'):
650 if hasattr(cls, 'vapi'):
651 cls.logger.debug(cls.vapi.vpp.get_stats())
652 cls.logger.debug("Disconnecting class vapi client on %s",
654 cls.vapi.disconnect()
655 cls.logger.debug("Deleting class vapi attribute on %s",
659 if cls.vpp.returncode is None:
660 cls.wait_for_coredump()
661 cls.logger.debug("Sending TERM to vpp")
663 cls.logger.debug("Waiting for vpp to die")
664 cls.vpp.communicate()
665 cls.logger.debug("Deleting class vpp attribute on %s",
669 if cls.vpp_startup_failed:
670 stdout_log = cls.logger.info
671 stderr_log = cls.logger.critical
673 stdout_log = cls.logger.info
674 stderr_log = cls.logger.info
676 if hasattr(cls, 'vpp_stdout_deque'):
677 stdout_log(single_line_delim)
678 stdout_log('VPP output to stdout while running %s:', cls.__name__)
679 stdout_log(single_line_delim)
680 vpp_output = "".join(cls.vpp_stdout_deque)
681 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
683 stdout_log('\n%s', vpp_output)
684 stdout_log(single_line_delim)
686 if hasattr(cls, 'vpp_stderr_deque'):
687 stderr_log(single_line_delim)
688 stderr_log('VPP output to stderr while running %s:', cls.__name__)
689 stderr_log(single_line_delim)
690 vpp_output = "".join(cls.vpp_stderr_deque)
691 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
693 stderr_log('\n%s', vpp_output)
694 stderr_log(single_line_delim)
697 def tearDownClass(cls):
698 """ Perform final cleanup after running all tests in this test-case """
699 cls.logger.debug("--- tearDownClass() for %s called ---" %
701 cls.reporter.send_keep_alive(cls, 'tearDownClass')
703 cls.file_handler.close()
704 cls.reset_packet_infos()
706 debug_internal.on_tear_down_class(cls)
708 def show_commands_at_teardown(self):
709 """ Allow subclass specific teardown logging additions."""
710 self.logger.info("--- No test specific show commands provided. ---")
713 """ Show various debug prints after each test """
714 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
715 (self.__class__.__name__, self._testMethodName,
716 self._testMethodDoc))
719 if not self.vpp_dead:
720 self.logger.debug(self.vapi.cli("show trace max 1000"))
721 self.logger.info(self.vapi.ppcli("show interface"))
722 self.logger.info(self.vapi.ppcli("show hardware"))
723 self.logger.info(self.statistics.set_errors_str())
724 self.logger.info(self.vapi.ppcli("show run"))
725 self.logger.info(self.vapi.ppcli("show log"))
726 self.logger.info(self.vapi.ppcli("show bihash"))
727 self.logger.info("Logging testcase specific show commands.")
728 self.show_commands_at_teardown()
729 self.registry.remove_vpp_config(self.logger)
730 # Save/Dump VPP api trace log
731 m = self._testMethodName
732 api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
733 tmp_api_trace = "/tmp/%s" % api_trace
734 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
735 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
736 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
738 os.rename(tmp_api_trace, vpp_api_trace_log)
739 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
741 except VppTransportShmemIOError:
742 self.logger.debug("VppTransportShmemIOError: Vpp dead. "
743 "Cannot log show commands.")
746 self.registry.unregister_all(self.logger)
749 """ Clear trace before running each test"""
750 super(VppTestCase, self).setUp()
751 self.reporter.send_keep_alive(self)
754 raise VppDiedError(rv=None, testcase=self.__class__.__name__,
755 method_name=self._testMethodName)
756 self.sleep(.1, "during setUp")
757 self.vpp_stdout_deque.append(
758 "--- test setUp() for %s.%s(%s) starts here ---\n" %
759 (self.__class__.__name__, self._testMethodName,
760 self._testMethodDoc))
761 self.vpp_stderr_deque.append(
762 "--- test setUp() for %s.%s(%s) starts here ---\n" %
763 (self.__class__.__name__, self._testMethodName,
764 self._testMethodDoc))
765 self.vapi.cli("clear trace")
766 # store the test instance inside the test class - so that objects
767 # holding the class can access instance methods (like assertEqual)
768 type(self).test_instance = self
771 def pg_enable_capture(cls, interfaces=None):
773 Enable capture on packet-generator interfaces
775 :param interfaces: iterable interface indexes (if None,
776 use self.pg_interfaces)
779 if interfaces is None:
780 interfaces = cls.pg_interfaces
785 def register_capture(cls, cap_name):
786 """ Register a capture in the testclass """
787 # add to the list of captures with current timestamp
788 cls._captures.append((time.time(), cap_name))
791 def get_vpp_time(cls):
792 # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
793 # returns float("2.190522")
794 timestr = cls.vapi.cli('show clock')
795 head, sep, tail = timestr.partition(',')
796 head, sep, tail = head.partition('Time now')
800 def sleep_on_vpp_time(cls, sec):
801 """ Sleep according to time in VPP world """
802 # On a busy system with many processes
803 # we might end up with VPP time being slower than real world
804 # So take that into account when waiting for VPP to do something
805 start_time = cls.get_vpp_time()
806 while cls.get_vpp_time() - start_time < sec:
811 """ Enable the PG, wait till it is done, then clean up """
812 cls.vapi.cli("trace add pg-input 1000")
813 cls.vapi.cli('packet-generator enable')
814 # PG, when starts, runs to completion -
815 # so let's avoid a race condition,
816 # and wait a little till it's done.
817 # Then clean it up - and then be gone.
818 deadline = time.time() + 300
819 while cls.vapi.cli('show packet-generator').find("Yes") != -1:
820 cls.sleep(0.01) # yield
821 if time.time() > deadline:
822 cls.logger.error("Timeout waiting for pg to stop")
824 for stamp, cap_name in cls._captures:
825 cls.vapi.cli('packet-generator delete %s' % cap_name)
829 def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
831 Create packet-generator interfaces.
833 :param interfaces: iterable indexes of the interfaces.
834 :returns: List of created interfaces.
839 intf = VppPGInterface(cls, i, gso, gso_size)
840 setattr(cls, intf.name, intf)
842 cls.pg_interfaces = result
846 def create_loopback_interfaces(cls, count):
848 Create loopback interfaces.
850 :param count: number of interfaces created.
851 :returns: List of created interfaces.
853 result = [VppLoInterface(cls) for i in range(count)]
855 setattr(cls, intf.name, intf)
856 cls.lo_interfaces = result
860 def create_bvi_interfaces(cls, count):
862 Create BVI interfaces.
864 :param count: number of interfaces created.
865 :returns: List of created interfaces.
867 result = [VppBviInterface(cls) for i in range(count)]
869 setattr(cls, intf.name, intf)
870 cls.bvi_interfaces = result
874 def extend_packet(packet, size, padding=' '):
876 Extend packet to given size by padding with spaces or custom padding
877 NOTE: Currently works only when Raw layer is present.
879 :param packet: packet
880 :param size: target size
881 :param padding: padding used to extend the payload
884 packet_len = len(packet) + 4
885 extend = size - packet_len
887 num = (extend // len(padding)) + 1
888 packet[Raw].load += (padding * num)[:extend].encode("ascii")
891 def reset_packet_infos(cls):
892 """ Reset the list of packet info objects and packet counts to zero """
893 cls._packet_infos = {}
894 cls._packet_count_for_dst_if_idx = {}
897 def create_packet_info(cls, src_if, dst_if):
899 Create packet info object containing the source and destination indexes
900 and add it to the testcase's packet info list
902 :param VppInterface src_if: source interface
903 :param VppInterface dst_if: destination interface
905 :returns: _PacketInfo object
909 info.index = len(cls._packet_infos)
910 info.src = src_if.sw_if_index
911 info.dst = dst_if.sw_if_index
912 if isinstance(dst_if, VppSubInterface):
913 dst_idx = dst_if.parent.sw_if_index
915 dst_idx = dst_if.sw_if_index
916 if dst_idx in cls._packet_count_for_dst_if_idx:
917 cls._packet_count_for_dst_if_idx[dst_idx] += 1
919 cls._packet_count_for_dst_if_idx[dst_idx] = 1
920 cls._packet_infos[info.index] = info
924 def info_to_payload(info):
926 Convert _PacketInfo object to packet payload
928 :param info: _PacketInfo object
930 :returns: string containing serialized data from packet info
932 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
936 def payload_to_info(payload, payload_field='load'):
938 Convert packet payload to _PacketInfo object
940 :param payload: packet payload
941 :type payload: <class 'scapy.packet.Raw'>
942 :param payload_field: packet fieldname of payload "load" for
943 <class 'scapy.packet.Raw'>
944 :type payload_field: str
945 :returns: _PacketInfo object containing de-serialized data from payload
948 numbers = getattr(payload, payload_field).split()
950 info.index = int(numbers[0])
951 info.src = int(numbers[1])
952 info.dst = int(numbers[2])
953 info.ip = int(numbers[3])
954 info.proto = int(numbers[4])
957 def get_next_packet_info(self, info):
959 Iterate over the packet info list stored in the testcase
960 Start iteration with first element if info is None
961 Continue based on index in info if info is specified
963 :param info: info or None
964 :returns: next info in list or None if no more infos
969 next_index = info.index + 1
970 if next_index == len(self._packet_infos):
973 return self._packet_infos[next_index]
975 def get_next_packet_info_for_interface(self, src_index, info):
977 Search the packet info list for the next packet info with same source
980 :param src_index: source interface index to search for
981 :param info: packet info - where to start the search
982 :returns: packet info or None
986 info = self.get_next_packet_info(info)
989 if info.src == src_index:
992 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
994 Search the packet info list for the next packet info with same source
995 and destination interface indexes
997 :param src_index: source interface index to search for
998 :param dst_index: destination interface index to search for
999 :param info: packet info - where to start the search
1000 :returns: packet info or None
1004 info = self.get_next_packet_info_for_interface(src_index, info)
1007 if info.dst == dst_index:
1010 def assert_equal(self, real_value, expected_value, name_or_class=None):
1011 if name_or_class is None:
1012 self.assertEqual(real_value, expected_value)
1015 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
1016 msg = msg % (getdoc(name_or_class).strip(),
1017 real_value, str(name_or_class(real_value)),
1018 expected_value, str(name_or_class(expected_value)))
1020 msg = "Invalid %s: %s does not match expected value %s" % (
1021 name_or_class, real_value, expected_value)
1023 self.assertEqual(real_value, expected_value, msg)
1025 def assert_in_range(self,
1033 msg = "Invalid %s: %s out of range <%s,%s>" % (
1034 name, real_value, expected_min, expected_max)
1035 self.assertTrue(expected_min <= real_value <= expected_max, msg)
1037 def assert_packet_checksums_valid(self, packet,
1038 ignore_zero_udp_checksums=True):
1039 received = packet.__class__(scapy.compat.raw(packet))
1040 udp_layers = ['UDP', 'UDPerror']
1041 checksum_fields = ['cksum', 'chksum']
1044 temp = received.__class__(scapy.compat.raw(received))
1046 layer = temp.getlayer(counter)
1048 layer = layer.copy()
1049 layer.remove_payload()
1050 for cf in checksum_fields:
1051 if hasattr(layer, cf):
1052 if ignore_zero_udp_checksums and \
1053 0 == getattr(layer, cf) and \
1054 layer.name in udp_layers:
1056 delattr(temp.getlayer(counter), cf)
1057 checksums.append((counter, cf))
1060 counter = counter + 1
1061 if 0 == len(checksums):
1063 temp = temp.__class__(scapy.compat.raw(temp))
1064 for layer, cf in checksums:
1065 calc_sum = getattr(temp[layer], cf)
1067 getattr(received[layer], cf), calc_sum,
1068 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
1070 "Checksum field `%s` on `%s` layer has correct value `%s`" %
1071 (cf, temp[layer].name, calc_sum))
1073 def assert_checksum_valid(self, received_packet, layer,
1074 field_name='chksum',
1075 ignore_zero_checksum=False):
1076 """ Check checksum of received packet on given layer """
1077 received_packet_checksum = getattr(received_packet[layer], field_name)
1078 if ignore_zero_checksum and 0 == received_packet_checksum:
1080 recalculated = received_packet.__class__(
1081 scapy.compat.raw(received_packet))
1082 delattr(recalculated[layer], field_name)
1083 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1084 self.assert_equal(received_packet_checksum,
1085 getattr(recalculated[layer], field_name),
1086 "packet checksum on layer: %s" % layer)
1088 def assert_ip_checksum_valid(self, received_packet,
1089 ignore_zero_checksum=False):
1090 self.assert_checksum_valid(received_packet, 'IP',
1091 ignore_zero_checksum=ignore_zero_checksum)
1093 def assert_tcp_checksum_valid(self, received_packet,
1094 ignore_zero_checksum=False):
1095 self.assert_checksum_valid(received_packet, 'TCP',
1096 ignore_zero_checksum=ignore_zero_checksum)
1098 def assert_udp_checksum_valid(self, received_packet,
1099 ignore_zero_checksum=True):
1100 self.assert_checksum_valid(received_packet, 'UDP',
1101 ignore_zero_checksum=ignore_zero_checksum)
1103 def assert_embedded_icmp_checksum_valid(self, received_packet):
1104 if received_packet.haslayer(IPerror):
1105 self.assert_checksum_valid(received_packet, 'IPerror')
1106 if received_packet.haslayer(TCPerror):
1107 self.assert_checksum_valid(received_packet, 'TCPerror')
1108 if received_packet.haslayer(UDPerror):
1109 self.assert_checksum_valid(received_packet, 'UDPerror',
1110 ignore_zero_checksum=True)
1111 if received_packet.haslayer(ICMPerror):
1112 self.assert_checksum_valid(received_packet, 'ICMPerror')
1114 def assert_icmp_checksum_valid(self, received_packet):
1115 self.assert_checksum_valid(received_packet, 'ICMP')
1116 self.assert_embedded_icmp_checksum_valid(received_packet)
1118 def assert_icmpv6_checksum_valid(self, pkt):
1119 if pkt.haslayer(ICMPv6DestUnreach):
1120 self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
1121 self.assert_embedded_icmp_checksum_valid(pkt)
1122 if pkt.haslayer(ICMPv6EchoRequest):
1123 self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
1124 if pkt.haslayer(ICMPv6EchoReply):
1125 self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
1127 def get_packet_counter(self, counter):
1128 if counter.startswith("/"):
1129 counter_value = self.statistics.get_counter(counter)
1131 counters = self.vapi.cli("sh errors").split('\n')
1133 for i in range(1, len(counters) - 1):
1134 results = counters[i].split()
1135 if results[1] == counter:
1136 counter_value = int(results[0])
1138 return counter_value
1140 def assert_packet_counter_equal(self, counter, expected_value):
1141 counter_value = self.get_packet_counter(counter)
1142 self.assert_equal(counter_value, expected_value,
1143 "packet counter `%s'" % counter)
1145 def assert_error_counter_equal(self, counter, expected_value):
1146 counter_value = self.statistics.get_err_counter(counter)
1147 self.assert_equal(counter_value, expected_value,
1148 "error counter `%s'" % counter)
1151 def sleep(cls, timeout, remark=None):
1153 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1154 # * by Guido, only the main thread can be interrupted.
1156 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
1159 if hasattr(os, 'sched_yield'):
1165 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1166 before = time.time()
1169 if after - before > 2 * timeout:
1170 cls.logger.error("unexpected self.sleep() result - "
1171 "slept for %es instead of ~%es!",
1172 after - before, timeout)
1175 "Finished sleep (%s) - slept %es (wanted %es)",
1176 remark, after - before, timeout)
1178 def pg_send(self, intf, pkts, worker=None):
1179 self.vapi.cli("clear trace")
1180 intf.add_stream(pkts, worker=worker)
1181 self.pg_enable_capture(self.pg_interfaces)
1184 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
1185 self.pg_send(intf, pkts)
1188 for i in self.pg_interfaces:
1189 i.get_capture(0, timeout=timeout)
1190 i.assert_nothing_captured(remark=remark)
1193 def send_and_expect(self, intf, pkts, output, n_rx=None, worker=None):
1196 self.pg_send(intf, pkts, worker=worker)
1197 rx = output.get_capture(n_rx)
1200 def send_and_expect_only(self, intf, pkts, output, timeout=None):
1201 self.pg_send(intf, pkts)
1202 rx = output.get_capture(len(pkts))
1206 for i in self.pg_interfaces:
1207 if i not in outputs:
1208 i.get_capture(0, timeout=timeout)
1209 i.assert_nothing_captured()
1215 def get_testcase_doc_name(test):
1216 return getdoc(test.__class__).splitlines()[0]
1219 def get_test_description(descriptions, test):
1220 short_description = test.shortDescription()
1221 if descriptions and short_description:
1222 return short_description
1227 class TestCaseInfo(object):
1228 def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
1229 self.logger = logger
1230 self.tempdir = tempdir
1231 self.vpp_pid = vpp_pid
1232 self.vpp_bin_path = vpp_bin_path
1233 self.core_crash_test = None
1236 class VppTestResult(unittest.TestResult):
1238 @property result_string
1239 String variable to store the test case result string.
1241 List variable containing 2-tuples of TestCase instances and strings
1242 holding formatted tracebacks. Each tuple represents a test which
1243 raised an unexpected exception.
1245 List variable containing 2-tuples of TestCase instances and strings
1246 holding formatted tracebacks. Each tuple represents a test where
1247 a failure was explicitly signalled using the TestCase.assert*()
1251 failed_test_cases_info = set()
1252 core_crash_test_cases_info = set()
1253 current_test_case_info = None
1255 def __init__(self, stream=None, descriptions=None, verbosity=None,
1258 :param stream File descriptor to store where to report test results.
1259 Set to the standard error stream by default.
1260 :param descriptions Boolean variable to store information if to use
1261 test case descriptions.
1262 :param verbosity Integer variable to store required verbosity level.
1264 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1265 self.stream = stream
1266 self.descriptions = descriptions
1267 self.verbosity = verbosity
1268 self.result_string = None
1269 self.runner = runner
1271 def addSuccess(self, test):
1273 Record a test succeeded result
1278 if self.current_test_case_info:
1279 self.current_test_case_info.logger.debug(
1280 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1281 test._testMethodName,
1282 test._testMethodDoc))
1283 unittest.TestResult.addSuccess(self, test)
1284 self.result_string = colorize("OK", GREEN)
1286 self.send_result_through_pipe(test, PASS)
1288 def addSkip(self, test, reason):
1290 Record a test skipped.
1296 if self.current_test_case_info:
1297 self.current_test_case_info.logger.debug(
1298 "--- addSkip() %s.%s(%s) called, reason is %s" %
1299 (test.__class__.__name__, test._testMethodName,
1300 test._testMethodDoc, reason))
1301 unittest.TestResult.addSkip(self, test, reason)
1302 self.result_string = colorize("SKIP", YELLOW)
1304 self.send_result_through_pipe(test, SKIP)
1306 def symlink_failed(self):
1307 if self.current_test_case_info:
1309 failed_dir = os.getenv('FAILED_DIR')
1310 link_path = os.path.join(
1313 os.path.basename(self.current_test_case_info.tempdir))
1315 self.current_test_case_info.logger.debug(
1316 "creating a link to the failed test")
1317 self.current_test_case_info.logger.debug(
1318 "os.symlink(%s, %s)" %
1319 (self.current_test_case_info.tempdir, link_path))
1320 if os.path.exists(link_path):
1321 self.current_test_case_info.logger.debug(
1322 'symlink already exists')
1324 os.symlink(self.current_test_case_info.tempdir, link_path)
1326 except Exception as e:
1327 self.current_test_case_info.logger.error(e)
1329 def send_result_through_pipe(self, test, result):
1330 if hasattr(self, 'test_framework_result_pipe'):
1331 pipe = self.test_framework_result_pipe
1333 pipe.send((test.id(), result))
1335 def log_error(self, test, err, fn_name):
1336 if self.current_test_case_info:
1337 if isinstance(test, unittest.suite._ErrorHolder):
1338 test_name = test.description
1340 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1341 test._testMethodName,
1342 test._testMethodDoc)
1343 self.current_test_case_info.logger.debug(
1344 "--- %s() %s called, err is %s" %
1345 (fn_name, test_name, err))
1346 self.current_test_case_info.logger.debug(
1347 "formatted exception is:\n%s" %
1348 "".join(format_exception(*err)))
1350 def add_error(self, test, err, unittest_fn, error_type):
1351 if error_type == FAIL:
1352 self.log_error(test, err, 'addFailure')
1353 error_type_str = colorize("FAIL", RED)
1354 elif error_type == ERROR:
1355 self.log_error(test, err, 'addError')
1356 error_type_str = colorize("ERROR", RED)
1358 raise Exception('Error type %s cannot be used to record an '
1359 'error or a failure' % error_type)
1361 unittest_fn(self, test, err)
1362 if self.current_test_case_info:
1363 self.result_string = "%s [ temp dir used by test case: %s ]" % \
1365 self.current_test_case_info.tempdir)
1366 self.symlink_failed()
1367 self.failed_test_cases_info.add(self.current_test_case_info)
1368 if is_core_present(self.current_test_case_info.tempdir):
1369 if not self.current_test_case_info.core_crash_test:
1370 if isinstance(test, unittest.suite._ErrorHolder):
1371 test_name = str(test)
1373 test_name = "'{!s}' ({!s})".format(
1374 get_testcase_doc_name(test), test.id())
1375 self.current_test_case_info.core_crash_test = test_name
1376 self.core_crash_test_cases_info.add(
1377 self.current_test_case_info)
1379 self.result_string = '%s [no temp dir]' % error_type_str
1381 self.send_result_through_pipe(test, error_type)
1383 def addFailure(self, test, err):
1385 Record a test failed result
1388 :param err: error message
1391 self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1393 def addError(self, test, err):
1395 Record a test error result
1398 :param err: error message
1401 self.add_error(test, err, unittest.TestResult.addError, ERROR)
1403 def getDescription(self, test):
1405 Get test description
1408 :returns: test description
1411 return get_test_description(self.descriptions, test)
1413 def startTest(self, test):
1421 def print_header(test):
1422 test_doc = getdoc(test)
1424 raise Exception("No doc string for test '%s'" % test.id())
1425 test_title = test_doc.splitlines()[0]
1426 test_title_colored = colorize(test_title, GREEN)
1427 if test.force_solo():
1428 # long live PEP-8 and 80 char width limitation...
1430 test_title_colored = colorize("SOLO RUN: " + test_title, c)
1432 if not hasattr(test.__class__, '_header_printed'):
1433 print(double_line_delim)
1434 print(test_title_colored)
1435 print(double_line_delim)
1436 test.__class__._header_printed = True
1439 self.start_test = time.time()
1440 unittest.TestResult.startTest(self, test)
1441 if self.verbosity > 0:
1442 self.stream.writeln(
1443 "Starting " + self.getDescription(test) + " ...")
1444 self.stream.writeln(single_line_delim)
1446 def stopTest(self, test):
1448 Called when the given test has been run
1453 unittest.TestResult.stopTest(self, test)
1455 if self.verbosity > 0:
1456 self.stream.writeln(single_line_delim)
1457 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1458 self.result_string))
1459 self.stream.writeln(single_line_delim)
1461 self.stream.writeln("%-68s %4.2f %s" %
1462 (self.getDescription(test),
1463 time.time() - self.start_test,
1464 self.result_string))
1466 self.send_result_through_pipe(test, TEST_RUN)
1468 def printErrors(self):
1470 Print errors from running the test case
1472 if len(self.errors) > 0 or len(self.failures) > 0:
1473 self.stream.writeln()
1474 self.printErrorList('ERROR', self.errors)
1475 self.printErrorList('FAIL', self.failures)
1477 # ^^ that is the last output from unittest before summary
1478 if not self.runner.print_summary:
1479 devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
1480 self.stream = devnull
1481 self.runner.stream = devnull
1483 def printErrorList(self, flavour, errors):
1485 Print error list to the output stream together with error type
1486 and test case description.
1488 :param flavour: error type
1489 :param errors: iterable errors
1492 for test, err in errors:
1493 self.stream.writeln(double_line_delim)
1494 self.stream.writeln("%s: %s" %
1495 (flavour, self.getDescription(test)))
1496 self.stream.writeln(single_line_delim)
1497 self.stream.writeln("%s" % err)
1500 class VppTestRunner(unittest.TextTestRunner):
1502 A basic test runner implementation which prints results to standard error.
1506 def resultclass(self):
1507 """Class maintaining the results of the tests"""
1508 return VppTestResult
1510 def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1511 result_pipe=None, failfast=False, buffer=False,
1512 resultclass=None, print_summary=True, **kwargs):
1513 # ignore stream setting here, use hard-coded stdout to be in sync
1514 # with prints from VppTestCase methods ...
1515 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1516 verbosity, failfast, buffer,
1517 resultclass, **kwargs)
1518 KeepAliveReporter.pipe = keep_alive_pipe
1520 self.orig_stream = self.stream
1521 self.resultclass.test_framework_result_pipe = result_pipe
1523 self.print_summary = print_summary
1525 def _makeResult(self):
1526 return self.resultclass(self.stream,
1531 def run(self, test):
1538 faulthandler.enable() # emit stack trace to stderr if killed by signal
1540 result = super(VppTestRunner, self).run(test)
1541 if not self.print_summary:
1542 self.stream = self.orig_stream
1543 result.stream = self.orig_stream
1547 class Worker(Thread):
1548 def __init__(self, executable_args, logger, env=None, *args, **kwargs):
1549 super(Worker, self).__init__(*args, **kwargs)
1550 self.logger = logger
1551 self.args = executable_args
1552 if hasattr(self, 'testcase') and self.testcase.debug_all:
1553 if self.testcase.debug_gdbserver:
1554 self.args = ['/usr/bin/gdbserver', 'localhost:{port}'
1555 .format(port=self.testcase.gdbserver_port)] + args
1556 elif self.testcase.debug_gdb and hasattr(self, 'wait_for_gdb'):
1557 self.args.append(self.wait_for_gdb)
1558 self.app_bin = executable_args[0]
1559 self.app_name = os.path.basename(self.app_bin)
1560 if hasattr(self, 'role'):
1561 self.app_name += ' {role}'.format(role=self.role)
1564 env = {} if env is None else env
1565 self.env = copy.deepcopy(env)
1567 def wait_for_enter(self):
1568 if not hasattr(self, 'testcase'):
1570 if self.testcase.debug_all and self.testcase.debug_gdbserver:
1572 print(double_line_delim)
1573 print("Spawned GDB Server for '{app}' with PID: {pid}"
1574 .format(app=self.app_name, pid=self.process.pid))
1575 elif self.testcase.debug_all and self.testcase.debug_gdb:
1577 print(double_line_delim)
1578 print("Spawned '{app}' with PID: {pid}"
1579 .format(app=self.app_name, pid=self.process.pid))
1582 print(single_line_delim)
1583 print("You can debug '{app}' using:".format(app=self.app_name))
1584 if self.testcase.debug_gdbserver:
1585 print("sudo gdb " + self.app_bin +
1586 " -ex 'target remote localhost:{port}'"
1587 .format(port=self.testcase.gdbserver_port))
1588 print("Now is the time to attach gdb by running the above "
1589 "command, set up breakpoints etc., then resume from "
1590 "within gdb by issuing the 'continue' command")
1591 self.testcase.gdbserver_port += 1
1592 elif self.testcase.debug_gdb:
1593 print("sudo gdb " + self.app_bin +
1594 " -ex 'attach {pid}'".format(pid=self.process.pid))
1595 print("Now is the time to attach gdb by running the above "
1596 "command and set up breakpoints etc., then resume from"
1597 " within gdb by issuing the 'continue' command")
1598 print(single_line_delim)
1599 input("Press ENTER to continue running the testcase...")
1602 executable = self.args[0]
1603 if not os.path.exists(executable) or not os.access(
1604 executable, os.F_OK | os.X_OK):
1605 # Exit code that means some system file did not exist,
1606 # could not be opened, or had some other kind of error.
1607 self.result = os.EX_OSFILE
1608 raise EnvironmentError(
1609 "executable '%s' is not found or executable." % executable)
1610 self.logger.debug("Running executable: '{app}'"
1611 .format(app=' '.join(self.args)))
1612 env = os.environ.copy()
1613 env.update(self.env)
1614 env["CK_LOG_FILE_NAME"] = "-"
1615 self.process = subprocess.Popen(
1616 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1617 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1618 self.wait_for_enter()
1619 out, err = self.process.communicate()
1620 self.logger.debug("Finished running `{app}'".format(app=self.app_name))
1621 self.logger.info("Return code is `%s'" % self.process.returncode)
1622 self.logger.info(single_line_delim)
1623 self.logger.info("Executable `{app}' wrote to stdout:"
1624 .format(app=self.app_name))
1625 self.logger.info(single_line_delim)
1626 self.logger.info(out.decode('utf-8'))
1627 self.logger.info(single_line_delim)
1628 self.logger.info("Executable `{app}' wrote to stderr:"
1629 .format(app=self.app_name))
1630 self.logger.info(single_line_delim)
1631 self.logger.info(err.decode('utf-8'))
1632 self.logger.info(single_line_delim)
1633 self.result = self.process.returncode
1636 if __name__ == '__main__':