3 from __future__ import print_function
17 from collections import deque
18 from threading import Thread, Event
19 from inspect import getdoc, isclass
20 from traceback import format_exception
21 from logging import FileHandler, DEBUG, Formatter
24 from scapy.packet import Raw
25 import hook as hookmodule
26 from vpp_pg_interface import VppPGInterface
27 from vpp_sub_interface import VppSubInterface
28 from vpp_lo_interface import VppLoInterface
29 from vpp_bvi_interface import VppBviInterface
30 from vpp_papi_provider import VppPapiProvider
31 from vpp_papi.vpp_stats import VPPStats
32 from vpp_papi.vpp_transport_shmem import VppTransportShmemIOError
33 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
35 from vpp_object import VppObjectRegistry
36 from util import ppp, is_core_present
37 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
38 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
39 from scapy.layers.inet6 import ICMPv6EchoReply
41 if os.name == 'posix' and sys.version_info[0] < 3:
42 # using subprocess32 is recommended by python official documentation
43 # @ https://docs.python.org/2/library/subprocess.html
44 import subprocess32 as subprocess
48 # Python2/3 compatible
61 class BoolEnvironmentVariable(object):
63 def __init__(self, env_var_name, default='n', true_values=None):
64 self.name = env_var_name
65 self.default = default
66 self.true_values = true_values if true_values is not None else \
70 return os.getenv(self.name, self.default).lower() in self.true_values
72 if sys.version_info[0] == 2:
73 __nonzero__ = __bool__
76 return 'BoolEnvironmentVariable(%r, default=%r, true_values=%r)' % \
77 (self.name, self.default, self.true_values)
80 debug_framework = BoolEnvironmentVariable('TEST_DEBUG')
85 Test framework module.
87 The module provides a set of tools for constructing and running tests and
88 representing the results.
class VppDiedError(Exception):
    """Exception for reporting that the VPP subprocess has died.

    :param rv: return code of the subprocess; a negative value ``-N`` is
        looked up as signal number ``N``. May be None when the return
        code is not known.
    :param testcase: name of the test case that was running, or None.
    :param method_name: name of the test method that was running, or None.
    """

    # Signal number -> signal name. Names starting with 'SIG_' are excluded:
    # they are not signals themselves.
    signals_by_value = {v: k for k, v in signal.__dict__.items() if
                        k.startswith('SIG') and not k.startswith('SIG_')}

    def __init__(self, rv=None, testcase=None, method_name=None):
        self.rv = rv
        self.signal_name = None
        self.testcase = testcase
        self.method_name = method_name

        try:
            self.signal_name = VppDiedError.signals_by_value[-rv]
        except (KeyError, TypeError):
            # KeyError: rv does not map to a signal.
            # TypeError: rv is None (or otherwise cannot be negated).
            pass

        if testcase is None and method_name is None:
            in_msg = ''
        else:
            in_msg = 'running %s.%s ' % (testcase, method_name)

        # Only mention the signal when one was actually identified;
        # otherwise the message would end in a stray ' []'.
        if self.signal_name is not None:
            signal_msg = ' [%s]' % self.signal_name
        else:
            signal_msg = ''

        # %s rather than %d: rv may legitimately be None, and %d would raise
        # TypeError while the exception itself is being constructed.
        msg = "VPP subprocess died %sunexpectedly with return code: %s%s." % (
            in_msg, self.rv, signal_msg)
        super(VppDiedError, self).__init__(msg)
122 class _PacketInfo(object):
123 """Private class to create packet info object.
125 Help process information about the next packet.
126 Set variables to default values.
128 #: Store the index of the packet.
130 #: Store the index of the source packet generator interface of the packet.
132 #: Store the index of the destination packet generator interface
135 #: Store expected ip version
137 #: Store expected upper protocol
139 #: Store the copy of the former packet.
def __eq__(self, other):
    """Two packet infos are equal when index, src, dst and data all match."""
    # Every field is compared up front (no short-circuit on attribute
    # access), in the order index, src, dst, data.
    verdicts = [
        self.index == other.index,
        self.src == other.src,
        self.dst == other.dst,
        self.data == other.data,
    ]
    return verdicts[0] and verdicts[1] and verdicts[2] and verdicts[3]
150 def pump_output(testclass):
151 """ pump output from vpp stdout/stderr to proper queues """
# Used as the target of the pump thread created in setUpClass.
# Keeps running until testclass.pump_thread_stop_flag is set.
154 while not testclass.pump_thread_stop_flag.is_set():
# Wait on VPP's stdout, VPP's stderr and the read end of the wake-up pipe.
155 readable = select.select([testclass.vpp.stdout.fileno(),
156 testclass.vpp.stderr.fileno(),
157 testclass.pump_thread_wakeup_pipe[0]],
159 if testclass.vpp.stdout.fileno() in readable:
# Read at most 102400 bytes per pass.
160 read = os.read(testclass.vpp.stdout.fileno(), 102400)
# Decode as ASCII, escaping undecodable bytes instead of failing;
# splitlines(True) keeps the line terminators on each element.
162 split = read.decode('ascii',
163 errors='backslashreplace').splitlines(True)
# Glue the partial line kept in stdout_fragment onto the first new line.
164 if len(stdout_fragment) > 0:
165 split[0] = "%s%s" % (stdout_fragment, split[0])
# A trailing newline means the last element is a complete line.
166 if len(split) > 0 and split[-1].endswith("\n"):
# NOTE(review): presumably the unterminated tail is kept here for the next
# read and excluded from output via `limit` - confirm.
170 stdout_fragment = split[-1]
171 testclass.vpp_stdout_deque.extend(split[:limit])
# When output is not being cached, also log every line right away.
172 if not testclass.cache_vpp_output:
173 for line in split[:limit]:
174 testclass.logger.info(
175 "VPP STDOUT: %s" % line.rstrip("\n"))
# Same handling for stderr, except that lines are logged at error level.
176 if testclass.vpp.stderr.fileno() in readable:
177 read = os.read(testclass.vpp.stderr.fileno(), 102400)
179 split = read.decode('ascii',
180 errors='backslashreplace').splitlines(True)
181 if len(stderr_fragment) > 0:
182 split[0] = "%s%s" % (stderr_fragment, split[0])
183 if len(split) > 0 and split[-1].endswith("\n"):
187 stderr_fragment = split[-1]
189 testclass.vpp_stderr_deque.extend(split[:limit])
190 if not testclass.cache_vpp_output:
191 for line in split[:limit]:
192 testclass.logger.error(
193 "VPP STDERR: %s" % line.rstrip("\n"))
194 # ignoring the dummy pipe here intentionally - the
195 # flag will take care of properly terminating the loop
def _is_skip_aarch64_set():
    """Return a BoolEnvironmentVariable bound to SKIP_AARCH64."""
    skip_flag = BoolEnvironmentVariable('SKIP_AARCH64')
    return skip_flag
202 is_skip_aarch64_set = _is_skip_aarch64_set()
def _is_platform_aarch64():
    """Tell whether platform.machine() reports 'aarch64'."""
    machine = platform.machine()
    return machine == 'aarch64'
209 is_platform_aarch64 = _is_platform_aarch64()
def _running_extended_tests():
    """Return a BoolEnvironmentVariable bound to EXTENDED_TESTS."""
    extended_flag = BoolEnvironmentVariable("EXTENDED_TESTS")
    return extended_flag
216 running_extended_tests = _running_extended_tests()
def _running_on_centos():
    """Tell whether the OS_ID environment variable mentions centos.

    The match is case-insensitive; an unset OS_ID counts as "not centos".
    """
    return "centos" in os.getenv("OS_ID", "").lower()
224 running_on_centos = _running_on_centos()
227 class KeepAliveReporter(object):
229 Singleton object which reports test start to parent process
234 self.__dict__ = self._shared_state
242 def pipe(self, pipe):
243 if self._pipe is not None:
244 raise Exception("Internal error - pipe should only be set once.")
247 def send_keep_alive(self, test, desc=None):
249 Write current test tmpdir & desc to keep-alive pipe to signal liveness
251 if self.pipe is None:
252 # if not running forked..
256 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
260 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
263 class VppTestCase(unittest.TestCase):
264 """This subclass is a base class for VPP test cases that are implemented as
265 classes. It provides methods to create and run test case.
268 extra_vpp_punt_config = []
269 extra_vpp_plugin_config = []
def packet_infos(self):
    """Packet infos recorded so far (the ``_packet_infos`` collection)."""
    recorded = self._packet_infos
    return recorded
