3 from __future__ import print_function
17 from collections import deque
18 from threading import Thread, Event
19 from inspect import getdoc, isclass
20 from traceback import format_exception
21 from logging import FileHandler, DEBUG, Formatter
24 from scapy.packet import Raw
25 import hook as hookmodule
26 from vpp_pg_interface import VppPGInterface
27 from vpp_sub_interface import VppSubInterface
28 from vpp_lo_interface import VppLoInterface
29 from vpp_bvi_interface import VppBviInterface
30 from vpp_papi_provider import VppPapiProvider
31 from vpp_papi.vpp_stats import VPPStats
32 from vpp_papi.vpp_transport_shmem import VppTransportShmemIOError
33 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
35 from vpp_object import VppObjectRegistry
36 from util import ppp, is_core_present
37 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
38 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
39 from scapy.layers.inet6 import ICMPv6EchoReply
41 if os.name == 'posix' and sys.version_info[0] < 3:
42 # using subprocess32 is recommended by python official documentation
43 # @ https://docs.python.org/2/library/subprocess.html
44 import subprocess32 as subprocess
48 # Python2/3 compatible
61 class BoolEnvironmentVariable(object):
63 def __init__(self, env_var_name, default='n', true_values=None):
64 self.name = env_var_name
65 self.default = default
66 self.true_values = true_values if true_values is not None else \
70 return os.getenv(self.name, self.default).lower() in self.true_values
72 if sys.version_info[0] == 2:
73 __nonzero__ = __bool__
76 return 'BoolEnvironmentVariable(%r, default=%r, true_values=%r)' % \
77 (self.name, self.default, self.true_values)
80 debug_framework = BoolEnvironmentVariable('TEST_DEBUG')
85 Test framework module.
87 The module provides a set of tools for constructing and running tests and
88 representing the results.
92 class VppDiedError(Exception):
93 """ exception for reporting that the subprocess has died."""
95 signals_by_value = {v: k for k, v in signal.__dict__.items() if
96 k.startswith('SIG') and not k.startswith('SIG_')}
98 def __init__(self, rv=None, testcase=None, method_name=None):
100 self.signal_name = None
101 self.testcase = testcase
102 self.method_name = method_name
105 self.signal_name = VppDiedError.signals_by_value[-rv]
106 except (KeyError, TypeError):
109 if testcase is None and method_name is None:
112 in_msg = 'running %s.%s ' % (testcase, method_name)
114 msg = "VPP subprocess died %sunexpectedly with return code: %d%s." % (
117 ' [%s]' % self.signal_name if
118 self.signal_name is not None else '')
119 super(VppDiedError, self).__init__(msg)
122 class _PacketInfo(object):
123 """Private class to create packet info object.
125 Help process information about the next packet.
126 Set variables to default values.
128 #: Store the index of the packet.
130 #: Store the index of the source packet generator interface of the packet.
132 #: Store the index of the destination packet generator interface
135 #: Store expected ip version
137 #: Store expected upper protocol
139 #: Store the copy of the former packet.
142 def __eq__(self, other):
143 index = self.index == other.index
144 src = self.src == other.src
145 dst = self.dst == other.dst
146 data = self.data == other.data
147 return index and src and dst and data
150 def pump_output(testclass):
151 """ pump output from vpp stdout/stderr to proper queues """
154 while not testclass.pump_thread_stop_flag.is_set():
155 readable = select.select([testclass.vpp.stdout.fileno(),
156 testclass.vpp.stderr.fileno(),
157 testclass.pump_thread_wakeup_pipe[0]],
159 if testclass.vpp.stdout.fileno() in readable:
160 read = os.read(testclass.vpp.stdout.fileno(), 102400)
162 split = read.splitlines(True)
163 if len(stdout_fragment) > 0:
164 split[0] = "%s%s" % (stdout_fragment, split[0])
165 if len(split) > 0 and split[-1].endswith("\n"):
169 stdout_fragment = split[-1]
170 testclass.vpp_stdout_deque.extend(split[:limit])
171 if not testclass.cache_vpp_output:
172 for line in split[:limit]:
173 testclass.logger.debug(
174 "VPP STDOUT: %s" % line.rstrip("\n"))
175 if testclass.vpp.stderr.fileno() in readable:
176 read = os.read(testclass.vpp.stderr.fileno(), 102400)
178 split = read.splitlines(True)
179 if len(stderr_fragment) > 0:
180 split[0] = "%s%s" % (stderr_fragment, split[0])
181 if len(split) > 0 and split[-1].endswith(b"\n"):
185 stderr_fragment = split[-1]
186 testclass.vpp_stderr_deque.extend(split[:limit])
187 if not testclass.cache_vpp_output:
188 for line in split[:limit]:
189 testclass.logger.debug(
190 "VPP STDERR: %s" % line.rstrip("\n"))
191 # ignoring the dummy pipe here intentionally - the
192 # flag will take care of properly terminating the loop
def _is_skip_aarch64_set():
    """Wrap the SKIP_AARCH64 environment variable.

    :returns: BoolEnvironmentVariable created for 'SKIP_AARCH64'
    """
    skip_flag = BoolEnvironmentVariable('SKIP_AARCH64')
    return skip_flag


# module-level flag object, created once when the module is imported
is_skip_aarch64_set = _is_skip_aarch64_set()
def _is_platform_aarch64():
    """Tell whether the current machine type is aarch64.

    :returns: True when platform.machine() reports 'aarch64'
    """
    machine = platform.machine()
    return machine == 'aarch64'


# computed once when the module is imported
is_platform_aarch64 = _is_platform_aarch64()
def _running_extended_tests():
    """Wrap the EXTENDED_TESTS environment variable.

    :returns: BoolEnvironmentVariable created for "EXTENDED_TESTS"
    """
    extended_flag = BoolEnvironmentVariable("EXTENDED_TESTS")
    return extended_flag


# module-level flag object, created once when the module is imported
running_extended_tests = _running_extended_tests()
def _running_on_centos():
    """Detect a CentOS host from the OS_ID environment variable.

    :returns: True when OS_ID contains "centos" (case-insensitive),
              False otherwise, including when OS_ID is not set
    """
    os_id = os.getenv("OS_ID", "")
    return "centos" in os_id.lower()


# Call the helper, as the sibling module-level flags do. Binding the bare
# function object would make this flag truthy on every platform.
running_on_centos = _running_on_centos()
224 class KeepAliveReporter(object):
226 Singleton object which reports test start to parent process
231 self.__dict__ = self._shared_state
239 def pipe(self, pipe):
240 if self._pipe is not None:
241 raise Exception("Internal error - pipe should only be set once.")
244 def send_keep_alive(self, test, desc=None):
246 Write current test tmpdir & desc to keep-alive pipe to signal liveness
248 if self.pipe is None:
249 # if not running forked..
253 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
257 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
260 class VppTestCase(unittest.TestCase):
261 """This subclass is a base class for VPP test cases that are implemented as
262 classes. It provides methods to create and run test case.
264 get_truthy_envar = staticmethod(BoolEnvironmentVariable)
266 extra_vpp_punt_config = []
267 extra_vpp_plugin_config = []
270 def packet_infos(self):
271 """List of packet infos"""
272 return self._packet_infos
275 def get_packet_count_for_if_idx(cls, dst_if_index):
276 """Get the number of packet info for specified destination if index"""
277 if dst_if_index in cls._packet_count_for_dst_if_idx:
278 return cls._packet_count_for_dst_if_idx[dst_if_index]
284 """Return the instance of this testcase"""
285 return cls.test_instance
288 def set_debug_flags(cls, d):
289 cls.debug_core = False
290 cls.debug_gdb = False
291 cls.debug_gdbserver = False
296 cls.debug_core = True
299 elif dl == "gdbserver":
300 cls.debug_gdbserver = True
302 raise Exception("Unrecognized DEBUG option: '%s'" % d)
305 def get_least_used_cpu():
306 cpu_usage_list = [set(range(psutil.cpu_count()))]
307 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
308 if 'vpp_main' == p.info['name']]
309 for vpp_process in vpp_processes:
310 for cpu_usage_set in cpu_usage_list:
312 cpu_num = vpp_process.cpu_num()
313 if cpu_num in cpu_usage_set:
314 cpu_usage_set_index = cpu_usage_list.index(
316 if cpu_usage_set_index == len(cpu_usage_list) - 1:
317 cpu_usage_list.append({cpu_num})
319 cpu_usage_list[cpu_usage_set_index + 1].add(
321 cpu_usage_set.remove(cpu_num)
323 except psutil.NoSuchProcess:
326 for cpu_usage_set in cpu_usage_list:
327 if len(cpu_usage_set) > 0:
328 min_usage_set = cpu_usage_set
331 return random.choice(tuple(min_usage_set))
334 def setUpConstants(cls):
335 """ Set-up the test case class based on environment variables """
336 cls.step = BoolEnvironmentVariable('STEP')
337 d = os.getenv("DEBUG", None)
338 # inverted case to handle '' == True
339 c = os.getenv("CACHE_OUTPUT", "1")
340 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
341 cls.set_debug_flags(d)
342 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
343 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
344 cls.test_plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
345 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
347 if cls.plugin_path is not None:
348 if cls.extern_plugin_path is not None:
349 plugin_path = "%s:%s" % (
350 cls.plugin_path, cls.extern_plugin_path)
352 plugin_path = cls.plugin_path
353 elif cls.extern_plugin_path is not None:
354 plugin_path = cls.extern_plugin_path
356 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
357 debug_cli = "cli-listen localhost:5002"
359 size = os.getenv("COREDUMP_SIZE")
361 coredump_size = "coredump-size %s" % size
362 if coredump_size is None:
363 coredump_size = "coredump-size unlimited"
365 cpu_core_number = cls.get_least_used_cpu()
367 cls.vpp_cmdline = [cls.vpp_bin, "unix",
368 "{", "nodaemon", debug_cli, "full-coredump",
369 coredump_size, "runtime-dir", cls.tempdir, "}",
370 "api-trace", "{", "on", "}", "api-segment", "{",
371 "prefix", cls.shm_prefix, "}", "cpu", "{",
372 "main-core", str(cpu_core_number), "}",
373 "statseg", "{", "socket-name", cls.stats_sock, "}",
374 "socksvr", "{", "socket-name", cls.api_sock, "}",
376 "{", "plugin", "dpdk_plugin.so", "{", "disable",
377 "}", "plugin", "rdma_plugin.so", "{", "disable",
378 "}", "plugin", "unittest_plugin.so", "{", "enable",
379 "}"] + cls.extra_vpp_plugin_config + ["}", ]
380 if cls.extra_vpp_punt_config is not None:
381 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
382 if plugin_path is not None:
383 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
384 if cls.test_plugin_path is not None:
385 cls.vpp_cmdline.extend(["test_plugin_path", cls.test_plugin_path])
387 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
388 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
391 def wait_for_enter(cls):
392 if cls.debug_gdbserver:
393 print(double_line_delim)
394 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
396 print(double_line_delim)
397 print("Spawned VPP with PID: %d" % cls.vpp.pid)
399 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
401 print(single_line_delim)
402 print("You can debug the VPP using e.g.:")
403 if cls.debug_gdbserver:
404 print("sudo gdb " + cls.vpp_bin +
405 " -ex 'target remote localhost:7777'")
406 print("Now is the time to attach a gdb by running the above "
407 "command, set up breakpoints etc. and then resume VPP from "
408 "within gdb by issuing the 'continue' command")
410 print("sudo gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
411 print("Now is the time to attach a gdb by running the above "
412 "command and set up breakpoints etc.")
413 print(single_line_delim)
414 input("Press ENTER to continue running the testcase...")
418 cmdline = cls.vpp_cmdline
420 if cls.debug_gdbserver:
421 gdbserver = '/usr/bin/gdbserver'
422 if not os.path.isfile(gdbserver) or \
423 not os.access(gdbserver, os.X_OK):
424 raise Exception("gdbserver binary '%s' does not exist or is "
425 "not executable" % gdbserver)
427 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
428 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
431 cls.vpp = subprocess.Popen(cmdline,
432 stdout=subprocess.PIPE,
433 stderr=subprocess.PIPE,
435 except subprocess.CalledProcessError as e:
436 cls.logger.critical("Subprocess returned with non-0 return code: ("
440 cls.logger.critical("Subprocess returned with OS error: "
441 "(%s) %s", e.errno, e.strerror)
443 except Exception as e:
444 cls.logger.exception("Subprocess returned unexpected from "
451 def wait_for_stats_socket(cls):
452 deadline = time.time() + 3
454 while time.time() < deadline or \
455 cls.debug_gdb or cls.debug_gdbserver:
456 if os.path.exists(cls.stats_sock):
461 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
464 def wait_for_coredump(cls):
465 corefile = cls.tempdir + "/core"
466 if os.path.isfile(corefile):
467 cls.logger.error("Waiting for coredump to complete: %s", corefile)
468 curr_size = os.path.getsize(corefile)
469 deadline = time.time() + 60
471 while time.time() < deadline:
474 curr_size = os.path.getsize(corefile)
475 if size == curr_size:
479 cls.logger.error("Timed out waiting for coredump to complete:"
482 cls.logger.error("Coredump complete: %s, size %d",
488 Perform class setup before running the testcase
489 Remove shared memory files, start vpp and connect the vpp-api
491 super(VppTestCase, cls).setUpClass()
492 gc.collect() # run garbage collection first
494 cls.logger = get_logger(cls.__name__)
495 if hasattr(cls, 'parallel_handler'):
496 cls.logger.addHandler(cls.parallel_handler)
497 cls.logger.propagate = False
499 cls.tempdir = tempfile.mkdtemp(
500 prefix='vpp-unittest-%s-' % cls.__name__)
501 cls.stats_sock = "%s/stats.sock" % cls.tempdir
502 cls.api_sock = "%s/api.sock" % cls.tempdir
503 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
504 cls.file_handler.setFormatter(
505 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
507 cls.file_handler.setLevel(DEBUG)
508 cls.logger.addHandler(cls.file_handler)
509 cls.logger.debug("--- setUpClass() for %s called ---" %
511 cls.shm_prefix = os.path.basename(cls.tempdir)
512 os.chdir(cls.tempdir)
513 cls.logger.info("Temporary dir is %s, shm prefix is %s",
514 cls.tempdir, cls.shm_prefix)
516 cls.reset_packet_infos()
518 cls._zombie_captures = []
521 cls.registry = VppObjectRegistry()
522 cls.vpp_startup_failed = False
523 cls.reporter = KeepAliveReporter()
524 # need to catch exceptions here because if we raise, then the cleanup
525 # doesn't get called and we might end with a zombie vpp
528 cls.reporter.send_keep_alive(cls, 'setUpClass')
529 VppTestResult.current_test_case_info = TestCaseInfo(
530 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
531 cls.vpp_stdout_deque = deque()
532 cls.vpp_stderr_deque = deque()
533 cls.pump_thread_stop_flag = Event()
534 cls.pump_thread_wakeup_pipe = os.pipe()
535 cls.pump_thread = Thread(target=pump_output, args=(cls,))
536 cls.pump_thread.daemon = True
537 cls.pump_thread.start()
538 if cls.debug_gdb or cls.debug_gdbserver:
542 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
545 hook = hookmodule.StepHook(cls)
547 hook = hookmodule.PollHook(cls)
548 cls.vapi.register_hook(hook)
549 cls.wait_for_stats_socket()
550 cls.statistics = VPPStats(socketname=cls.stats_sock)
554 cls.vpp_startup_failed = True
556 "VPP died shortly after startup, check the"
557 " output to standard error for possible cause")
563 cls.vapi.disconnect()
566 if cls.debug_gdbserver:
567 print(colorize("You're running VPP inside gdbserver but "
568 "VPP-API connection failed, did you forget "
569 "to 'continue' VPP from within gdb?", RED))
579 Disconnect vpp-api, kill vpp and cleanup shared memory files
581 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
583 if cls.vpp.returncode is None:
584 print(double_line_delim)
585 print("VPP or GDB server is still running")
586 print(single_line_delim)
587 input("When done debugging, press ENTER to kill the "
588 "process and finish running the testcase...")
590 # first signal that we want to stop the pump thread, then wake it up
591 if hasattr(cls, 'pump_thread_stop_flag'):
592 cls.pump_thread_stop_flag.set()
593 if hasattr(cls, 'pump_thread_wakeup_pipe'):
594 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
595 if hasattr(cls, 'pump_thread'):
596 cls.logger.debug("Waiting for pump thread to stop")
597 cls.pump_thread.join()
598 if hasattr(cls, 'vpp_stderr_reader_thread'):
599 cls.logger.debug("Waiting for stdderr pump to stop")
600 cls.vpp_stderr_reader_thread.join()
602 if hasattr(cls, 'vpp'):
603 if hasattr(cls, 'vapi'):
604 cls.logger.debug("Disconnecting class vapi client on %s",
606 cls.vapi.disconnect()
607 cls.logger.debug("Deleting class vapi attribute on %s",
611 if cls.vpp.returncode is None:
612 cls.wait_for_coredump()
613 cls.logger.debug("Sending TERM to vpp")
615 cls.logger.debug("Waiting for vpp to die")
616 cls.vpp.communicate()
617 cls.logger.debug("Deleting class vpp attribute on %s",
621 if cls.vpp_startup_failed:
622 stdout_log = cls.logger.info
623 stderr_log = cls.logger.critical
625 stdout_log = cls.logger.info
626 stderr_log = cls.logger.info
628 if hasattr(cls, 'vpp_stdout_deque'):
629 stdout_log(single_line_delim)
630 stdout_log('VPP output to stdout while running %s:', cls.__name__)
631 stdout_log(single_line_delim)
632 vpp_output = "".join(cls.vpp_stdout_deque)
633 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
635 stdout_log('\n%s', vpp_output)
636 stdout_log(single_line_delim)
638 if hasattr(cls, 'vpp_stderr_deque'):
639 stderr_log(single_line_delim)
640 stderr_log('VPP output to stderr while running %s:', cls.__name__)
641 stderr_log(single_line_delim)
642 vpp_output = "".join(cls.vpp_stderr_deque)
643 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
645 stderr_log('\n%s', vpp_output)
646 stderr_log(single_line_delim)
649 def tearDownClass(cls):
650 """ Perform final cleanup after running all tests in this test-case """
651 cls.logger.debug("--- tearDownClass() for %s called ---" %
653 cls.reporter.send_keep_alive(cls, 'tearDownClass')
655 cls.file_handler.close()
656 cls.reset_packet_infos()
658 debug_internal.on_tear_down_class(cls)
def show_commands_at_teardown(self):
    """Hook where subclasses can log their own show commands at teardown.

    This base implementation only logs that no test specific commands
    were provided.
    """
    placeholder = "--- No test specific show commands provided. ---"
    self.logger.info(placeholder)
665 """ Show various debug prints after each test """
666 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
667 (self.__class__.__name__, self._testMethodName,
668 self._testMethodDoc))
671 if not self.vpp_dead:
672 self.logger.debug(self.vapi.cli("show trace max 1000"))
673 self.logger.info(self.vapi.ppcli("show interface"))
674 self.logger.info(self.vapi.ppcli("show hardware"))
675 self.logger.info(self.statistics.set_errors_str())
676 self.logger.info(self.vapi.ppcli("show run"))
677 self.logger.info(self.vapi.ppcli("show log"))
678 self.logger.info(self.vapi.ppcli("show bihash"))
679 self.logger.info("Logging testcase specific show commands.")
680 self.show_commands_at_teardown()
681 self.registry.remove_vpp_config(self.logger)
682 # Save/Dump VPP api trace log
683 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
684 tmp_api_trace = "/tmp/%s" % api_trace
685 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
686 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
687 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
689 os.rename(tmp_api_trace, vpp_api_trace_log)
690 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
692 except VppTransportShmemIOError:
693 self.logger.debug("VppTransportShmemIOError: Vpp dead. "
694 "Cannot log show commands.")
697 self.registry.unregister_all(self.logger)
700 """ Clear trace before running each test"""
701 super(VppTestCase, self).setUp()
702 self.reporter.send_keep_alive(self)
705 raise VppDiedError(rv=None, testcase=self.__class__.__name__,
706 method_name=self._testMethodName)
707 self.sleep(.1, "during setUp")
708 self.vpp_stdout_deque.append(
709 "--- test setUp() for %s.%s(%s) starts here ---\n" %
710 (self.__class__.__name__, self._testMethodName,
711 self._testMethodDoc))
712 self.vpp_stderr_deque.append(
713 "--- test setUp() for %s.%s(%s) starts here ---\n" %
714 (self.__class__.__name__, self._testMethodName,
715 self._testMethodDoc))
716 self.vapi.cli("clear trace")
717 # store the test instance inside the test class - so that objects
718 # holding the class can access instance methods (like assertEqual)
719 type(self).test_instance = self
722 def pg_enable_capture(cls, interfaces=None):
724 Enable capture on packet-generator interfaces
726 :param interfaces: iterable interface indexes (if None,
727 use self.pg_interfaces)
730 if interfaces is None:
731 interfaces = cls.pg_interfaces
736 def register_capture(cls, cap_name):
737 """ Register a capture in the testclass """
738 # add to the list of captures with current timestamp
739 cls._captures.append((time.time(), cap_name))
740 # filter out from zombies
741 cls._zombie_captures = [(stamp, name)
742 for (stamp, name) in cls._zombie_captures
747 """ Remove any zombie captures and enable the packet generator """
748 # how long before capture is allowed to be deleted - otherwise vpp
749 # crashes - 100ms seems enough (this shouldn't be needed at all)
752 for stamp, cap_name in cls._zombie_captures:
753 wait = stamp + capture_ttl - now
755 cls.sleep(wait, "before deleting capture %s" % cap_name)
757 cls.logger.debug("Removing zombie capture %s" % cap_name)
758 cls.vapi.cli('packet-generator delete %s' % cap_name)
760 cls.vapi.cli("trace add pg-input 1000")
761 cls.vapi.cli('packet-generator enable')
762 cls._zombie_captures = cls._captures
766 def create_pg_interfaces(cls, interfaces):
768 Create packet-generator interfaces.
770 :param interfaces: iterable indexes of the interfaces.
771 :returns: List of created interfaces.
776 intf = VppPGInterface(cls, i)
777 setattr(cls, intf.name, intf)
779 cls.pg_interfaces = result
783 def create_loopback_interfaces(cls, count):
785 Create loopback interfaces.
787 :param count: number of interfaces created.
788 :returns: List of created interfaces.
790 result = [VppLoInterface(cls) for i in range(count)]
792 setattr(cls, intf.name, intf)
793 cls.lo_interfaces = result
797 def create_bvi_interfaces(cls, count):
799 Create BVI interfaces.
801 :param count: number of interfaces created.
802 :returns: List of created interfaces.
804 result = [VppBviInterface(cls) for i in range(count)]
806 setattr(cls, intf.name, intf)
807 cls.bvi_interfaces = result
811 def extend_packet(packet, size, padding=' '):
813 Extend packet to given size by padding with spaces or custom padding
814 NOTE: Currently works only when Raw layer is present.
816 :param packet: packet
817 :param size: target size
818 :param padding: padding used to extend the payload
821 packet_len = len(packet) + 4
822 extend = size - packet_len
824 num = (extend // len(padding)) + 1
825 packet[Raw].load += (padding * num)[:extend].encode("ascii")
def reset_packet_infos(cls):
    """Forget all stored packet infos and per-destination packet counts.

    Both containers are rebound to new, independent empty dicts.
    """
    cls._packet_infos, cls._packet_count_for_dst_if_idx = {}, {}
834 def create_packet_info(cls, src_if, dst_if):
836 Create packet info object containing the source and destination indexes
837 and add it to the testcase's packet info list
839 :param VppInterface src_if: source interface
840 :param VppInterface dst_if: destination interface
842 :returns: _PacketInfo object
846 info.index = len(cls._packet_infos)
847 info.src = src_if.sw_if_index
848 info.dst = dst_if.sw_if_index
849 if isinstance(dst_if, VppSubInterface):
850 dst_idx = dst_if.parent.sw_if_index
852 dst_idx = dst_if.sw_if_index
853 if dst_idx in cls._packet_count_for_dst_if_idx:
854 cls._packet_count_for_dst_if_idx[dst_idx] += 1
856 cls._packet_count_for_dst_if_idx[dst_idx] = 1
857 cls._packet_infos[info.index] = info
861 def info_to_payload(info):
863 Convert _PacketInfo object to packet payload
865 :param info: _PacketInfo object
867 :returns: string containing serialized data from packet info
869 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
873 def payload_to_info(payload, payload_field='load'):
875 Convert packet payload to _PacketInfo object
877 :param payload: packet payload
878 :type payload: <class 'scapy.packet.Raw'>
879 :param payload_field: packet fieldname of payload "load" for
880 <class 'scapy.packet.Raw'>
881 :type payload_field: str
882 :returns: _PacketInfo object containing de-serialized data from payload
885 numbers = getattr(payload, payload_field).split()
887 info.index = int(numbers[0])
888 info.src = int(numbers[1])
889 info.dst = int(numbers[2])
890 info.ip = int(numbers[3])
891 info.proto = int(numbers[4])
894 def get_next_packet_info(self, info):
896 Iterate over the packet info list stored in the testcase
897 Start iteration with first element if info is None
898 Continue based on index in info if info is specified
900 :param info: info or None
901 :returns: next info in list or None if no more infos
906 next_index = info.index + 1
907 if next_index == len(self._packet_infos):
910 return self._packet_infos[next_index]
912 def get_next_packet_info_for_interface(self, src_index, info):
914 Search the packet info list for the next packet info with same source
917 :param src_index: source interface index to search for
918 :param info: packet info - where to start the search
919 :returns: packet info or None
923 info = self.get_next_packet_info(info)
926 if info.src == src_index:
929 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
931 Search the packet info list for the next packet info with same source
932 and destination interface indexes
934 :param src_index: source interface index to search for
935 :param dst_index: destination interface index to search for
936 :param info: packet info - where to start the search
937 :returns: packet info or None
941 info = self.get_next_packet_info_for_interface(src_index, info)
944 if info.dst == dst_index:
947 def assert_equal(self, real_value, expected_value, name_or_class=None):
948 if name_or_class is None:
949 self.assertEqual(real_value, expected_value)
952 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
953 msg = msg % (getdoc(name_or_class).strip(),
954 real_value, str(name_or_class(real_value)),
955 expected_value, str(name_or_class(expected_value)))
957 msg = "Invalid %s: %s does not match expected value %s" % (
958 name_or_class, real_value, expected_value)
960 self.assertEqual(real_value, expected_value, msg)
962 def assert_in_range(self,
970 msg = "Invalid %s: %s out of range <%s,%s>" % (
971 name, real_value, expected_min, expected_max)
972 self.assertTrue(expected_min <= real_value <= expected_max, msg)
974 def assert_packet_checksums_valid(self, packet,
975 ignore_zero_udp_checksums=True):
976 received = packet.__class__(scapy.compat.raw(packet))
978 ppp("Verifying packet checksums for packet:", received))
979 udp_layers = ['UDP', 'UDPerror']
980 checksum_fields = ['cksum', 'chksum']
983 temp = received.__class__(scapy.compat.raw(received))
985 layer = temp.getlayer(counter)
987 for cf in checksum_fields:
988 if hasattr(layer, cf):
989 if ignore_zero_udp_checksums and \
990 0 == getattr(layer, cf) and \
991 layer.name in udp_layers:
994 checksums.append((counter, cf))
997 counter = counter + 1
998 if 0 == len(checksums):
1000 temp = temp.__class__(scapy.compat.raw(temp))
1001 for layer, cf in checksums:
1002 calc_sum = getattr(temp[layer], cf)
1004 getattr(received[layer], cf), calc_sum,
1005 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
1007 "Checksum field `%s` on `%s` layer has correct value `%s`" %
1008 (cf, temp[layer].name, calc_sum))
1010 def assert_checksum_valid(self, received_packet, layer,
1011 field_name='chksum',
1012 ignore_zero_checksum=False):
1013 """ Check checksum of received packet on given layer """
1014 received_packet_checksum = getattr(received_packet[layer], field_name)
1015 if ignore_zero_checksum and 0 == received_packet_checksum:
1017 recalculated = received_packet.__class__(
1018 scapy.compat.raw(received_packet))
1019 delattr(recalculated[layer], field_name)
1020 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1021 self.assert_equal(received_packet_checksum,
1022 getattr(recalculated[layer], field_name),
1023 "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """Check the checksum of the 'IP' layer of a received packet.

    :param received_packet: packet whose IP layer is verified
    :param ignore_zero_checksum: passed through to assert_checksum_valid
    """
    verify = self.assert_checksum_valid
    verify(received_packet, 'IP',
           ignore_zero_checksum=ignore_zero_checksum)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """Check the checksum of the 'TCP' layer of a received packet.

    :param received_packet: packet whose TCP layer is verified
    :param ignore_zero_checksum: passed through to assert_checksum_valid
    """
    verify = self.assert_checksum_valid
    verify(received_packet, 'TCP',
           ignore_zero_checksum=ignore_zero_checksum)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """Check the checksum of the 'UDP' layer of a received packet.

    Unlike the IP and TCP variants, zero checksums are ignored by default.

    :param received_packet: packet whose UDP layer is verified
    :param ignore_zero_checksum: passed through to assert_checksum_valid
    """
    verify = self.assert_checksum_valid
    verify(received_packet, 'UDP',
           ignore_zero_checksum=ignore_zero_checksum)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """Check checksums of the layers embedded in an ICMP error packet.

    Each of the IPerror, TCPerror, UDPerror and ICMPerror layers is
    verified only when it is present in the packet. For UDPerror a zero
    checksum is ignored.

    :param received_packet: packet to inspect
    """
    embedded_layers = (
        (IPerror, 'IPerror', {}),
        (TCPerror, 'TCPerror', {}),
        (UDPerror, 'UDPerror', {'ignore_zero_checksum': True}),
        (ICMPerror, 'ICMPerror', {}),
    )
    for layer_cls, layer_name, extra_args in embedded_layers:
        if received_packet.haslayer(layer_cls):
            self.assert_checksum_valid(received_packet, layer_name,
                                       **extra_args)
def assert_icmp_checksum_valid(self, received_packet):
    """Check the 'ICMP' layer checksum, then any embedded error layers.

    :param received_packet: packet to verify
    """
    pkt = received_packet
    self.assert_checksum_valid(pkt, 'ICMP')
    self.assert_embedded_icmp_checksum_valid(pkt)
def assert_icmpv6_checksum_valid(self, pkt):
    """Check the 'cksum' field of whichever ICMPv6 layers are present.

    For a destination-unreachable message the embedded error layers are
    verified as well. Echo request and echo reply layers are each checked
    when present.

    :param pkt: packet to verify
    """
    if pkt.haslayer(ICMPv6DestUnreach):
        self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
        self.assert_embedded_icmp_checksum_valid(pkt)
    echo_layers = (
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest'),
        (ICMPv6EchoReply, 'ICMPv6EchoReply'),
    )
    for echo_cls, echo_name in echo_layers:
        if pkt.haslayer(echo_cls):
            self.assert_checksum_valid(pkt, echo_name, 'cksum')
1064 def get_packet_counter(self, counter):
1065 if counter.startswith("/"):
1066 counter_value = self.statistics.get_counter(counter)
1068 counters = self.vapi.cli("sh errors").split('\n')
1070 for i in range(1, len(counters) - 1):
1071 results = counters[i].split()
1072 if results[1] == counter:
1073 counter_value = int(results[0])
1075 return counter_value
def assert_packet_counter_equal(self, counter, expected_value):
    """Assert that a packet counter holds the expected value.

    :param counter: counter name, looked up through get_packet_counter
    :param expected_value: value the counter is required to have
    """
    actual = self.get_packet_counter(counter)
    description = "packet counter `%s'" % counter
    self.assert_equal(actual, expected_value, description)
def assert_error_counter_equal(self, counter, expected_value):
    """Assert that an error counter holds the expected value.

    :param counter: counter name, read via self.statistics.get_err_counter
    :param expected_value: value the counter is required to have
    """
    actual = self.statistics.get_err_counter(counter)
    description = "error counter `%s'" % counter
    self.assert_equal(actual, expected_value, description)
1088 def sleep(cls, timeout, remark=None):
1090 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1091 # * by Guido, only the main thread can be interrupted.
1093 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
1096 if hasattr(os, 'sched_yield'):
1102 if hasattr(cls, 'logger'):
1103 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1104 before = time.time()
1107 if hasattr(cls, 'logger') and after - before > 2 * timeout:
1108 cls.logger.error("unexpected self.sleep() result - "
1109 "slept for %es instead of ~%es!",
1110 after - before, timeout)
1111 if hasattr(cls, 'logger'):
1113 "Finished sleep (%s) - slept %es (wanted %es)",
1114 remark, after - before, timeout)
1116 def pg_send(self, intf, pkts):
1117 self.vapi.cli("clear trace")
1118 intf.add_stream(pkts)
1119 self.pg_enable_capture(self.pg_interfaces)
1122 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
1123 self.pg_send(intf, pkts)
1126 for i in self.pg_interfaces:
1127 i.get_capture(0, timeout=timeout)
1128 i.assert_nothing_captured(remark=remark)
1131 def send_and_expect(self, intf, pkts, output, n_rx=None):
1134 self.pg_send(intf, pkts)
1135 rx = output.get_capture(n_rx)
1138 def send_and_expect_only(self, intf, pkts, output, timeout=None):
1139 self.pg_send(intf, pkts)
1140 rx = output.get_capture(len(pkts))
1144 for i in self.pg_interfaces:
1145 if i not in outputs:
1146 i.get_capture(0, timeout=timeout)
1147 i.assert_nothing_captured()
1153 """ unittest calls runTest when TestCase is instantiated without a
1154 test case. Use case: Writing unittests against VppTestCase"""
def get_testcase_doc_name(test):
    """Return the first docstring line of the test's class.

    Falls back to the class name when the class has no usable docstring
    (``getdoc`` returns None or an empty string), instead of failing with
    AttributeError or IndexError.

    :param test: test case instance
    :returns: first line of the class docstring, or the class name
    """
    test_class = test.__class__
    # getdoc() yields None for an undocumented class and '' for a blank one
    doc_lines = (getdoc(test_class) or '').splitlines()
    return doc_lines[0] if doc_lines else test_class.__name__
def get_test_description(descriptions, test):
    """Return the text used to identify ``test`` in the output.

    :param descriptions: truthy when short descriptions should be used.
    :param test: object providing ``shortDescription()``.
    :returns: ``test.shortDescription()`` when descriptions are enabled
        and one is available, otherwise ``str(test)``.
    """
    short_description = test.shortDescription()
    if descriptions and short_description:
        return short_description
    return str(test)
class TestCaseInfo(object):
    """Plain record of the per-test-case values given to the constructor."""

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        """Store the constructor arguments as same-named attributes."""
        self.logger, self.tempdir = logger, tempdir
        self.vpp_pid, self.vpp_bin_path = vpp_pid, vpp_bin_path
        # not known at construction time; starts out unset
        self.core_crash_test = None
class VppTestResult(unittest.TestResult):
    """
    @property result_string
     String variable to store the test case result string.
    @property errors
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test which
     raised an unexpected exception.
    @property failures
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test where
     a failure was explicitly signalled using the TestCase.assert*()
     methods.
    """

    # class-level state, shared by every result instance
    failed_test_cases_info = set()
    core_crash_test_cases_info = set()
    current_test_case_info = None

    def __init__(self, stream=None, descriptions=None, verbosity=None,
                 runner=None):
        """
        :param stream File descriptor to store where to report test results.
            Set to the standard error stream by default.
        :param descriptions Boolean variable to store information if to use
            test case descriptions.
        :param verbosity Integer variable to store required verbosity level.
        :param runner Runner object; printErrors() reads its print_summary
            attribute and replaces its stream.
        """
        super(VppTestResult, self).__init__(stream, descriptions, verbosity)
        self.stream = stream
        self.descriptions = descriptions
        self.verbosity = verbosity
        self.result_string = None
        self.runner = runner

    def addSuccess(self, test):
        """
        Record a test succeeded result

        :param test:

        """
        info = self.current_test_case_info
        if info:
            info.logger.debug(
                "--- addSuccess() %s.%s(%s) called" % (
                    test.__class__.__name__,
                    test._testMethodName,
                    test._testMethodDoc))
        unittest.TestResult.addSuccess(self, test)
        self.result_string = colorize("OK", GREEN)

        self.send_result_through_pipe(test, PASS)

    def addSkip(self, test, reason):
        """
        Record a test skipped.

        :param test:
        :param reason:

        """
        info = self.current_test_case_info
        if info:
            info.logger.debug(
                "--- addSkip() %s.%s(%s) called, reason is %s" %
                (test.__class__.__name__, test._testMethodName,
                 test._testMethodDoc, reason))
        unittest.TestResult.addSkip(self, test, reason)
        self.result_string = colorize("SKIP", YELLOW)

        self.send_result_through_pipe(test, SKIP)

    def symlink_failed(self):
        """
        Link the temp dir of the current test case into $FAILED_DIR as
        '<basename of temp dir>-FAILED'.

        Best effort: any problem is logged (when a logger is available)
        and never raised.
        """
        info = self.current_test_case_info
        if not info:
            return

        failed_dir = os.getenv('FAILED_DIR')
        if failed_dir is None:
            # os.path.join(None, ...) would only produce a cryptic TypeError
            if info.logger:
                info.logger.error(
                    "FAILED_DIR is not set, not linking failed test "
                    "directory %s" % info.tempdir)
            return

        try:
            link_path = os.path.join(
                failed_dir,
                '%s-FAILED' %
                os.path.basename(info.tempdir))
            if info.logger:
                info.logger.debug(
                    "creating a link to the failed test")
                info.logger.debug(
                    "os.symlink(%s, %s)" %
                    (info.tempdir, link_path))
            # lexists() also reports a dangling link, exists() does not
            if os.path.lexists(link_path):
                if info.logger:
                    info.logger.debug(
                        'symlink already exists')
            else:
                os.symlink(info.tempdir, link_path)

        except Exception as e:
            if info.logger:
                info.logger.error(e)

    def send_result_through_pipe(self, test, result):
        """
        Send (test.id(), result) through test_framework_result_pipe, if
        that attribute exists and is truthy.

        :param test:
        :param result: result code to send

        """
        pipe = getattr(self, 'test_framework_result_pipe', None)
        if pipe:
            pipe.send((test.id(), result))

    def log_error(self, test, err, fn_name):
        """
        Write the error and its formatted traceback to the debug log of
        the current test case (no-op when there is none).

        :param test:
        :param err: (type, value, traceback) triple
        :param fn_name: name of the reporting function, used in the log

        """
        info = self.current_test_case_info
        if not info:
            return
        if isinstance(test, unittest.suite._ErrorHolder):
            test_name = test.description
        else:
            test_name = '%s.%s(%s)' % (test.__class__.__name__,
                                       test._testMethodName,
                                       test._testMethodDoc)
        info.logger.debug(
            "--- %s() %s called, err is %s" %
            (fn_name, test_name, err))
        info.logger.debug(
            "formatted exception is:\n%s" %
            "".join(format_exception(*err)))

    def add_error(self, test, err, unittest_fn, error_type):
        """
        Common implementation of addFailure() and addError().

        :param test:
        :param err: (type, value, traceback) triple
        :param unittest_fn: unbound unittest.TestResult method to call
        :param error_type: FAIL or ERROR
        :raises Exception: for any other error_type

        """
        if error_type == FAIL:
            self.log_error(test, err, 'addFailure')
            error_type_str = colorize("FAIL", RED)
        elif error_type == ERROR:
            self.log_error(test, err, 'addError')
            error_type_str = colorize("ERROR", RED)
        else:
            raise Exception('Error type %s cannot be used to record an '
                            'error or a failure' % error_type)

        unittest_fn(self, test, err)
        info = self.current_test_case_info
        if info:
            self.result_string = "%s [ temp dir used by test case: %s ]" % \
                                 (error_type_str,
                                  info.tempdir)
            self.symlink_failed()
            self.failed_test_cases_info.add(info)
            if is_core_present(info.tempdir):
                # only the first failing test is recorded for the core
                if not info.core_crash_test:
                    if isinstance(test, unittest.suite._ErrorHolder):
                        test_name = str(test)
                    else:
                        test_name = "'{!s}' ({!s})".format(
                            get_testcase_doc_name(test), test.id())
                    info.core_crash_test = test_name
                self.core_crash_test_cases_info.add(info)
        else:
            self.result_string = '%s [no temp dir]' % error_type_str

        self.send_result_through_pipe(test, error_type)

    def addFailure(self, test, err):
        """
        Record a test failed result

        :param test:
        :param err: error message

        """
        self.add_error(test, err, unittest.TestResult.addFailure, FAIL)

    def addError(self, test, err):
        """
        Record a test error result

        :param test:
        :param err: error message

        """
        self.add_error(test, err, unittest.TestResult.addError, ERROR)

    def getDescription(self, test):
        """
        Get test description

        :param test:
        :returns: test description

        """
        return get_test_description(self.descriptions, test)

    def startTest(self, test):
        """
        Start a test

        :param test:

        """

        def print_header(test):
            # printed once per test class; the flag lives on the class
            if not hasattr(test.__class__, '_header_printed'):
                print(double_line_delim)
                print(colorize(getdoc(test).splitlines()[0], GREEN))
                print(double_line_delim)
                test.__class__._header_printed = True

        print_header(test)

        unittest.TestResult.startTest(self, test)
        if self.verbosity > 0:
            self.stream.writeln(
                "Starting " + self.getDescription(test) + " ...")
            self.stream.writeln(single_line_delim)

    def stopTest(self, test):
        """
        Called when the given test has been run

        :param test:

        """
        unittest.TestResult.stopTest(self, test)
        result_line = "%-73s%s" % (self.getDescription(test),
                                   self.result_string)
        if self.verbosity > 0:
            self.stream.writeln(single_line_delim)
            self.stream.writeln(result_line)
            self.stream.writeln(single_line_delim)
        else:
            self.stream.writeln(result_line)

        self.send_result_through_pipe(test, TEST_RUN)

    def printErrors(self):
        """
        Print errors from running the test case
        """
        if self.errors or self.failures:
            self.stream.writeln()
            self.printErrorList('ERROR', self.errors)
            self.printErrorList('FAIL', self.failures)

        # ^^ that is the last output from unittest before summary
        if not self.runner.print_summary:
            # anything written after this point goes to os.devnull
            devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
            self.stream = devnull
            self.runner.stream = devnull

    def printErrorList(self, flavour, errors):
        """
        Print error list to the output stream together with error type
        and test case description.

        :param flavour: error type
        :param errors: iterable errors

        """
        for test, err in errors:
            self.stream.writeln(double_line_delim)
            self.stream.writeln("%s: %s" %
                                (flavour, self.getDescription(test)))
            self.stream.writeln(single_line_delim)
            self.stream.writeln("%s" % err)
class VppTestRunner(unittest.TextTestRunner):
    """
    A basic test runner implementation which prints results to standard
    output (the stream is hard-coded to sys.stdout, see __init__).
    """

    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
                 result_pipe=None, failfast=False, buffer=False,
                 resultclass=None, print_summary=True, **kwargs):
        """
        :param keep_alive_pipe: stored on KeepAliveReporter.pipe
        :param result_pipe: stored on the result class as
            test_framework_result_pipe
        :param print_summary: when false, output following the error
            lists is discarded (see VppTestResult.printErrors)

        The remaining parameters are handed to unittest.TextTestRunner.
        """
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        super(VppTestRunner, self).__init__(sys.stdout, descriptions,
                                            verbosity, failfast, buffer,
                                            resultclass, **kwargs)
        KeepAliveReporter.pipe = keep_alive_pipe

        self.orig_stream = self.stream
        # set on the class, so it is visible to every result instance
        self.resultclass.test_framework_result_pipe = result_pipe

        self.print_summary = print_summary

    def _makeResult(self):
        return self.resultclass(self.stream,
                                self.descriptions,
                                self.verbosity,
                                self)

    def run(self, test):
        """
        Run the tests

        :param test:

        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        result = super(VppTestRunner, self).run(test)
        if not self.print_summary:
            muted_stream = self.stream
            self.stream = self.orig_stream
            result.stream = self.orig_stream
            # printErrors() swapped in a stream opened on os.devnull;
            # close it so that the file handle is not leaked
            if muted_stream is not self.orig_stream:
                muted_stream.close()
        return result
class Worker(Thread):
    """Thread which runs an executable and logs its output.

    After run() finishes, ``result`` holds the return code of the process
    (or os.EX_OSFILE when the executable could not be used) and
    ``process`` holds the subprocess.Popen object.
    """

    def __init__(self, args, logger, env=None):
        """
        :param args: argument list, args[0] being the executable path
        :param logger: logger used to report progress and output
        :param env: extra environment variables for the process; the
            mapping is deep-copied
        """
        self.logger = logger
        self.args = args
        self.process = None
        self.result = None
        env = {} if env is None else env
        self.env = copy.deepcopy(env)
        super(Worker, self).__init__()

    def run(self):
        """
        Run the executable, wait for it to finish and log its return code,
        stdout and stderr.

        :raises EnvironmentError: when the executable does not exist or is
            not executable
        """
        executable = self.args[0]
        if not os.path.exists(executable) or not os.access(
                executable, os.F_OK | os.X_OK):
            # Exit code that means some system file did not exist,
            # could not be opened, or had some other kind of error.
            self.result = os.EX_OSFILE
            raise EnvironmentError(
                "executable '%s' is not found or not executable." %
                executable)
        self.logger.debug("Running executable w/args `%s'" % self.args)
        # the caller's variables take precedence over the inherited ones
        env = os.environ.copy()
        env.update(self.env)
        env["CK_LOG_FILE_NAME"] = "-"
        self.process = subprocess.Popen(
            self.args, shell=False, env=env, preexec_fn=os.setpgrp,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = self.process.communicate()
        self.logger.debug("Finished running `%s'" % executable)
        self.logger.info("Return code is `%s'" % self.process.returncode)
        self.logger.info(single_line_delim)
        self.logger.info("Executable `%s' wrote to stdout:" % executable)
        self.logger.info(single_line_delim)
        self.logger.info(out)
        self.logger.info(single_line_delim)
        self.logger.info("Executable `%s' wrote to stderr:" % executable)
        self.logger.info(single_line_delim)
        self.logger.info(err)
        self.logger.info(single_line_delim)
        self.result = self.process.returncode
1520 if __name__ == '__main__':