3 from __future__ import print_function
17 from collections import deque
18 from threading import Thread, Event
19 from inspect import getdoc, isclass
20 from traceback import format_exception
21 from logging import FileHandler, DEBUG, Formatter
24 from scapy.packet import Raw
25 import hook as hookmodule
26 from vpp_pg_interface import VppPGInterface
27 from vpp_sub_interface import VppSubInterface
28 from vpp_lo_interface import VppLoInterface
29 from vpp_bvi_interface import VppBviInterface
30 from vpp_papi_provider import VppPapiProvider
32 from vpp_papi.vpp_stats import VPPStats
33 from vpp_papi.vpp_transport_shmem import VppTransportShmemIOError
34 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
36 from vpp_object import VppObjectRegistry
37 from util import ppp, is_core_present
38 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
39 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
40 from scapy.layers.inet6 import ICMPv6EchoReply
42 if os.name == 'posix' and sys.version_info[0] < 3:
43 # using subprocess32 is recommended by python official documentation
44 # @ https://docs.python.org/2/library/subprocess.html
45 import subprocess32 as subprocess
49 # Python2/3 compatible
62 class BoolEnvironmentVariable(object):
64 def __init__(self, env_var_name, default='n', true_values=None):
65 self.name = env_var_name
66 self.default = default
67 self.true_values = true_values if true_values is not None else \
71 return os.getenv(self.name, self.default).lower() in self.true_values
73 if sys.version_info[0] == 2:
74 __nonzero__ = __bool__
77 return 'BoolEnvironmentVariable(%r, default=%r, true_values=%r)' % \
78 (self.name, self.default, self.true_values)
81 debug_framework = BoolEnvironmentVariable('TEST_DEBUG')
86 Test framework module.
88 The module provides a set of tools for constructing and running tests and
89 representing the results.
93 class VppDiedError(Exception):
94 """ exception for reporting that the subprocess has died."""
96 signals_by_value = {v: k for k, v in signal.__dict__.items() if
97 k.startswith('SIG') and not k.startswith('SIG_')}
99 def __init__(self, rv=None, testcase=None, method_name=None):
101 self.signal_name = None
102 self.testcase = testcase
103 self.method_name = method_name
106 self.signal_name = VppDiedError.signals_by_value[-rv]
107 except (KeyError, TypeError):
110 if testcase is None and method_name is None:
113 in_msg = 'running %s.%s ' % (testcase, method_name)
115 msg = "VPP subprocess died %sunexpectedly with return code: %d%s." % (
118 ' [%s]' % (self.signal_name if
119 self.signal_name is not None else ''))
120 super(VppDiedError, self).__init__(msg)
123 class _PacketInfo(object):
124 """Private class to create packet info object.
126 Help process information about the next packet.
127 Set variables to default values.
129 #: Store the index of the packet.
131 #: Store the index of the source packet generator interface of the packet.
133 #: Store the index of the destination packet generator interface
136 #: Store expected ip version
138 #: Store expected upper protocol
140 #: Store the copy of the former packet.
143 def __eq__(self, other):
144 index = self.index == other.index
145 src = self.src == other.src
146 dst = self.dst == other.dst
147 data = self.data == other.data
148 return index and src and dst and data
151 def pump_output(testclass):
152 """ pump output from vpp stdout/stderr to proper queues """
155 while not testclass.pump_thread_stop_flag.is_set():
156 readable = select.select([testclass.vpp.stdout.fileno(),
157 testclass.vpp.stderr.fileno(),
158 testclass.pump_thread_wakeup_pipe[0]],
160 if testclass.vpp.stdout.fileno() in readable:
161 read = os.read(testclass.vpp.stdout.fileno(), 102400)
163 split = read.decode('ascii',
164 errors='backslashreplace').splitlines(True)
165 if len(stdout_fragment) > 0:
166 split[0] = "%s%s" % (stdout_fragment, split[0])
167 if len(split) > 0 and split[-1].endswith("\n"):
171 stdout_fragment = split[-1]
172 testclass.vpp_stdout_deque.extend(split[:limit])
173 if not testclass.cache_vpp_output:
174 for line in split[:limit]:
175 testclass.logger.info(
176 "VPP STDOUT: %s" % line.rstrip("\n"))
177 if testclass.vpp.stderr.fileno() in readable:
178 read = os.read(testclass.vpp.stderr.fileno(), 102400)
180 split = read.decode('ascii',
181 errors='backslashreplace').splitlines(True)
182 if len(stderr_fragment) > 0:
183 split[0] = "%s%s" % (stderr_fragment, split[0])
184 if len(split) > 0 and split[-1].endswith("\n"):
188 stderr_fragment = split[-1]
190 testclass.vpp_stderr_deque.extend(split[:limit])
191 if not testclass.cache_vpp_output:
192 for line in split[:limit]:
193 testclass.logger.error(
194 "VPP STDERR: %s" % line.rstrip("\n"))
195 # ignoring the dummy pipe here intentionally - the
196 # flag will take care of properly terminating the loop
199 def _is_skip_aarch64_set():
200 return BoolEnvironmentVariable('SKIP_AARCH64')
203 is_skip_aarch64_set = _is_skip_aarch64_set()
206 def _is_platform_aarch64():
207 return platform.machine() == 'aarch64'
210 is_platform_aarch64 = _is_platform_aarch64()
213 def _running_extended_tests():
214 return BoolEnvironmentVariable("EXTENDED_TESTS")
217 running_extended_tests = _running_extended_tests()
220 def _running_on_centos():
221 os_id = os.getenv("OS_ID", "")
222 return True if "centos" in os_id.lower() else False
225 running_on_centos = _running_on_centos()
228 class KeepAliveReporter(object):
230 Singleton object which reports test start to parent process
235 self.__dict__ = self._shared_state
243 def pipe(self, pipe):
244 if self._pipe is not None:
245 raise Exception("Internal error - pipe should only be set once.")
248 def send_keep_alive(self, test, desc=None):
250 Write current test tmpdir & desc to keep-alive pipe to signal liveness
252 if self.pipe is None:
253 # if not running forked..
257 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
261 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
264 class VppTestCase(unittest.TestCase):
265 """This subclass is a base class for VPP test cases that are implemented as
266 classes. It provides methods to create and run test case.
269 extra_vpp_punt_config = []
270 extra_vpp_plugin_config = []
271 vapi_response_timeout = 5
274 def packet_infos(self):
275 """List of packet infos"""
276 return self._packet_infos
279 def get_packet_count_for_if_idx(cls, dst_if_index):
280 """Get the number of packet info for specified destination if index"""
281 if dst_if_index in cls._packet_count_for_dst_if_idx:
282 return cls._packet_count_for_dst_if_idx[dst_if_index]
288 """Return the instance of this testcase"""
289 return cls.test_instance
292 def set_debug_flags(cls, d):
293 cls.gdbserver_port = 7777
294 cls.debug_core = False
295 cls.debug_gdb = False
296 cls.debug_gdbserver = False
297 cls.debug_all = False
302 cls.debug_core = True
303 elif dl == "gdb" or dl == "gdb-all":
305 elif dl == "gdbserver" or dl == "gdbserver-all":
306 cls.debug_gdbserver = True
308 raise Exception("Unrecognized DEBUG option: '%s'" % d)
309 if dl == "gdb-all" or dl == "gdbserver-all":
313 def get_least_used_cpu():
314 cpu_usage_list = [set(range(psutil.cpu_count()))]
315 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
316 if 'vpp_main' == p.info['name']]
317 for vpp_process in vpp_processes:
318 for cpu_usage_set in cpu_usage_list:
320 cpu_num = vpp_process.cpu_num()
321 if cpu_num in cpu_usage_set:
322 cpu_usage_set_index = cpu_usage_list.index(
324 if cpu_usage_set_index == len(cpu_usage_list) - 1:
325 cpu_usage_list.append({cpu_num})
327 cpu_usage_list[cpu_usage_set_index + 1].add(
329 cpu_usage_set.remove(cpu_num)
331 except psutil.NoSuchProcess:
334 for cpu_usage_set in cpu_usage_list:
335 if len(cpu_usage_set) > 0:
336 min_usage_set = cpu_usage_set
339 return random.choice(tuple(min_usage_set))
342 def setUpConstants(cls):
343 """ Set-up the test case class based on environment variables """
344 cls.step = BoolEnvironmentVariable('STEP')
345 d = os.getenv("DEBUG", None)
346 # inverted case to handle '' == True
347 c = os.getenv("CACHE_OUTPUT", "1")
348 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
349 cls.set_debug_flags(d)
350 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
351 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
352 cls.test_plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
353 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
355 if cls.plugin_path is not None:
356 if cls.extern_plugin_path is not None:
357 plugin_path = "%s:%s" % (
358 cls.plugin_path, cls.extern_plugin_path)
360 plugin_path = cls.plugin_path
361 elif cls.extern_plugin_path is not None:
362 plugin_path = cls.extern_plugin_path
364 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
365 debug_cli = "cli-listen localhost:5002"
367 size = os.getenv("COREDUMP_SIZE")
369 coredump_size = "coredump-size %s" % size
370 if coredump_size is None:
371 coredump_size = "coredump-size unlimited"
373 cpu_core_number = cls.get_least_used_cpu()
374 if not hasattr(cls, "worker_config"):
375 cls.worker_config = ""
377 cls.vpp_cmdline = [cls.vpp_bin, "unix",
378 "{", "nodaemon", debug_cli, "full-coredump",
379 coredump_size, "runtime-dir", cls.tempdir, "}",
380 "api-trace", "{", "on", "}", "api-segment", "{",
381 "prefix", cls.shm_prefix, "}", "cpu", "{",
382 "main-core", str(cpu_core_number),
383 cls.worker_config, "}",
384 "statseg", "{", "socket-name", cls.stats_sock, "}",
385 "socksvr", "{", "socket-name", cls.api_sock, "}",
387 "{", "plugin", "dpdk_plugin.so", "{", "disable",
388 "}", "plugin", "rdma_plugin.so", "{", "disable",
389 "}", "plugin", "unittest_plugin.so", "{", "enable",
390 "}"] + cls.extra_vpp_plugin_config + ["}", ]
391 if cls.extra_vpp_punt_config is not None:
392 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
393 if plugin_path is not None:
394 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
395 if cls.test_plugin_path is not None:
396 cls.vpp_cmdline.extend(["test_plugin_path", cls.test_plugin_path])
398 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
399 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
402 def wait_for_enter(cls):
403 if cls.debug_gdbserver:
404 print(double_line_delim)
405 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
407 print(double_line_delim)
408 print("Spawned VPP with PID: %d" % cls.vpp.pid)
410 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
412 print(single_line_delim)
413 print("You can debug VPP using:")
414 if cls.debug_gdbserver:
415 print("sudo gdb " + cls.vpp_bin +
416 " -ex 'target remote localhost:{port}'"
417 .format(port=cls.gdbserver_port))
418 print("Now is the time to attach gdb by running the above "
419 "command, set up breakpoints etc., then resume VPP from "
420 "within gdb by issuing the 'continue' command")
421 cls.gdbserver_port += 1
423 print("sudo gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
424 print("Now is the time to attach gdb by running the above "
425 "command and set up breakpoints etc., then resume VPP from"
426 " within gdb by issuing the 'continue' command")
427 print(single_line_delim)
428 input("Press ENTER to continue running the testcase...")
432 cmdline = cls.vpp_cmdline
434 if cls.debug_gdbserver:
435 gdbserver = '/usr/bin/gdbserver'
436 if not os.path.isfile(gdbserver) or \
437 not os.access(gdbserver, os.X_OK):
438 raise Exception("gdbserver binary '%s' does not exist or is "
439 "not executable" % gdbserver)
441 cmdline = [gdbserver, 'localhost:{port}'
442 .format(port=cls.gdbserver_port)] + cls.vpp_cmdline
443 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
446 cls.vpp = subprocess.Popen(cmdline,
447 stdout=subprocess.PIPE,
448 stderr=subprocess.PIPE,
450 except subprocess.CalledProcessError as e:
451 cls.logger.critical("Subprocess returned with non-0 return code: ("
455 cls.logger.critical("Subprocess returned with OS error: "
456 "(%s) %s", e.errno, e.strerror)
458 except Exception as e:
459 cls.logger.exception("Subprocess returned unexpected from "
466 def wait_for_coredump(cls):
467 corefile = cls.tempdir + "/core"
468 if os.path.isfile(corefile):
469 cls.logger.error("Waiting for coredump to complete: %s", corefile)
470 curr_size = os.path.getsize(corefile)
471 deadline = time.time() + 60
473 while time.time() < deadline:
476 curr_size = os.path.getsize(corefile)
477 if size == curr_size:
481 cls.logger.error("Timed out waiting for coredump to complete:"
484 cls.logger.error("Coredump complete: %s, size %d",
490 Perform class setup before running the testcase
491 Remove shared memory files, start vpp and connect the vpp-api
493 super(VppTestCase, cls).setUpClass()
494 gc.collect() # run garbage collection first
495 cls.logger = get_logger(cls.__name__)
496 seed = os.environ["RND_SEED"]
498 if hasattr(cls, 'parallel_handler'):
499 cls.logger.addHandler(cls.parallel_handler)
500 cls.logger.propagate = False
502 cls.tempdir = tempfile.mkdtemp(
503 prefix='vpp-unittest-%s-' % cls.__name__)
504 cls.stats_sock = "%s/stats.sock" % cls.tempdir
505 cls.api_sock = "%s/api.sock" % cls.tempdir
506 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
507 cls.file_handler.setFormatter(
508 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
510 cls.file_handler.setLevel(DEBUG)
511 cls.logger.addHandler(cls.file_handler)
512 cls.logger.debug("--- setUpClass() for %s called ---" %
514 cls.shm_prefix = os.path.basename(cls.tempdir)
515 os.chdir(cls.tempdir)
516 cls.logger.info("Temporary dir is %s, shm prefix is %s",
517 cls.tempdir, cls.shm_prefix)
518 cls.logger.debug("Random seed is %s" % seed)
520 cls.reset_packet_infos()
524 cls.registry = VppObjectRegistry()
525 cls.vpp_startup_failed = False
526 cls.reporter = KeepAliveReporter()
527 # need to catch exceptions here because if we raise, then the cleanup
528 # doesn't get called and we might end with a zombie vpp
531 cls.reporter.send_keep_alive(cls, 'setUpClass')
532 VppTestResult.current_test_case_info = TestCaseInfo(
533 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
534 cls.vpp_stdout_deque = deque()
535 cls.vpp_stderr_deque = deque()
536 cls.pump_thread_stop_flag = Event()
537 cls.pump_thread_wakeup_pipe = os.pipe()
538 cls.pump_thread = Thread(target=pump_output, args=(cls,))
539 cls.pump_thread.daemon = True
540 cls.pump_thread.start()
541 if cls.debug_gdb or cls.debug_gdbserver:
542 cls.vapi_response_timeout = 0
543 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
544 cls.vapi_response_timeout)
546 hook = hookmodule.StepHook(cls)
548 hook = hookmodule.PollHook(cls)
549 cls.vapi.register_hook(hook)
550 cls.statistics = VPPStats(socketname=cls.stats_sock)
554 cls.vpp_startup_failed = True
556 "VPP died shortly after startup, check the"
557 " output to standard error for possible cause")
561 except vpp_papi.VPPIOError as e:
562 cls.logger.debug("Exception connecting to vapi: %s" % e)
563 cls.vapi.disconnect()
565 if cls.debug_gdbserver:
566 print(colorize("You're running VPP inside gdbserver but "
567 "VPP-API connection failed, did you forget "
568 "to 'continue' VPP from within gdb?", RED))
570 except Exception as e:
571 cls.logger.debug("Exception connecting to VPP: %s" % e)
579 Disconnect vpp-api, kill vpp and cleanup shared memory files
581 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
583 if cls.vpp.returncode is None:
585 print(double_line_delim)
586 print("VPP or GDB server is still running")
587 print(single_line_delim)
588 input("When done debugging, press ENTER to kill the "
589 "process and finish running the testcase...")
591 # first signal that we want to stop the pump thread, then wake it up
592 if hasattr(cls, 'pump_thread_stop_flag'):
593 cls.pump_thread_stop_flag.set()
594 if hasattr(cls, 'pump_thread_wakeup_pipe'):
595 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
596 if hasattr(cls, 'pump_thread'):
597 cls.logger.debug("Waiting for pump thread to stop")
598 cls.pump_thread.join()
599 if hasattr(cls, 'vpp_stderr_reader_thread'):
600 cls.logger.debug("Waiting for stdderr pump to stop")
601 cls.vpp_stderr_reader_thread.join()
603 if hasattr(cls, 'vpp'):
604 if hasattr(cls, 'vapi'):
605 cls.logger.debug(cls.vapi.vpp.get_stats())
606 cls.logger.debug("Disconnecting class vapi client on %s",
608 cls.vapi.disconnect()
609 cls.logger.debug("Deleting class vapi attribute on %s",
613 if cls.vpp.returncode is None:
614 cls.wait_for_coredump()
615 cls.logger.debug("Sending TERM to vpp")
617 cls.logger.debug("Waiting for vpp to die")
618 cls.vpp.communicate()
619 cls.logger.debug("Deleting class vpp attribute on %s",
623 if cls.vpp_startup_failed:
624 stdout_log = cls.logger.info
625 stderr_log = cls.logger.critical
627 stdout_log = cls.logger.info
628 stderr_log = cls.logger.info
630 if hasattr(cls, 'vpp_stdout_deque'):
631 stdout_log(single_line_delim)
632 stdout_log('VPP output to stdout while running %s:', cls.__name__)
633 stdout_log(single_line_delim)
634 vpp_output = "".join(cls.vpp_stdout_deque)
635 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
637 stdout_log('\n%s', vpp_output)
638 stdout_log(single_line_delim)
640 if hasattr(cls, 'vpp_stderr_deque'):
641 stderr_log(single_line_delim)
642 stderr_log('VPP output to stderr while running %s:', cls.__name__)
643 stderr_log(single_line_delim)
644 vpp_output = "".join(cls.vpp_stderr_deque)
645 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
647 stderr_log('\n%s', vpp_output)
648 stderr_log(single_line_delim)
651 def tearDownClass(cls):
652 """ Perform final cleanup after running all tests in this test-case """
653 cls.logger.debug("--- tearDownClass() for %s called ---" %
655 cls.reporter.send_keep_alive(cls, 'tearDownClass')
657 cls.file_handler.close()
658 cls.reset_packet_infos()
660 debug_internal.on_tear_down_class(cls)
662 def show_commands_at_teardown(self):
663 """ Allow subclass specific teardown logging additions."""
664 self.logger.info("--- No test specific show commands provided. ---")
667 """ Show various debug prints after each test """
668 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
669 (self.__class__.__name__, self._testMethodName,
670 self._testMethodDoc))
673 if not self.vpp_dead:
674 self.logger.debug(self.vapi.cli("show trace max 1000"))
675 self.logger.info(self.vapi.ppcli("show interface"))
676 self.logger.info(self.vapi.ppcli("show hardware"))
677 self.logger.info(self.statistics.set_errors_str())
678 self.logger.info(self.vapi.ppcli("show run"))
679 self.logger.info(self.vapi.ppcli("show log"))
680 self.logger.info(self.vapi.ppcli("show bihash"))
681 self.logger.info("Logging testcase specific show commands.")
682 self.show_commands_at_teardown()
683 self.registry.remove_vpp_config(self.logger)
684 # Save/Dump VPP api trace log
685 m = self._testMethodName
686 api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
687 tmp_api_trace = "/tmp/%s" % api_trace
688 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
689 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
690 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
692 os.rename(tmp_api_trace, vpp_api_trace_log)
693 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
695 except VppTransportShmemIOError:
696 self.logger.debug("VppTransportShmemIOError: Vpp dead. "
697 "Cannot log show commands.")
700 self.registry.unregister_all(self.logger)
703 """ Clear trace before running each test"""
704 super(VppTestCase, self).setUp()
705 self.reporter.send_keep_alive(self)
708 raise VppDiedError(rv=None, testcase=self.__class__.__name__,
709 method_name=self._testMethodName)
710 self.sleep(.1, "during setUp")
711 self.vpp_stdout_deque.append(
712 "--- test setUp() for %s.%s(%s) starts here ---\n" %
713 (self.__class__.__name__, self._testMethodName,
714 self._testMethodDoc))
715 self.vpp_stderr_deque.append(
716 "--- test setUp() for %s.%s(%s) starts here ---\n" %
717 (self.__class__.__name__, self._testMethodName,
718 self._testMethodDoc))
719 self.vapi.cli("clear trace")
720 # store the test instance inside the test class - so that objects
721 # holding the class can access instance methods (like assertEqual)
722 type(self).test_instance = self
725 def pg_enable_capture(cls, interfaces=None):
727 Enable capture on packet-generator interfaces
729 :param interfaces: iterable interface indexes (if None,
730 use self.pg_interfaces)
733 if interfaces is None:
734 interfaces = cls.pg_interfaces
739 def register_capture(cls, cap_name):
740 """ Register a capture in the testclass """
741 # add to the list of captures with current timestamp
742 cls._captures.append((time.time(), cap_name))
745 def get_vpp_time(cls):
746 return float(cls.vapi.cli('show clock').replace("Time now ", ""))
749 def sleep_on_vpp_time(cls, sec):
750 """ Sleep according to time in VPP world """
751 # On a busy system with many processes
752 # we might end up with VPP time being slower than real world
753 # So take that into account when waiting for VPP to do something
754 start_time = cls.get_vpp_time()
755 while cls.get_vpp_time() - start_time < sec:
760 """ Enable the PG, wait till it is done, then clean up """
761 cls.vapi.cli("trace add pg-input 1000")
762 cls.vapi.cli('packet-generator enable')
763 # PG, when starts, runs to completion -
764 # so let's avoid a race condition,
765 # and wait a little till it's done.
766 # Then clean it up - and then be gone.
767 deadline = time.time() + 300
768 while cls.vapi.cli('show packet-generator').find("Yes") != -1:
769 cls.sleep(0.01) # yield
770 if time.time() > deadline:
771 cls.logger.error("Timeout waiting for pg to stop")
773 for stamp, cap_name in cls._captures:
774 cls.vapi.cli('packet-generator delete %s' % cap_name)
778 def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
780 Create packet-generator interfaces.
782 :param interfaces: iterable indexes of the interfaces.
783 :returns: List of created interfaces.
788 intf = VppPGInterface(cls, i, gso, gso_size)
789 setattr(cls, intf.name, intf)
791 cls.pg_interfaces = result
795 def create_loopback_interfaces(cls, count):
797 Create loopback interfaces.
799 :param count: number of interfaces created.
800 :returns: List of created interfaces.
802 result = [VppLoInterface(cls) for i in range(count)]
804 setattr(cls, intf.name, intf)
805 cls.lo_interfaces = result
809 def create_bvi_interfaces(cls, count):
811 Create BVI interfaces.
813 :param count: number of interfaces created.
814 :returns: List of created interfaces.
816 result = [VppBviInterface(cls) for i in range(count)]
818 setattr(cls, intf.name, intf)
819 cls.bvi_interfaces = result
823 def extend_packet(packet, size, padding=' '):
825 Extend packet to given size by padding with spaces or custom padding
826 NOTE: Currently works only when Raw layer is present.
828 :param packet: packet
829 :param size: target size
830 :param padding: padding used to extend the payload
833 packet_len = len(packet) + 4
834 extend = size - packet_len
836 num = (extend // len(padding)) + 1
837 packet[Raw].load += (padding * num)[:extend].encode("ascii")
840 def reset_packet_infos(cls):
841 """ Reset the list of packet info objects and packet counts to zero """
842 cls._packet_infos = {}
843 cls._packet_count_for_dst_if_idx = {}
846 def create_packet_info(cls, src_if, dst_if):
848 Create packet info object containing the source and destination indexes
849 and add it to the testcase's packet info list
851 :param VppInterface src_if: source interface
852 :param VppInterface dst_if: destination interface
854 :returns: _PacketInfo object
858 info.index = len(cls._packet_infos)
859 info.src = src_if.sw_if_index
860 info.dst = dst_if.sw_if_index
861 if isinstance(dst_if, VppSubInterface):
862 dst_idx = dst_if.parent.sw_if_index
864 dst_idx = dst_if.sw_if_index
865 if dst_idx in cls._packet_count_for_dst_if_idx:
866 cls._packet_count_for_dst_if_idx[dst_idx] += 1
868 cls._packet_count_for_dst_if_idx[dst_idx] = 1
869 cls._packet_infos[info.index] = info
873 def info_to_payload(info):
875 Convert _PacketInfo object to packet payload
877 :param info: _PacketInfo object
879 :returns: string containing serialized data from packet info
881 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
885 def payload_to_info(payload, payload_field='load'):
887 Convert packet payload to _PacketInfo object
889 :param payload: packet payload
890 :type payload: <class 'scapy.packet.Raw'>
891 :param payload_field: packet fieldname of payload "load" for
892 <class 'scapy.packet.Raw'>
893 :type payload_field: str
894 :returns: _PacketInfo object containing de-serialized data from payload
897 numbers = getattr(payload, payload_field).split()
899 info.index = int(numbers[0])
900 info.src = int(numbers[1])
901 info.dst = int(numbers[2])
902 info.ip = int(numbers[3])
903 info.proto = int(numbers[4])
906 def get_next_packet_info(self, info):
908 Iterate over the packet info list stored in the testcase
909 Start iteration with first element if info is None
910 Continue based on index in info if info is specified
912 :param info: info or None
913 :returns: next info in list or None if no more infos
918 next_index = info.index + 1
919 if next_index == len(self._packet_infos):
922 return self._packet_infos[next_index]
924 def get_next_packet_info_for_interface(self, src_index, info):
926 Search the packet info list for the next packet info with same source
929 :param src_index: source interface index to search for
930 :param info: packet info - where to start the search
931 :returns: packet info or None
935 info = self.get_next_packet_info(info)
938 if info.src == src_index:
941 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
943 Search the packet info list for the next packet info with same source
944 and destination interface indexes
946 :param src_index: source interface index to search for
947 :param dst_index: destination interface index to search for
948 :param info: packet info - where to start the search
949 :returns: packet info or None
953 info = self.get_next_packet_info_for_interface(src_index, info)
956 if info.dst == dst_index:
959 def assert_equal(self, real_value, expected_value, name_or_class=None):
960 if name_or_class is None:
961 self.assertEqual(real_value, expected_value)
964 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
965 msg = msg % (getdoc(name_or_class).strip(),
966 real_value, str(name_or_class(real_value)),
967 expected_value, str(name_or_class(expected_value)))
969 msg = "Invalid %s: %s does not match expected value %s" % (
970 name_or_class, real_value, expected_value)
972 self.assertEqual(real_value, expected_value, msg)
974 def assert_in_range(self,
982 msg = "Invalid %s: %s out of range <%s,%s>" % (
983 name, real_value, expected_min, expected_max)
984 self.assertTrue(expected_min <= real_value <= expected_max, msg)
986 def assert_packet_checksums_valid(self, packet,
987 ignore_zero_udp_checksums=True):
988 received = packet.__class__(scapy.compat.raw(packet))
989 udp_layers = ['UDP', 'UDPerror']
990 checksum_fields = ['cksum', 'chksum']
993 temp = received.__class__(scapy.compat.raw(received))
995 layer = temp.getlayer(counter)
998 layer.remove_payload()
999 for cf in checksum_fields:
1000 if hasattr(layer, cf):
1001 if ignore_zero_udp_checksums and \
1002 0 == getattr(layer, cf) and \
1003 layer.name in udp_layers:
1005 delattr(temp.getlayer(counter), cf)
1006 checksums.append((counter, cf))
1009 counter = counter + 1
1010 if 0 == len(checksums):
1012 temp = temp.__class__(scapy.compat.raw(temp))
1013 for layer, cf in checksums:
1014 calc_sum = getattr(temp[layer], cf)
1016 getattr(received[layer], cf), calc_sum,
1017 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
1019 "Checksum field `%s` on `%s` layer has correct value `%s`" %
1020 (cf, temp[layer].name, calc_sum))
1022 def assert_checksum_valid(self, received_packet, layer,
1023 field_name='chksum',
1024 ignore_zero_checksum=False):
1025 """ Check checksum of received packet on given layer """
1026 received_packet_checksum = getattr(received_packet[layer], field_name)
1027 if ignore_zero_checksum and 0 == received_packet_checksum:
1029 recalculated = received_packet.__class__(
1030 scapy.compat.raw(received_packet))
1031 delattr(recalculated[layer], field_name)
1032 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1033 self.assert_equal(received_packet_checksum,
1034 getattr(recalculated[layer], field_name),
1035 "packet checksum on layer: %s" % layer)
1037 def assert_ip_checksum_valid(self, received_packet,
1038 ignore_zero_checksum=False):
1039 self.assert_checksum_valid(received_packet, 'IP',
1040 ignore_zero_checksum=ignore_zero_checksum)
1042 def assert_tcp_checksum_valid(self, received_packet,
1043 ignore_zero_checksum=False):
1044 self.assert_checksum_valid(received_packet, 'TCP',
1045 ignore_zero_checksum=ignore_zero_checksum)
1047 def assert_udp_checksum_valid(self, received_packet,
1048 ignore_zero_checksum=True):
1049 self.assert_checksum_valid(received_packet, 'UDP',
1050 ignore_zero_checksum=ignore_zero_checksum)
1052 def assert_embedded_icmp_checksum_valid(self, received_packet):
1053 if received_packet.haslayer(IPerror):
1054 self.assert_checksum_valid(received_packet, 'IPerror')
1055 if received_packet.haslayer(TCPerror):
1056 self.assert_checksum_valid(received_packet, 'TCPerror')
1057 if received_packet.haslayer(UDPerror):
1058 self.assert_checksum_valid(received_packet, 'UDPerror',
1059 ignore_zero_checksum=True)
1060 if received_packet.haslayer(ICMPerror):
1061 self.assert_checksum_valid(received_packet, 'ICMPerror')
1063 def assert_icmp_checksum_valid(self, received_packet):
1064 self.assert_checksum_valid(received_packet, 'ICMP')
1065 self.assert_embedded_icmp_checksum_valid(received_packet)
1067 def assert_icmpv6_checksum_valid(self, pkt):
1068 if pkt.haslayer(ICMPv6DestUnreach):
1069 self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
1070 self.assert_embedded_icmp_checksum_valid(pkt)
1071 if pkt.haslayer(ICMPv6EchoRequest):
1072 self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
1073 if pkt.haslayer(ICMPv6EchoReply):
1074 self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
1076 def get_packet_counter(self, counter):
1077 if counter.startswith("/"):
1078 counter_value = self.statistics.get_counter(counter)
1080 counters = self.vapi.cli("sh errors").split('\n')
1082 for i in range(1, len(counters) - 1):
1083 results = counters[i].split()
1084 if results[1] == counter:
1085 counter_value = int(results[0])
1087 return counter_value
1089 def assert_packet_counter_equal(self, counter, expected_value):
1090 counter_value = self.get_packet_counter(counter)
1091 self.assert_equal(counter_value, expected_value,
1092 "packet counter `%s'" % counter)
1094 def assert_error_counter_equal(self, counter, expected_value):
1095 counter_value = self.statistics.get_err_counter(counter)
1096 self.assert_equal(counter_value, expected_value,
1097 "error counter `%s'" % counter)
1100 def sleep(cls, timeout, remark=None):
1102 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1103 # * by Guido, only the main thread can be interrupted.
1105 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
1108 if hasattr(os, 'sched_yield'):
1114 if hasattr(cls, 'logger'):
1115 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1116 before = time.time()
1119 if hasattr(cls, 'logger') and after - before > 2 * timeout:
1120 cls.logger.error("unexpected self.sleep() result - "
1121 "slept for %es instead of ~%es!",
1122 after - before, timeout)
1123 if hasattr(cls, 'logger'):
1125 "Finished sleep (%s) - slept %es (wanted %es)",
1126 remark, after - before, timeout)
1128 def pg_send(self, intf, pkts):
1129 self.vapi.cli("clear trace")
1130 intf.add_stream(pkts)
1131 self.pg_enable_capture(self.pg_interfaces)
1134 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
1135 self.pg_send(intf, pkts)
1138 for i in self.pg_interfaces:
1139 i.get_capture(0, timeout=timeout)
1140 i.assert_nothing_captured(remark=remark)
1143 def send_and_expect(self, intf, pkts, output, n_rx=None):
1146 self.pg_send(intf, pkts)
1147 rx = output.get_capture(n_rx)
1150 def send_and_expect_only(self, intf, pkts, output, timeout=None):
1151 self.pg_send(intf, pkts)
1152 rx = output.get_capture(len(pkts))
1156 for i in self.pg_interfaces:
1157 if i not in outputs:
1158 i.get_capture(0, timeout=timeout)
1159 i.assert_nothing_captured()
1165 def get_testcase_doc_name(test):
1166 return getdoc(test.__class__).splitlines()[0]
1169 def get_test_description(descriptions, test):
1170 short_description = test.shortDescription()
1171 if descriptions and short_description:
1172 return short_description
1177 class TestCaseInfo(object):
1178 def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
1179 self.logger = logger
1180 self.tempdir = tempdir
1181 self.vpp_pid = vpp_pid
1182 self.vpp_bin_path = vpp_bin_path
1183 self.core_crash_test = None
1186 class VppTestResult(unittest.TestResult):
1188 @property result_string
1189 String variable to store the test case result string.
1191 List variable containing 2-tuples of TestCase instances and strings
1192 holding formatted tracebacks. Each tuple represents a test which
1193 raised an unexpected exception.
1195 List variable containing 2-tuples of TestCase instances and strings
1196 holding formatted tracebacks. Each tuple represents a test where
1197 a failure was explicitly signalled using the TestCase.assert*()
1201 failed_test_cases_info = set()
1202 core_crash_test_cases_info = set()
1203 current_test_case_info = None
1205 def __init__(self, stream=None, descriptions=None, verbosity=None,
1208 :param stream File descriptor to store where to report test results.
1209 Set to the standard error stream by default.
1210 :param descriptions Boolean variable to store information if to use
1211 test case descriptions.
1212 :param verbosity Integer variable to store required verbosity level.
1214 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1215 self.stream = stream
1216 self.descriptions = descriptions
1217 self.verbosity = verbosity
1218 self.result_string = None
1219 self.runner = runner
1221 def addSuccess(self, test):
1223 Record a test succeeded result
1228 if self.current_test_case_info:
1229 self.current_test_case_info.logger.debug(
1230 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1231 test._testMethodName,
1232 test._testMethodDoc))
1233 unittest.TestResult.addSuccess(self, test)
1234 self.result_string = colorize("OK", GREEN)
1236 self.send_result_through_pipe(test, PASS)
1238 def addSkip(self, test, reason):
1240 Record a test skipped.
1246 if self.current_test_case_info:
1247 self.current_test_case_info.logger.debug(
1248 "--- addSkip() %s.%s(%s) called, reason is %s" %
1249 (test.__class__.__name__, test._testMethodName,
1250 test._testMethodDoc, reason))
1251 unittest.TestResult.addSkip(self, test, reason)
1252 self.result_string = colorize("SKIP", YELLOW)
1254 self.send_result_through_pipe(test, SKIP)
1256 def symlink_failed(self):
1257 if self.current_test_case_info:
1259 failed_dir = os.getenv('FAILED_DIR')
1260 link_path = os.path.join(
1263 os.path.basename(self.current_test_case_info.tempdir))
1264 if self.current_test_case_info.logger:
1265 self.current_test_case_info.logger.debug(
1266 "creating a link to the failed test")
1267 self.current_test_case_info.logger.debug(
1268 "os.symlink(%s, %s)" %
1269 (self.current_test_case_info.tempdir, link_path))
1270 if os.path.exists(link_path):
1271 if self.current_test_case_info.logger:
1272 self.current_test_case_info.logger.debug(
1273 'symlink already exists')
1275 os.symlink(self.current_test_case_info.tempdir, link_path)
1277 except Exception as e:
1278 if self.current_test_case_info.logger:
1279 self.current_test_case_info.logger.error(e)
1281 def send_result_through_pipe(self, test, result):
1282 if hasattr(self, 'test_framework_result_pipe'):
1283 pipe = self.test_framework_result_pipe
1285 pipe.send((test.id(), result))
1287 def log_error(self, test, err, fn_name):
1288 if self.current_test_case_info:
1289 if isinstance(test, unittest.suite._ErrorHolder):
1290 test_name = test.description
1292 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1293 test._testMethodName,
1294 test._testMethodDoc)
1295 self.current_test_case_info.logger.debug(
1296 "--- %s() %s called, err is %s" %
1297 (fn_name, test_name, err))
1298 self.current_test_case_info.logger.debug(
1299 "formatted exception is:\n%s" %
1300 "".join(format_exception(*err)))
1302 def add_error(self, test, err, unittest_fn, error_type):
1303 if error_type == FAIL:
1304 self.log_error(test, err, 'addFailure')
1305 error_type_str = colorize("FAIL", RED)
1306 elif error_type == ERROR:
1307 self.log_error(test, err, 'addError')
1308 error_type_str = colorize("ERROR", RED)
1310 raise Exception('Error type %s cannot be used to record an '
1311 'error or a failure' % error_type)
1313 unittest_fn(self, test, err)
1314 if self.current_test_case_info:
1315 self.result_string = "%s [ temp dir used by test case: %s ]" % \
1317 self.current_test_case_info.tempdir)
1318 self.symlink_failed()
1319 self.failed_test_cases_info.add(self.current_test_case_info)
1320 if is_core_present(self.current_test_case_info.tempdir):
1321 if not self.current_test_case_info.core_crash_test:
1322 if isinstance(test, unittest.suite._ErrorHolder):
1323 test_name = str(test)
1325 test_name = "'{!s}' ({!s})".format(
1326 get_testcase_doc_name(test), test.id())
1327 self.current_test_case_info.core_crash_test = test_name
1328 self.core_crash_test_cases_info.add(
1329 self.current_test_case_info)
1331 self.result_string = '%s [no temp dir]' % error_type_str
1333 self.send_result_through_pipe(test, error_type)
1335 def addFailure(self, test, err):
1337 Record a test failed result
1340 :param err: error message
1343 self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1345 def addError(self, test, err):
1347 Record a test error result
1350 :param err: error message
1353 self.add_error(test, err, unittest.TestResult.addError, ERROR)
1355 def getDescription(self, test):
1357 Get test description
1360 :returns: test description
1363 return get_test_description(self.descriptions, test)
1365 def startTest(self, test):
1373 def print_header(test):
1374 if not hasattr(test.__class__, '_header_printed'):
1375 print(double_line_delim)
1376 print(colorize(getdoc(test).splitlines()[0], GREEN))
1377 print(double_line_delim)
1378 test.__class__._header_printed = True
1381 self.start_test = time.time()
1382 unittest.TestResult.startTest(self, test)
1383 if self.verbosity > 0:
1384 self.stream.writeln(
1385 "Starting " + self.getDescription(test) + " ...")
1386 self.stream.writeln(single_line_delim)
1388 def stopTest(self, test):
1390 Called when the given test has been run
1395 unittest.TestResult.stopTest(self, test)
1397 if self.verbosity > 0:
1398 self.stream.writeln(single_line_delim)
1399 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1400 self.result_string))
1401 self.stream.writeln(single_line_delim)
1403 self.stream.writeln("%-68s %4.2f %s" %
1404 (self.getDescription(test),
1405 time.time() - self.start_test,
1406 self.result_string))
1408 self.send_result_through_pipe(test, TEST_RUN)
1410 def printErrors(self):
1412 Print errors from running the test case
1414 if len(self.errors) > 0 or len(self.failures) > 0:
1415 self.stream.writeln()
1416 self.printErrorList('ERROR', self.errors)
1417 self.printErrorList('FAIL', self.failures)
1419 # ^^ that is the last output from unittest before summary
1420 if not self.runner.print_summary:
1421 devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
1422 self.stream = devnull
1423 self.runner.stream = devnull
1425 def printErrorList(self, flavour, errors):
1427 Print error list to the output stream together with error type
1428 and test case description.
1430 :param flavour: error type
1431 :param errors: iterable errors
1434 for test, err in errors:
1435 self.stream.writeln(double_line_delim)
1436 self.stream.writeln("%s: %s" %
1437 (flavour, self.getDescription(test)))
1438 self.stream.writeln(single_line_delim)
1439 self.stream.writeln("%s" % err)
1442 class VppTestRunner(unittest.TextTestRunner):
1444 A basic test runner implementation which prints results to standard error.
1448 def resultclass(self):
1449 """Class maintaining the results of the tests"""
1450 return VppTestResult
1452 def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1453 result_pipe=None, failfast=False, buffer=False,
1454 resultclass=None, print_summary=True, **kwargs):
1455 # ignore stream setting here, use hard-coded stdout to be in sync
1456 # with prints from VppTestCase methods ...
1457 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1458 verbosity, failfast, buffer,
1459 resultclass, **kwargs)
1460 KeepAliveReporter.pipe = keep_alive_pipe
1462 self.orig_stream = self.stream
1463 self.resultclass.test_framework_result_pipe = result_pipe
1465 self.print_summary = print_summary
1467 def _makeResult(self):
1468 return self.resultclass(self.stream,
1473 def run(self, test):
1480 faulthandler.enable() # emit stack trace to stderr if killed by signal
1482 result = super(VppTestRunner, self).run(test)
1483 if not self.print_summary:
1484 self.stream = self.orig_stream
1485 result.stream = self.orig_stream
1489 class Worker(Thread):
1490 def __init__(self, args, logger, env=None):
1491 self.logger = logger
1493 if hasattr(self, 'testcase') and self.testcase.debug_all:
1494 if self.testcase.debug_gdbserver:
1495 self.args = ['/usr/bin/gdbserver', 'localhost:{port}'
1496 .format(port=self.testcase.gdbserver_port)] + args
1497 elif self.testcase.debug_gdb and hasattr(self, 'wait_for_gdb'):
1498 self.args.append(self.wait_for_gdb)
1499 self.app_bin = args[0]
1500 self.app_name = os.path.basename(self.app_bin)
1501 if hasattr(self, 'role'):
1502 self.app_name += ' {role}'.format(role=self.role)
1505 env = {} if env is None else env
1506 self.env = copy.deepcopy(env)
1507 super(Worker, self).__init__()
1509 def wait_for_enter(self):
1510 if not hasattr(self, 'testcase'):
1512 if self.testcase.debug_all and self.testcase.debug_gdbserver:
1514 print(double_line_delim)
1515 print("Spawned GDB Server for '{app}' with PID: {pid}"
1516 .format(app=self.app_name, pid=self.process.pid))
1517 elif self.testcase.debug_all and self.testcase.debug_gdb:
1519 print(double_line_delim)
1520 print("Spawned '{app}' with PID: {pid}"
1521 .format(app=self.app_name, pid=self.process.pid))
1524 print(single_line_delim)
1525 print("You can debug '{app}' using:".format(app=self.app_name))
1526 if self.testcase.debug_gdbserver:
1527 print("sudo gdb " + self.app_bin +
1528 " -ex 'target remote localhost:{port}'"
1529 .format(port=self.testcase.gdbserver_port))
1530 print("Now is the time to attach gdb by running the above "
1531 "command, set up breakpoints etc., then resume from "
1532 "within gdb by issuing the 'continue' command")
1533 self.testcase.gdbserver_port += 1
1534 elif self.testcase.debug_gdb:
1535 print("sudo gdb " + self.app_bin +
1536 " -ex 'attach {pid}'".format(pid=self.process.pid))
1537 print("Now is the time to attach gdb by running the above "
1538 "command and set up breakpoints etc., then resume from"
1539 " within gdb by issuing the 'continue' command")
1540 print(single_line_delim)
1541 input("Press ENTER to continue running the testcase...")
1544 executable = self.args[0]
1545 if not os.path.exists(executable) or not os.access(
1546 executable, os.F_OK | os.X_OK):
1547 # Exit code that means some system file did not exist,
1548 # could not be opened, or had some other kind of error.
1549 self.result = os.EX_OSFILE
1550 raise EnvironmentError(
1551 "executable '%s' is not found or executable." % executable)
1552 self.logger.debug("Running executable: '{app}'"
1553 .format(app=' '.join(self.args)))
1554 env = os.environ.copy()
1555 env.update(self.env)
1556 env["CK_LOG_FILE_NAME"] = "-"
1557 self.process = subprocess.Popen(
1558 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1559 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1560 self.wait_for_enter()
1561 out, err = self.process.communicate()
1562 self.logger.debug("Finished running `{app}'".format(app=self.app_name))
1563 self.logger.info("Return code is `%s'" % self.process.returncode)
1564 self.logger.info(single_line_delim)
1565 self.logger.info("Executable `{app}' wrote to stdout:"
1566 .format(app=self.app_name))
1567 self.logger.info(single_line_delim)
1568 self.logger.info(out.decode('utf-8'))
1569 self.logger.info(single_line_delim)
1570 self.logger.info("Executable `{app}' wrote to stderr:"
1571 .format(app=self.app_name))
1572 self.logger.info(single_line_delim)
1573 self.logger.info(err.decode('utf-8'))
1574 self.logger.info(single_line_delim)
1575 self.result = self.process.returncode
1578 if __name__ == '__main__':