277 def get_packet_count_for_if_idx(cls, dst_if_index):
278 """Get the number of packet info for specified destination if index"""
279 if dst_if_index in cls._packet_count_for_dst_if_idx:
280 return cls._packet_count_for_dst_if_idx[dst_if_index]
286 """Return the instance of this testcase"""
287 return cls.test_instance
290 def set_debug_flags(cls, d):
291 cls.gdbserver_port = 7777
292 cls.debug_core = False
293 cls.debug_gdb = False
294 cls.debug_gdbserver = False
295 cls.debug_all = False
300 cls.debug_core = True
301 elif dl == "gdb" or dl == "gdb-all":
303 elif dl == "gdbserver" or dl == "gdbserver-all":
304 cls.debug_gdbserver = True
306 raise Exception("Unrecognized DEBUG option: '%s'" % d)
307 if dl == "gdb-all" or dl == "gdbserver-all":
311 def get_least_used_cpu():
# Choose a CPU number for VPP based on where already-running vpp_main
# processes are executing; setUpConstants passes the result as "main-core".
# cpu_usage_list[i] holds the CPUs on which i vpp_main processes have been
# counted so far; every CPU starts in bucket 0.
312 cpu_usage_list = [set(range(psutil.cpu_count()))]
# Only processes whose name is exactly 'vpp_main' are counted.
313 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
314 if 'vpp_main' == p.info['name']]
315 for vpp_process in vpp_processes:
316 for cpu_usage_set in cpu_usage_list:
318 cpu_num = vpp_process.cpu_num()
# Found the bucket holding this CPU: move the CPU up one bucket, creating
# that bucket first when the current one is the last in the list.
319 if cpu_num in cpu_usage_set:
320 cpu_usage_set_index = cpu_usage_list.index(
322 if cpu_usage_set_index == len(cpu_usage_list) - 1:
323 cpu_usage_list.append({cpu_num})
325 cpu_usage_list[cpu_usage_set_index + 1].add(
327 cpu_usage_set.remove(cpu_num)
# A process may exit between enumeration and inspection.
# NOTE(review): presumably such processes are simply skipped - confirm.
329 except psutil.NoSuchProcess:
# Buckets are visited in ascending order of usage count.
# NOTE(review): presumably the search stops at the first non-empty bucket,
# i.e. the least used CPUs - confirm.
332 for cpu_usage_set in cpu_usage_list:
333 if len(cpu_usage_set) > 0:
334 min_usage_set = cpu_usage_set
# Pick at random among the CPUs of the selected bucket.
337 return random.choice(tuple(min_usage_set))
340 def setUpConstants(cls):
341 """ Set-up the test case class based on environment variables """
342 cls.step = BoolEnvironmentVariable('STEP')
343 d = os.getenv("DEBUG", None)
344 # inverted case to handle '' == True
345 c = os.getenv("CACHE_OUTPUT", "1")
346 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
347 cls.set_debug_flags(d)
348 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
349 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
350 cls.test_plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
351 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
353 if cls.plugin_path is not None:
354 if cls.extern_plugin_path is not None:
355 plugin_path = "%s:%s" % (
356 cls.plugin_path, cls.extern_plugin_path)
358 plugin_path = cls.plugin_path
359 elif cls.extern_plugin_path is not None:
360 plugin_path = cls.extern_plugin_path
362 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
363 debug_cli = "cli-listen localhost:5002"
365 size = os.getenv("COREDUMP_SIZE")
367 coredump_size = "coredump-size %s" % size
368 if coredump_size is None:
369 coredump_size = "coredump-size unlimited"
371 cpu_core_number = cls.get_least_used_cpu()
372 if not hasattr(cls, "worker_config"):
373 cls.worker_config = ""
375 cls.vpp_cmdline = [cls.vpp_bin, "unix",
376 "{", "nodaemon", debug_cli, "full-coredump",
377 coredump_size, "runtime-dir", cls.tempdir, "}",
378 "api-trace", "{", "on", "}", "api-segment", "{",
379 "prefix", cls.shm_prefix, "}", "cpu", "{",
380 "main-core", str(cpu_core_number),
381 cls.worker_config, "}",
382 "statseg", "{", "socket-name", cls.stats_sock, "}",
383 "socksvr", "{", "socket-name", cls.api_sock, "}",
385 "{", "plugin", "dpdk_plugin.so", "{", "disable",
386 "}", "plugin", "rdma_plugin.so", "{", "disable",
387 "}", "plugin", "unittest_plugin.so", "{", "enable",
388 "}"] + cls.extra_vpp_plugin_config + ["}", ]
389 if cls.extra_vpp_punt_config is not None:
390 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
391 if plugin_path is not None:
392 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
393 if cls.test_plugin_path is not None:
394 cls.vpp_cmdline.extend(["test_plugin_path", cls.test_plugin_path])
396 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
397 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
400 def wait_for_enter(cls):
401 if cls.debug_gdbserver:
402 print(double_line_delim)
403 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
405 print(double_line_delim)
406 print("Spawned VPP with PID: %d" % cls.vpp.pid)
408 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
410 print(single_line_delim)
411 print("You can debug VPP using:")
412 if cls.debug_gdbserver:
413 print("sudo gdb " + cls.vpp_bin +
414 " -ex 'target remote localhost:{port}'"
415 .format(port=cls.gdbserver_port))
416 print("Now is the time to attach gdb by running the above "
417 "command, set up breakpoints etc., then resume VPP from "
418 "within gdb by issuing the 'continue' command")
419 cls.gdbserver_port += 1
421 print("sudo gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
422 print("Now is the time to attach gdb by running the above "
423 "command and set up breakpoints etc., then resume VPP from"
424 " within gdb by issuing the 'continue' command")
425 print(single_line_delim)
426 input("Press ENTER to continue running the testcase...")
430 cmdline = cls.vpp_cmdline
432 if cls.debug_gdbserver:
433 gdbserver = '/usr/bin/gdbserver'
434 if not os.path.isfile(gdbserver) or \
435 not os.access(gdbserver, os.X_OK):
436 raise Exception("gdbserver binary '%s' does not exist or is "
437 "not executable" % gdbserver)
439 cmdline = [gdbserver, 'localhost:{port}'
440 .format(port=cls.gdbserver_port)] + cls.vpp_cmdline
441 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
444 cls.vpp = subprocess.Popen(cmdline,
445 stdout=subprocess.PIPE,
446 stderr=subprocess.PIPE,
448 except subprocess.CalledProcessError as e:
449 cls.logger.critical("Subprocess returned with non-0 return code: ("
453 cls.logger.critical("Subprocess returned with OS error: "
454 "(%s) %s", e.errno, e.strerror)
456 except Exception as e:
457 cls.logger.exception("Subprocess returned unexpected from "
464 def wait_for_stats_socket(cls):
465 deadline = time.time() + 300
467 while time.time() < deadline or \
468 cls.debug_gdb or cls.debug_gdbserver:
469 if os.path.exists(cls.stats_sock):
474 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
477 def wait_for_coredump(cls):
# If a core file exists in the test's temporary directory, poll its size
# for up to 60 seconds, logging the outcome.
# The core file is looked for at <tempdir>/core.
478 corefile = cls.tempdir + "/core"
479 if os.path.isfile(corefile):
480 cls.logger.error("Waiting for coredump to complete: %s", corefile)
481 curr_size = os.path.getsize(corefile)
# Allow the dump at most 60 seconds to finish.
482 deadline = time.time() + 60
484 while time.time() < deadline:
487 curr_size = os.path.getsize(corefile)
# NOTE(review): `size` is presumably the size seen on the previous poll, so
# an unchanged size is taken to mean the dump is fully written - confirm.
488 if size == curr_size:
492 cls.logger.error("Timed out waiting for coredump to complete:"
495 cls.logger.error("Coredump complete: %s, size %d",
501 Perform class setup before running the testcase
502 Remove shared memory files, start vpp and connect the vpp-api
504 super(VppTestCase, cls).setUpClass()
505 gc.collect() # run garbage collection first
506 cls.logger = get_logger(cls.__name__)
507 seed = os.environ["RND_SEED"]
509 if hasattr(cls, 'parallel_handler'):
510 cls.logger.addHandler(cls.parallel_handler)
511 cls.logger.propagate = False
513 cls.tempdir = tempfile.mkdtemp(
514 prefix='vpp-unittest-%s-' % cls.__name__)
515 cls.stats_sock = "%s/stats.sock" % cls.tempdir
516 cls.api_sock = "%s/api.sock" % cls.tempdir
517 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
518 cls.file_handler.setFormatter(
519 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
521 cls.file_handler.setLevel(DEBUG)
522 cls.logger.addHandler(cls.file_handler)
523 cls.logger.debug("--- setUpClass() for %s called ---" %
525 cls.shm_prefix = os.path.basename(cls.tempdir)
526 os.chdir(cls.tempdir)
527 cls.logger.info("Temporary dir is %s, shm prefix is %s",
528 cls.tempdir, cls.shm_prefix)
529 cls.logger.debug("Random seed is %s" % seed)
531 cls.reset_packet_infos()
535 cls.registry = VppObjectRegistry()
536 cls.vpp_startup_failed = False
537 cls.reporter = KeepAliveReporter()
538 # need to catch exceptions here because if we raise, then the cleanup
539 # doesn't get called and we might end with a zombie vpp
542 cls.reporter.send_keep_alive(cls, 'setUpClass')
543 VppTestResult.current_test_case_info = TestCaseInfo(
544 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
545 cls.vpp_stdout_deque = deque()
546 cls.vpp_stderr_deque = deque()
547 cls.pump_thread_stop_flag = Event()
548 cls.pump_thread_wakeup_pipe = os.pipe()
549 cls.pump_thread = Thread(target=pump_output, args=(cls,))
550 cls.pump_thread.daemon = True
551 cls.pump_thread.start()
552 if cls.debug_gdb or cls.debug_gdbserver:
556 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
559 hook = hookmodule.StepHook(cls)
561 hook = hookmodule.PollHook(cls)
562 cls.vapi.register_hook(hook)
563 cls.wait_for_stats_socket()
564 cls.statistics = VPPStats(socketname=cls.stats_sock)
568 cls.vpp_startup_failed = True
570 "VPP died shortly after startup, check the"
571 " output to standard error for possible cause")
577 cls.vapi.disconnect()
580 if cls.debug_gdbserver:
581 print(colorize("You're running VPP inside gdbserver but "
582 "VPP-API connection failed, did you forget "
583 "to 'continue' VPP from within gdb?", RED))
585 except Exception as e:
586 cls.logger.debug("Exception connecting to VPP: %s" % e)
594 Disconnect vpp-api, kill vpp and cleanup shared memory files
596 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
598 if cls.vpp.returncode is None:
600 print(double_line_delim)
601 print("VPP or GDB server is still running")
602 print(single_line_delim)
603 input("When done debugging, press ENTER to kill the "
604 "process and finish running the testcase...")
606 # first signal that we want to stop the pump thread, then wake it up
607 if hasattr(cls, 'pump_thread_stop_flag'):
608 cls.pump_thread_stop_flag.set()
609 if hasattr(cls, 'pump_thread_wakeup_pipe'):
610 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
611 if hasattr(cls, 'pump_thread'):
612 cls.logger.debug("Waiting for pump thread to stop")
613 cls.pump_thread.join()
614 if hasattr(cls, 'vpp_stderr_reader_thread'):
615 cls.logger.debug("Waiting for stdderr pump to stop")
616 cls.vpp_stderr_reader_thread.join()
618 if hasattr(cls, 'vpp'):
619 if hasattr(cls, 'vapi'):
620 cls.logger.debug("Disconnecting class vapi client on %s",
622 cls.vapi.disconnect()
623 cls.logger.debug("Deleting class vapi attribute on %s",
627 if cls.vpp.returncode is None:
628 cls.wait_for_coredump()
629 cls.logger.debug("Sending TERM to vpp")
631 cls.logger.debug("Waiting for vpp to die")
632 cls.vpp.communicate()
633 cls.logger.debug("Deleting class vpp attribute on %s",
637 if cls.vpp_startup_failed:
638 stdout_log = cls.logger.info
639 stderr_log = cls.logger.critical
641 stdout_log = cls.logger.info
642 stderr_log = cls.logger.info
644 if hasattr(cls, 'vpp_stdout_deque'):
645 stdout_log(single_line_delim)
646 stdout_log('VPP output to stdout while running %s:', cls.__name__)
647 stdout_log(single_line_delim)
648 vpp_output = "".join(cls.vpp_stdout_deque)
649 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
651 stdout_log('\n%s', vpp_output)
652 stdout_log(single_line_delim)
654 if hasattr(cls, 'vpp_stderr_deque'):
655 stderr_log(single_line_delim)
656 stderr_log('VPP output to stderr while running %s:', cls.__name__)
657 stderr_log(single_line_delim)
658 vpp_output = "".join(cls.vpp_stderr_deque)
659 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
661 stderr_log('\n%s', vpp_output)
662 stderr_log(single_line_delim)
665 def tearDownClass(cls):
666 """ Perform final cleanup after running all tests in this test-case """
667 cls.logger.debug("--- tearDownClass() for %s called ---" %
669 cls.reporter.send_keep_alive(cls, 'tearDownClass')
671 cls.file_handler.close()
672 cls.reset_packet_infos()
674 debug_internal.on_tear_down_class(cls)
def show_commands_at_teardown(self):
    """Hook for subclasses to log extra state at teardown.

    The base implementation only logs a placeholder message.
    """
    placeholder = "--- No test specific show commands provided. ---"
    self.logger.info(placeholder)
681 """ Show various debug prints after each test """
682 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
683 (self.__class__.__name__, self._testMethodName,
684 self._testMethodDoc))
687 if not self.vpp_dead:
688 self.logger.debug(self.vapi.cli("show trace max 1000"))
689 self.logger.info(self.vapi.ppcli("show interface"))
690 self.logger.info(self.vapi.ppcli("show hardware"))
691 self.logger.info(self.statistics.set_errors_str())
692 self.logger.info(self.vapi.ppcli("show run"))
693 self.logger.info(self.vapi.ppcli("show log"))
694 self.logger.info(self.vapi.ppcli("show bihash"))
695 self.logger.info("Logging testcase specific show commands.")
696 self.show_commands_at_teardown()
697 self.registry.remove_vpp_config(self.logger)
698 # Save/Dump VPP api trace log
699 m = self._testMethodName
700 api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
701 tmp_api_trace = "/tmp/%s" % api_trace
702 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
703 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
704 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
706 os.rename(tmp_api_trace, vpp_api_trace_log)
707 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
709 except VppTransportShmemIOError:
710 self.logger.debug("VppTransportShmemIOError: Vpp dead. "
711 "Cannot log show commands.")
714 self.registry.unregister_all(self.logger)
717 """ Clear trace before running each test"""
718 super(VppTestCase, self).setUp()
719 self.reporter.send_keep_alive(self)
722 raise VppDiedError(rv=None, testcase=self.__class__.__name__,
723 method_name=self._testMethodName)
724 self.sleep(.1, "during setUp")
725 self.vpp_stdout_deque.append(
726 "--- test setUp() for %s.%s(%s) starts here ---\n" %
727 (self.__class__.__name__, self._testMethodName,
728 self._testMethodDoc))
729 self.vpp_stderr_deque.append(
730 "--- test setUp() for %s.%s(%s) starts here ---\n" %
731 (self.__class__.__name__, self._testMethodName,
732 self._testMethodDoc))
733 self.vapi.cli("clear trace")
734 # store the test instance inside the test class - so that objects
735 # holding the class can access instance methods (like assertEqual)
736 type(self).test_instance = self
739 def pg_enable_capture(cls, interfaces=None):
741 Enable capture on packet-generator interfaces
743 :param interfaces: iterable interface indexes (if None,
744 use self.pg_interfaces)
747 if interfaces is None:
748 interfaces = cls.pg_interfaces
def register_capture(cls, cap_name):
    """Record ``cap_name`` in the test class's list of captures."""
    # Each entry pairs the wall-clock registration time with the name.
    stamped_capture = (time.time(), cap_name)
    cls._captures.append(stamped_capture)
def get_vpp_time(cls):
    """Return the time reported by the 'show clock' CLI command as a float."""
    reply = cls.vapi.cli('show clock')
    # Strip the textual prefix so that only the number is left to parse.
    seconds = reply.replace("Time now ", "")
    return float(seconds)
763 def sleep_on_vpp_time(cls, sec):
764 """ Sleep according to time in VPP world """
765 # On a busy system with many processes
766 # we might end up with VPP time being slower than real world
767 # So take that into account when waiting for VPP to do something
768 start_time = cls.get_vpp_time()
769 while cls.get_vpp_time() - start_time < sec:
774 """ Enable the PG, wait till it is done, then clean up """
775 cls.vapi.cli("trace add pg-input 1000")
776 cls.vapi.cli('packet-generator enable')
777 # PG, when starts, runs to completion -
778 # so let's avoid a race condition,
779 # and wait a little till it's done.
780 # Then clean it up - and then be gone.
781 deadline = time.time() + 300
782 while cls.vapi.cli('show packet-generator').find("Yes") != -1:
783 cls.sleep(0.01) # yield
784 if time.time() > deadline:
785 cls.logger.error("Timeout waiting for pg to stop")
787 for stamp, cap_name in cls._captures:
788 cls.vapi.cli('packet-generator delete %s' % cap_name)
792 def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
794 Create packet-generator interfaces.
796 :param interfaces: iterable indexes of the interfaces.
797 :returns: List of created interfaces.
802 intf = VppPGInterface(cls, i, gso, gso_size)
803 setattr(cls, intf.name, intf)
805 cls.pg_interfaces = result
809 def create_loopback_interfaces(cls, count):
811 Create loopback interfaces.
813 :param count: number of interfaces created.
814 :returns: List of created interfaces.
816 result = [VppLoInterface(cls) for i in range(count)]
818 setattr(cls, intf.name, intf)
819 cls.lo_interfaces = result
823 def create_bvi_interfaces(cls, count):
825 Create BVI interfaces.
827 :param count: number of interfaces created.
828 :returns: List of created interfaces.
830 result = [VppBviInterface(cls) for i in range(count)]
832 setattr(cls, intf.name, intf)
833 cls.bvi_interfaces = result
837 def extend_packet(packet, size, padding=' '):
839 Extend packet to given size by padding with spaces or custom padding
840 NOTE: Currently works only when Raw layer is present.
842 :param packet: packet
843 :param size: target size
844 :param padding: padding used to extend the payload
847 packet_len = len(packet) + 4
# NOTE(review): the 4 extra bytes counted above are presumably the Ethernet
# FCS - confirm. `extend` is the number of payload bytes still to be added.
848 extend = size - packet_len
# Repeat the padding pattern often enough to cover `extend` bytes ...
850 num = (extend // len(padding)) + 1
# ... then cut it to exactly `extend` characters. The padding is encoded as
# ASCII, so non-ASCII padding raises UnicodeEncodeError.
851 packet[Raw].load += (padding * num)[:extend].encode("ascii")
def reset_packet_infos(cls):
    """Start over with empty packet infos and per-interface packet counts."""
    # Each attribute gets its own fresh dict.
    for attr_name in ('_packet_infos', '_packet_count_for_dst_if_idx'):
        setattr(cls, attr_name, {})
860 def create_packet_info(cls, src_if, dst_if):
862 Create packet info object containing the source and destination indexes
863 and add it to the testcase's packet info list
865 :param VppInterface src_if: source interface
866 :param VppInterface dst_if: destination interface
868 :returns: _PacketInfo object
872 info.index = len(cls._packet_infos)
873 info.src = src_if.sw_if_index
874 info.dst = dst_if.sw_if_index
875 if isinstance(dst_if, VppSubInterface):
876 dst_idx = dst_if.parent.sw_if_index
878 dst_idx = dst_if.sw_if_index
879 if dst_idx in cls._packet_count_for_dst_if_idx:
880 cls._packet_count_for_dst_if_idx[dst_idx] += 1
882 cls._packet_count_for_dst_if_idx[dst_idx] = 1
883 cls._packet_infos[info.index] = info
887 def info_to_payload(info):
889 Convert _PacketInfo object to packet payload
891 :param info: _PacketInfo object
893 :returns: string containing serialized data from packet info
895 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
899 def payload_to_info(payload, payload_field='load'):
901 Convert packet payload to _PacketInfo object
903 :param payload: packet payload
904 :type payload: <class 'scapy.packet.Raw'>
905 :param payload_field: packet fieldname of payload "load" for
906 <class 'scapy.packet.Raw'>
907 :type payload_field: str
908 :returns: _PacketInfo object containing de-serialized data from payload
911 numbers = getattr(payload, payload_field).split()
913 info.index = int(numbers[0])
914 info.src = int(numbers[1])
915 info.dst = int(numbers[2])
916 info.ip = int(numbers[3])
917 info.proto = int(numbers[4])
920 def get_next_packet_info(self, info):
922 Iterate over the packet info list stored in the testcase
923 Start iteration with first element if info is None
924 Continue based on index in info if info is specified
926 :param info: info or None
927 :returns: next info in list or None if no more infos
932 next_index = info.index + 1
933 if next_index == len(self._packet_infos):
936 return self._packet_infos[next_index]
938 def get_next_packet_info_for_interface(self, src_index, info):
940 Search the packet info list for the next packet info with same source
943 :param src_index: source interface index to search for
944 :param info: packet info - where to start the search
945 :returns: packet info or None
949 info = self.get_next_packet_info(info)
952 if info.src == src_index:
955 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
957 Search the packet info list for the next packet info with same source
958 and destination interface indexes
960 :param src_index: source interface index to search for
961 :param dst_index: destination interface index to search for
962 :param info: packet info - where to start the search
963 :returns: packet info or None
967 info = self.get_next_packet_info_for_interface(src_index, info)
970 if info.dst == dst_index:
973 def assert_equal(self, real_value, expected_value, name_or_class=None):
974 if name_or_class is None:
975 self.assertEqual(real_value, expected_value)
978 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
979 msg = msg % (getdoc(name_or_class).strip(),
980 real_value, str(name_or_class(real_value)),
981 expected_value, str(name_or_class(expected_value)))
983 msg = "Invalid %s: %s does not match expected value %s" % (
984 name_or_class, real_value, expected_value)
986 self.assertEqual(real_value, expected_value, msg)
988 def assert_in_range(self,
996 msg = "Invalid %s: %s out of range <%s,%s>" % (
997 name, real_value, expected_min, expected_max)
998 self.assertTrue(expected_min <= real_value <= expected_max, msg)
1000 def assert_packet_checksums_valid(self, packet,
1001 ignore_zero_udp_checksums=True):
1002 received = packet.__class__(scapy.compat.raw(packet))
1003 udp_layers = ['UDP', 'UDPerror']
1004 checksum_fields = ['cksum', 'chksum']
1007 temp = received.__class__(scapy.compat.raw(received))
1009 layer = temp.getlayer(counter)
1011 for cf in checksum_fields:
1012 if hasattr(layer, cf):
1013 if ignore_zero_udp_checksums and \
1014 0 == getattr(layer, cf) and \
1015 layer.name in udp_layers:
1018 checksums.append((counter, cf))
1021 counter = counter + 1
1022 if 0 == len(checksums):
1024 temp = temp.__class__(scapy.compat.raw(temp))
1025 for layer, cf in checksums:
1026 calc_sum = getattr(temp[layer], cf)
1028 getattr(received[layer], cf), calc_sum,
1029 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
1031 "Checksum field `%s` on `%s` layer has correct value `%s`" %
1032 (cf, temp[layer].name, calc_sum))
1034 def assert_checksum_valid(self, received_packet, layer,
1035 field_name='chksum',
1036 ignore_zero_checksum=False):
1037 """ Check checksum of received packet on given layer """
# layer: key used to index the packet for the layer under test.
# field_name: name of the checksum attribute on that layer.
1038 received_packet_checksum = getattr(received_packet[layer], field_name)
# NOTE(review): with ignore_zero_checksum set, a zero checksum is presumably
# accepted without further checks - confirm.
1039 if ignore_zero_checksum and 0 == received_packet_checksum:
# Work on a copy rebuilt from the raw bytes; the received packet is not
# modified.
1041 recalculated = received_packet.__class__(
1042 scapy.compat.raw(received_packet))
# Drop the checksum field, then serialise and re-parse the copy.
# NOTE(review): this relies on scapy filling in a freshly computed checksum
# when the field is absent at build time - confirm against scapy docs.
1043 delattr(recalculated[layer], field_name)
1044 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
# The received checksum must equal the recomputed one.
1045 self.assert_equal(received_packet_checksum,
1046 getattr(recalculated[layer], field_name),
1047 "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """Check the checksum of the 'IP' layer of ``received_packet``.

    :param ignore_zero_checksum: forwarded to assert_checksum_valid.
    """
    verify = self.assert_checksum_valid
    verify(received_packet, 'IP', ignore_zero_checksum=ignore_zero_checksum)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """Check the checksum of the 'TCP' layer of ``received_packet``.

    :param ignore_zero_checksum: forwarded to assert_checksum_valid.
    """
    verify = self.assert_checksum_valid
    verify(received_packet, 'TCP', ignore_zero_checksum=ignore_zero_checksum)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """Check the checksum of the 'UDP' layer of ``received_packet``.

    :param ignore_zero_checksum: forwarded to assert_checksum_valid;
        unlike the IP and TCP variants it defaults to True here.
    """
    verify = self.assert_checksum_valid
    verify(received_packet, 'UDP', ignore_zero_checksum=ignore_zero_checksum)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """Check the checksum of each *error layer present in the packet.

    IPerror, TCPerror, UDPerror and ICMPerror are examined in that order;
    layers the packet does not contain are skipped.
    """
    # (layer class to probe, layer name to verify, extra keyword arguments)
    embedded_layers = (
        (IPerror, 'IPerror', {}),
        (TCPerror, 'TCPerror', {}),
        # A zero checksum is tolerated for the embedded UDP layer only.
        (UDPerror, 'UDPerror', {'ignore_zero_checksum': True}),
        (ICMPerror, 'ICMPerror', {}),
    )
    for layer_cls, layer_name, options in embedded_layers:
        if received_packet.haslayer(layer_cls):
            self.assert_checksum_valid(received_packet, layer_name, **options)
def assert_icmp_checksum_valid(self, received_packet):
    """Check the 'ICMP' layer checksum, then any embedded error layers."""
    packet = received_packet
    # Outer ICMP layer first ...
    self.assert_checksum_valid(packet, 'ICMP')
    # ... followed by whatever the embedded-layer check covers.
    self.assert_embedded_icmp_checksum_valid(packet)
def assert_icmpv6_checksum_valid(self, pkt):
    """Check the 'cksum' field of the ICMPv6 layers present in ``pkt``.

    Destination-unreachable, echo-request and echo-reply layers are examined
    in that order. For destination-unreachable the embedded error layers are
    checked as well.
    """
    # (layer class to probe, layer name to verify, also check embedded?)
    icmpv6_layers = (
        (ICMPv6DestUnreach, 'ICMPv6DestUnreach', True),
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest', False),
        (ICMPv6EchoReply, 'ICMPv6EchoReply', False),
    )
    for layer_cls, layer_name, check_embedded in icmpv6_layers:
        if not pkt.haslayer(layer_cls):
            continue
        self.assert_checksum_valid(pkt, layer_name, 'cksum')
        if check_embedded:
            self.assert_embedded_icmp_checksum_valid(pkt)
1088 def get_packet_counter(self, counter):
# Return the value of the named counter.
# Names starting with "/" are read through self.statistics; any other name
# is searched for in the output of the "sh errors" CLI command.
1089 if counter.startswith("/"):
1090 counter_value = self.statistics.get_counter(counter)
1092 counters = self.vapi.cli("sh errors").split('\n')
# Element 0 and the last element of the split output are not examined.
# NOTE(review): presumably a header line and a trailing empty string - confirm.
1094 for i in range(1, len(counters) - 1):
# Rows are split on whitespace: column 0 is parsed as the count and column 1
# is compared with the requested name.
1095 results = counters[i].split()
1096 if results[1] == counter:
1097 counter_value = int(results[0])
1099 return counter_value
def assert_packet_counter_equal(self, counter, expected_value):
    """Assert that packet counter ``counter`` equals ``expected_value``."""
    actual = self.get_packet_counter(counter)
    label = "packet counter `%s'" % counter
    self.assert_equal(actual, expected_value, label)
def assert_error_counter_equal(self, counter, expected_value):
    """Assert that error counter ``counter`` equals ``expected_value``.

    The current value is read with ``self.statistics.get_err_counter``.
    """
    actual = self.statistics.get_err_counter(counter)
    label = "error counter `%s'" % counter
    self.assert_equal(actual, expected_value, label)
1112 def sleep(cls, timeout, remark=None):
1114 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1115 # * by Guido, only the main thread can be interrupted.
1117 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
1120 if hasattr(os, 'sched_yield'):
1126 if hasattr(cls, 'logger'):
1127 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1128 before = time.time()
1131 if hasattr(cls, 'logger') and after - before > 2 * timeout:
1132 cls.logger.error("unexpected self.sleep() result - "
1133 "slept for %es instead of ~%es!",
1134 after - before, timeout)
1135 if hasattr(cls, 'logger'):
1137 "Finished sleep (%s) - slept %es (wanted %es)",
1138 remark, after - before, timeout)
def pg_send(self, intf, pkts):
    """Queue ``pkts`` on ``intf`` and enable capture on all pg interfaces.

    :param intf: interface the stream is added to
    :param pkts: packets handed to ``intf.add_stream``
    """
    # Start from a clean trace buffer.
    self.vapi.cli("clear trace")
    intf.add_stream(pkts)
    capture_targets = self.pg_interfaces
    self.pg_enable_capture(capture_targets)
def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
    """Send `pkts` on `intf` and assert no interface captures anything.

    :param intf: interface to send the packets from
    :param pkts: packets to send
    :param remark: text forwarded to ``assert_nothing_captured``
    :param timeout: capture timeout for the first interface checked; a
        falsy value means 1. Every further interface uses 0.1.
    """
    self.pg_send(intf, pkts)
    wait = timeout if timeout else 1
    for pg_if in self.pg_interfaces:
        pg_if.get_capture(0, timeout=wait)
        pg_if.assert_nothing_captured(remark=remark)
        # the full wait was already spent on the first interface
        wait = 0.1
def send_and_expect(self, intf, pkts, output, n_rx=None):
    """Send `pkts` on `intf` and return what `output` captured.

    :param intf: interface to send the packets from
    :param pkts: packets to send
    :param output: interface the capture is read from
    :param n_rx: number of packets requested from the capture; a falsy
        value means ``len(pkts)``
    :returns: result of ``output.get_capture``
    """
    wanted = n_rx if n_rx else len(pkts)
    self.pg_send(intf, pkts)
    return output.get_capture(wanted)
def send_and_expect_only(self, intf, pkts, output, timeout=None):
    """Send `pkts` on `intf`, expecting them on `output` and nowhere else.

    :param intf: interface to send the packets from
    :param pkts: packets to send
    :param output: the only interface expected to capture packets
    :param timeout: capture timeout for the first other interface
        checked; a falsy value means 1. Every further one uses 0.1.
    :returns: the ``len(pkts)`` packets captured on `output`
    """
    self.pg_send(intf, pkts)
    rx = output.get_capture(len(pkts))
    expected_outputs = (output,)
    wait = timeout if timeout else 1
    for pg_if in self.pg_interfaces:
        if pg_if in expected_outputs:
            continue
        pg_if.get_capture(0, timeout=wait)
        pg_if.assert_nothing_captured()
        # the full wait was already spent on the first interface
        wait = 0.1

    return rx
1177 """ unittest calls runTest when TestCase is instantiated without a
1178 test case. Use case: Writing unittests against VppTestCase"""
def get_testcase_doc_name(test):
    """Return a human-readable name for the test case class of `test`.

    :param test: test case instance
    :returns: first line of the class docstring, or the class name when
        the class has no docstring or an empty one
    """
    test_class = test.__class__
    doc = getdoc(test_class)
    # getdoc() returns None for an undocumented class and '' for a
    # whitespace-only docstring; neither has a first line to index
    if not doc:
        return test_class.__name__
    return doc.splitlines()[0]
def get_test_description(descriptions, test):
    """Return the text used to describe `test` in the output.

    :param descriptions: truthy to prefer the test's short description
    :param test: object providing ``shortDescription()``
    :returns: the short description when `descriptions` is truthy and
        the description is non-empty, otherwise ``str(test)``
    """
    summary = test.shortDescription()
    return summary if descriptions and summary else str(test)
class TestCaseInfo(object):
    """Plain record of per-test-case data (logger, temp dir, VPP info)."""

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        """
        :param logger: logger of the test case
        :param tempdir: temporary directory used by the test case
        :param vpp_pid: stored as ``vpp_pid``
        :param vpp_bin_path: stored as ``vpp_bin_path``
        """
        # initially empty; filled in later with a test name
        self.core_crash_test = None
        self.logger, self.tempdir = logger, tempdir
        self.vpp_pid, self.vpp_bin_path = vpp_pid, vpp_bin_path
class VppTestResult(unittest.TestResult):
    """
    Test result collector which also logs every outcome, reports it
    through an optional result pipe and links failed test directories.

    @property result_string
     String variable to store the test case result string.
    @property errors
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test which
     raised an unexpected exception.
    @property failures
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test where
     a failure was explicitly signalled using the TestCase.assert*()
     methods.
    """

    # class-level attributes: add_error() adds to these sets through
    # self, so the contents are shared by all instances
    failed_test_cases_info = set()
    core_crash_test_cases_info = set()
    current_test_case_info = None

    def __init__(self, stream=None, descriptions=None, verbosity=None,
                 runner=None):
        """
        :param stream File descriptor to store where to report test results.
            Set to the standard error stream by default.
        :param descriptions Boolean variable to store information if to use
            test case descriptions.
        :param verbosity Integer variable to store required verbosity level.
        :param runner Runner owning this result; printErrors() reads its
            print_summary attribute and replaces its stream.
        """
        super(VppTestResult, self).__init__(stream, descriptions, verbosity)
        self.stream = stream
        self.descriptions = descriptions
        self.verbosity = verbosity
        self.result_string = None
        self.runner = runner

    def addSuccess(self, test):
        """
        Record a test succeeded result

        :param test: test which passed
        """
        if self.current_test_case_info:
            self.current_test_case_info.logger.debug(
                "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
                                                       test._testMethodName,
                                                       test._testMethodDoc))
        unittest.TestResult.addSuccess(self, test)
        self.result_string = colorize("OK", GREEN)

        self.send_result_through_pipe(test, PASS)

    def addSkip(self, test, reason):
        """
        Record a test skipped.

        :param test: test which was skipped
        :param reason: why the test was skipped
        """
        if self.current_test_case_info:
            self.current_test_case_info.logger.debug(
                "--- addSkip() %s.%s(%s) called, reason is %s" %
                (test.__class__.__name__, test._testMethodName,
                 test._testMethodDoc, reason))
        unittest.TestResult.addSkip(self, test, reason)
        self.result_string = colorize("SKIP", YELLOW)

        self.send_result_through_pipe(test, SKIP)

    def symlink_failed(self):
        """
        Link the current test case's temp dir into $FAILED_DIR as
        '<tempdir basename>-FAILED'.

        Best effort: nothing is raised, problems are only logged (when
        the test case info has a logger).
        """
        info = self.current_test_case_info
        if not info:
            return

        failed_dir = os.getenv('FAILED_DIR')
        if failed_dir is None:
            # os.path.join(None, ...) would only yield a cryptic TypeError
            if info.logger:
                info.logger.error(
                    "FAILED_DIR is not set, cannot link the failed test")
            return

        try:
            link_path = os.path.join(
                failed_dir,
                '%s-FAILED' % os.path.basename(info.tempdir))
            if info.logger:
                info.logger.debug("creating a link to the failed test")
                info.logger.debug(
                    "os.symlink(%s, %s)" % (info.tempdir, link_path))
            # lexists() is also True for a dangling link, for which
            # exists() reports False and os.symlink() would then fail
            if os.path.lexists(link_path):
                if info.logger:
                    info.logger.debug('symlink already exists')
            else:
                os.symlink(info.tempdir, link_path)

        except Exception as e:
            if info.logger:
                info.logger.error(e)

    def send_result_through_pipe(self, test, result):
        """
        Send (test id, result) through the result pipe, if one is set.

        :param test: test whose id() is reported
        :param result: result value to report
        """
        pipe = getattr(self, 'test_framework_result_pipe', None)
        if pipe:
            pipe.send((test.id(), result))

    def log_error(self, test, err, fn_name):
        """
        Write a failure/error and its formatted traceback to the debug log
        of the current test case; no-op without current test case info.

        :param test: test (or unittest _ErrorHolder) which failed
        :param err: exc_info style tuple passed to format_exception
        :param fn_name: name of the reporting function, used in the log
        """
        if not self.current_test_case_info:
            return
        if isinstance(test, unittest.suite._ErrorHolder):
            test_name = test.description
        else:
            test_name = '%s.%s(%s)' % (test.__class__.__name__,
                                       test._testMethodName,
                                       test._testMethodDoc)
        self.current_test_case_info.logger.debug(
            "--- %s() %s called, err is %s" %
            (fn_name, test_name, err))
        self.current_test_case_info.logger.debug(
            "formatted exception is:\n%s" %
            "".join(format_exception(*err)))

    def add_error(self, test, err, unittest_fn, error_type):
        """
        Common implementation of addFailure() and addError().

        :param test: test which failed
        :param err: exc_info style tuple
        :param unittest_fn: unittest.TestResult method recording the result
        :param error_type: FAIL or ERROR
        :raises Exception: if error_type is neither FAIL nor ERROR
        """
        if error_type == FAIL:
            self.log_error(test, err, 'addFailure')
            error_type_str = colorize("FAIL", RED)
        elif error_type == ERROR:
            self.log_error(test, err, 'addError')
            error_type_str = colorize("ERROR", RED)
        else:
            raise Exception('Error type %s cannot be used to record an '
                            'error or a failure' % error_type)

        unittest_fn(self, test, err)
        if self.current_test_case_info:
            self.result_string = "%s [ temp dir used by test case: %s ]" % \
                                 (error_type_str,
                                  self.current_test_case_info.tempdir)
            self.symlink_failed()
            self.failed_test_cases_info.add(self.current_test_case_info)
            if is_core_present(self.current_test_case_info.tempdir):
                # only the first test recorded with a core file present
                # is stored as the crashing one
                if not self.current_test_case_info.core_crash_test:
                    if isinstance(test, unittest.suite._ErrorHolder):
                        test_name = str(test)
                    else:
                        test_name = "'{!s}' ({!s})".format(
                            get_testcase_doc_name(test), test.id())
                    self.current_test_case_info.core_crash_test = test_name
                self.core_crash_test_cases_info.add(
                    self.current_test_case_info)
        else:
            self.result_string = '%s [no temp dir]' % error_type_str

        self.send_result_through_pipe(test, error_type)

    def addFailure(self, test, err):
        """
        Record a test failed result

        :param test: test which failed
        :param err: error message
        """
        self.add_error(test, err, unittest.TestResult.addFailure, FAIL)

    def addError(self, test, err):
        """
        Record a test error result

        :param test: test which raised an error
        :param err: error message
        """
        self.add_error(test, err, unittest.TestResult.addError, ERROR)

    def getDescription(self, test):
        """
        Get test description

        :param test: test to describe
        :returns: test description
        """
        return get_test_description(self.descriptions, test)

    def startTest(self, test):
        """
        Start a test

        :param test: test about to be run
        """

        def print_header(test):
            # the header is printed once per test class
            if not hasattr(test.__class__, '_header_printed'):
                print(double_line_delim)
                print(colorize(getdoc(test).splitlines()[0], GREEN))
                print(double_line_delim)
            test.__class__._header_printed = True

        print_header(test)

        unittest.TestResult.startTest(self, test)
        if self.verbosity > 0:
            self.stream.writeln(
                "Starting " + self.getDescription(test) + " ...")
            self.stream.writeln(single_line_delim)

    def stopTest(self, test):
        """
        Called when the given test has been run

        :param test: test which has finished
        """
        unittest.TestResult.stopTest(self, test)
        if self.verbosity > 0:
            self.stream.writeln(single_line_delim)
            self.stream.writeln("%-73s%s" % (self.getDescription(test),
                                             self.result_string))
            self.stream.writeln(single_line_delim)
        else:
            self.stream.writeln("%-73s%s" % (self.getDescription(test),
                                             self.result_string))

        self.send_result_through_pipe(test, TEST_RUN)

    def printErrors(self):
        """
        Print errors from running the test case
        """
        if self.errors or self.failures:
            self.stream.writeln()
            self.printErrorList('ERROR', self.errors)
            self.printErrorList('FAIL', self.failures)

        # ^^ that is the last output from unittest before summary
        if not self.runner.print_summary:
            # further output of this result and its runner is discarded;
            # the devnull handle is deliberately left open because it
            # stays in use as the stream
            devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
            self.stream = devnull
            self.runner.stream = devnull

    def printErrorList(self, flavour, errors):
        """
        Print error list to the output stream together with error type
        and test case description.

        :param flavour: error type
        :param errors: iterable errors
        """
        for test, err in errors:
            self.stream.writeln(double_line_delim)
            self.stream.writeln("%s: %s" %
                                (flavour, self.getDescription(test)))
            self.stream.writeln(single_line_delim)
            self.stream.writeln("%s" % err)
class VppTestRunner(unittest.TextTestRunner):
    """
    A basic test runner implementation which prints results to standard
    output (sys.stdout is hard-coded in the constructor).
    """

    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
                 result_pipe=None, failfast=False, buffer=False,
                 resultclass=None, print_summary=True, **kwargs):
        """
        :param keep_alive_pipe: stored as KeepAliveReporter.pipe
        :param descriptions: passed to unittest.TextTestRunner
        :param verbosity: passed to unittest.TextTestRunner
        :param result_pipe: stored on the result class as
            test_framework_result_pipe
        :param failfast: passed to unittest.TextTestRunner
        :param buffer: passed to unittest.TextTestRunner
        :param resultclass: passed to unittest.TextTestRunner
        :param print_summary: when false, run() restores the original
            stream on the runner and on the result once the run is over
        :param kwargs: passed to unittest.TextTestRunner
        """
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        base_init = super(VppTestRunner, self).__init__
        base_init(sys.stdout, descriptions, verbosity, failfast, buffer,
                  resultclass, **kwargs)
        KeepAliveReporter.pipe = keep_alive_pipe

        # remembered so that run() can put it back
        self.orig_stream = self.stream
        self.resultclass.test_framework_result_pipe = result_pipe

        self.print_summary = print_summary

    def _makeResult(self):
        """Create the result object, handing it a reference to this runner."""
        make_result = self.resultclass
        return make_result(self.stream, self.descriptions, self.verbosity,
                           self)

    def run(self, test):
        """
        Run the tests

        :param test: test case or suite to run
        :returns: the result object of the run
        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        result = super(VppTestRunner, self).run(test)
        if self.print_summary:
            return result
        self.stream = self.orig_stream
        result.stream = self.orig_stream
        return result
class Worker(Thread):
    """Thread which runs an external executable and logs its output.

    Subclasses may provide the attributes ``testcase``, ``role`` and
    ``wait_for_gdb``; each is only used when present. After run() the
    exit code of the executable is available in ``self.result``.
    """

    def __init__(self, args, logger, env=None):
        """
        :param args: command line as a list, executable path first
        :param logger: logger used by run()
        :param env: extra environment variables for the executable; a
            deep copy is stored
        """
        self.logger = logger
        # private copy: the gdb branch below appends to the list and must
        # not modify the caller's argument list
        self.args = list(args)
        if hasattr(self, 'testcase') and self.testcase.debug_all:
            if self.testcase.debug_gdbserver:
                self.args = ['/usr/bin/gdbserver', 'localhost:{port}'
                             .format(port=self.testcase.gdbserver_port)] + \
                    self.args
            elif self.testcase.debug_gdb and hasattr(self, 'wait_for_gdb'):
                self.args.append(self.wait_for_gdb)
        self.app_bin = args[0]
        self.app_name = os.path.basename(self.app_bin)
        if hasattr(self, 'role'):
            self.app_name += ' {role}'.format(role=self.role)
        self.process = None
        self.result = None
        env = {} if env is None else env
        self.env = copy.deepcopy(env)
        super(Worker, self).__init__()

    def wait_for_enter(self):
        """Print gdb attach instructions and wait for ENTER.

        Returns immediately unless a ``testcase`` attribute exists and
        has ``debug_all`` set together with ``debug_gdbserver`` or
        ``debug_gdb``.
        """
        if not hasattr(self, 'testcase'):
            return
        if self.testcase.debug_all and self.testcase.debug_gdbserver:
            print()
            print(double_line_delim)
            print("Spawned GDB Server for '{app}' with PID: {pid}"
                  .format(app=self.app_name, pid=self.process.pid))
        elif self.testcase.debug_all and self.testcase.debug_gdb:
            print()
            print(double_line_delim)
            print("Spawned '{app}' with PID: {pid}"
                  .format(app=self.app_name, pid=self.process.pid))
        else:
            return
        print(single_line_delim)
        print("You can debug '{app}' using:".format(app=self.app_name))
        if self.testcase.debug_gdbserver:
            print("sudo gdb " + self.app_bin +
                  " -ex 'target remote localhost:{port}'"
                  .format(port=self.testcase.gdbserver_port))
            print("Now is the time to attach gdb by running the above "
                  "command, set up breakpoints etc., then resume from "
                  "within gdb by issuing the 'continue' command")
            # the next gdbserver started for this testcase gets a new port
            self.testcase.gdbserver_port += 1
        elif self.testcase.debug_gdb:
            print("sudo gdb " + self.app_bin +
                  " -ex 'attach {pid}'".format(pid=self.process.pid))
            print("Now is the time to attach gdb by running the above "
                  "command and set up breakpoints etc., then resume from"
                  " within gdb by issuing the 'continue' command")
        print(single_line_delim)
        input("Press ENTER to continue running the testcase...")

    def run(self):
        """Run the executable, wait for it and log its stdout/stderr.

        :raises EnvironmentError: if the executable does not exist or is
            not executable; ``self.result`` is set to os.EX_OSFILE first
        """
        executable = self.args[0]
        if not os.path.exists(executable) or not os.access(
                executable, os.F_OK | os.X_OK):
            # Exit code that means some system file did not exist,
            # could not be opened, or had some other kind of error.
            self.result = os.EX_OSFILE
            raise EnvironmentError(
                "executable '%s' is not found or not executable." %
                executable)
        self.logger.debug("Running executable: '{app}'"
                          .format(app=' '.join(self.args)))
        env = os.environ.copy()
        env.update(self.env)
        env["CK_LOG_FILE_NAME"] = "-"
        self.process = subprocess.Popen(
            self.args, shell=False, env=env, preexec_fn=os.setpgrp,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self.wait_for_enter()
        out, err = self.process.communicate()
        self.logger.debug("Finished running `{app}'".format(app=self.app_name))
        self.logger.info("Return code is `%s'" % self.process.returncode)
        self.logger.info(single_line_delim)
        self.logger.info("Executable `{app}' wrote to stdout:"
                         .format(app=self.app_name))
        self.logger.info(single_line_delim)
        # 'replace' keeps logging working when the output is not valid UTF-8
        self.logger.info(out.decode('utf-8', 'replace'))
        self.logger.info(single_line_delim)
        self.logger.info("Executable `{app}' wrote to stderr:"
                         .format(app=self.app_name))
        self.logger.info(single_line_delim)
        self.logger.info(err.decode('utf-8', 'replace'))
        self.logger.info(single_line_delim)
        self.result = self.process.returncode
1592 if __name__ == '__main__':