3 from __future__ import print_function
17 from collections import deque
18 from threading import Thread, Event
19 from inspect import getdoc, isclass
20 from traceback import format_exception
21 from logging import FileHandler, DEBUG, Formatter
24 from scapy.packet import Raw
25 import hook as hookmodule
26 from vpp_pg_interface import VppPGInterface
27 from vpp_sub_interface import VppSubInterface
28 from vpp_lo_interface import VppLoInterface
29 from vpp_bvi_interface import VppBviInterface
30 from vpp_papi_provider import VppPapiProvider
31 from vpp_papi.vpp_stats import VPPStats
32 from vpp_papi.vpp_transport_shmem import VppTransportShmemIOError
33 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
35 from vpp_object import VppObjectRegistry
36 from util import ppp, is_core_present
37 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
38 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
39 from scapy.layers.inet6 import ICMPv6EchoReply
41 if os.name == 'posix' and sys.version_info[0] < 3:
42 # using subprocess32 is recommended by python official documentation
43 # @ https://docs.python.org/2/library/subprocess.html
44 import subprocess32 as subprocess
48 # Python2/3 compatible
60 debug_framework = False
61 if os.getenv('TEST_DEBUG', "0") == "1":
62 debug_framework = True
66 Test framework module.
68 The module provides a set of tools for constructing and running tests and
69 representing the results.
73 class VppDiedError(Exception):
74 """ exception for reporting that the subprocess has died."""
76 signals_by_value = {v: k for k, v in signal.__dict__.items() if
77 k.startswith('SIG') and not k.startswith('SIG_')}
79 def __init__(self, rv=None, testcase=None, method_name=None):
81 self.signal_name = None
82 self.testcase = testcase
83 self.method_name = method_name
86 self.signal_name = VppDiedError.signals_by_value[-rv]
90 if testcase is None and method_name is None:
93 in_msg = 'running %s.%s ' % (testcase, method_name)
95 msg = "VPP subprocess died %sunexpectedly with return code: %d%s." % (
98 ' [%s]' % self.signal_name if
99 self.signal_name is not None else '')
100 super(VppDiedError, self).__init__(msg)
103 class _PacketInfo(object):
104 """Private class to create packet info object.
106 Help process information about the next packet.
107 Set variables to default values.
109 #: Store the index of the packet.
111 #: Store the index of the source packet generator interface of the packet.
113 #: Store the index of the destination packet generator interface
116 #: Store expected ip version
118 #: Store expected upper protocol
120 #: Store the copy of the former packet.
123 def __eq__(self, other):
124 index = self.index == other.index
125 src = self.src == other.src
126 dst = self.dst == other.dst
127 data = self.data == other.data
128 return index and src and dst and data
131 def pump_output(testclass):
132 """ pump output from vpp stdout/stderr to proper queues """
135 while not testclass.pump_thread_stop_flag.is_set():
136 readable = select.select([testclass.vpp.stdout.fileno(),
137 testclass.vpp.stderr.fileno(),
138 testclass.pump_thread_wakeup_pipe[0]],
140 if testclass.vpp.stdout.fileno() in readable:
141 read = os.read(testclass.vpp.stdout.fileno(), 102400)
143 split = read.splitlines(True)
144 if len(stdout_fragment) > 0:
145 split[0] = "%s%s" % (stdout_fragment, split[0])
146 if len(split) > 0 and split[-1].endswith("\n"):
150 stdout_fragment = split[-1]
151 testclass.vpp_stdout_deque.extend(split[:limit])
152 if not testclass.cache_vpp_output:
153 for line in split[:limit]:
154 testclass.logger.debug(
155 "VPP STDOUT: %s" % line.rstrip("\n"))
156 if testclass.vpp.stderr.fileno() in readable:
157 read = os.read(testclass.vpp.stderr.fileno(), 102400)
159 split = read.splitlines(True)
160 if len(stderr_fragment) > 0:
161 split[0] = "%s%s" % (stderr_fragment, split[0])
162 if len(split) > 0 and split[-1].endswith(b"\n"):
166 stderr_fragment = split[-1]
167 testclass.vpp_stderr_deque.extend(split[:limit])
168 if not testclass.cache_vpp_output:
169 for line in split[:limit]:
170 testclass.logger.debug(
171 "VPP STDERR: %s" % line.rstrip("\n"))
172 # ignoring the dummy pipe here intentionally - the
173 # flag will take care of properly terminating the loop
176 def _is_skip_aarch64_set():
177 return os.getenv('SKIP_AARCH64', 'n').lower() in ('yes', 'y', '1')
180 is_skip_aarch64_set = _is_skip_aarch64_set()
183 def _is_platform_aarch64():
184 return platform.machine() == 'aarch64'
187 is_platform_aarch64 = _is_platform_aarch64()
190 def _running_extended_tests():
191 s = os.getenv("EXTENDED_TESTS", "n")
192 return True if s.lower() in ("y", "yes", "1") else False
195 running_extended_tests = _running_extended_tests()
198 def _running_on_centos():
199 os_id = os.getenv("OS_ID", "")
200 return True if "centos" in os_id.lower() else False
203 running_on_centos = _running_on_centos
206 class KeepAliveReporter(object):
208 Singleton object which reports test start to parent process
213 self.__dict__ = self._shared_state
221 def pipe(self, pipe):
222 if self._pipe is not None:
223 raise Exception("Internal error - pipe should only be set once.")
226 def send_keep_alive(self, test, desc=None):
228 Write current test tmpdir & desc to keep-alive pipe to signal liveness
230 if self.pipe is None:
231 # if not running forked..
235 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
239 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
242 class VppTestCase(unittest.TestCase):
243 """This subclass is a base class for VPP test cases that are implemented as
244 classes. It provides methods to create and run test case.
247 extra_vpp_punt_config = []
248 extra_vpp_plugin_config = []
251 def packet_infos(self):
252 """List of packet infos"""
253 return self._packet_infos
256 def get_packet_count_for_if_idx(cls, dst_if_index):
257 """Get the number of packet info for specified destination if index"""
258 if dst_if_index in cls._packet_count_for_dst_if_idx:
259 return cls._packet_count_for_dst_if_idx[dst_if_index]
265 """Return the instance of this testcase"""
266 return cls.test_instance
269 def set_debug_flags(cls, d):
270 cls.debug_core = False
271 cls.debug_gdb = False
272 cls.debug_gdbserver = False
277 cls.debug_core = True
280 elif dl == "gdbserver":
281 cls.debug_gdbserver = True
283 raise Exception("Unrecognized DEBUG option: '%s'" % d)
286 def get_least_used_cpu():
287 cpu_usage_list = [set(range(psutil.cpu_count()))]
288 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
289 if 'vpp_main' == p.info['name']]
290 for vpp_process in vpp_processes:
291 for cpu_usage_set in cpu_usage_list:
293 cpu_num = vpp_process.cpu_num()
294 if cpu_num in cpu_usage_set:
295 cpu_usage_set_index = cpu_usage_list.index(
297 if cpu_usage_set_index == len(cpu_usage_list) - 1:
298 cpu_usage_list.append({cpu_num})
300 cpu_usage_list[cpu_usage_set_index + 1].add(
302 cpu_usage_set.remove(cpu_num)
304 except psutil.NoSuchProcess:
307 for cpu_usage_set in cpu_usage_list:
308 if len(cpu_usage_set) > 0:
309 min_usage_set = cpu_usage_set
312 return random.choice(tuple(min_usage_set))
315 def setUpConstants(cls):
316 """ Set-up the test case class based on environment variables """
317 s = os.getenv("STEP", "n")
318 cls.step = True if s.lower() in ("y", "yes", "1") else False
319 d = os.getenv("DEBUG", None)
320 c = os.getenv("CACHE_OUTPUT", "1")
321 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
322 cls.set_debug_flags(d)
323 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
324 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
325 cls.test_plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
326 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
328 if cls.plugin_path is not None:
329 if cls.extern_plugin_path is not None:
330 plugin_path = "%s:%s" % (
331 cls.plugin_path, cls.extern_plugin_path)
333 plugin_path = cls.plugin_path
334 elif cls.extern_plugin_path is not None:
335 plugin_path = cls.extern_plugin_path
337 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
338 debug_cli = "cli-listen localhost:5002"
340 size = os.getenv("COREDUMP_SIZE")
342 coredump_size = "coredump-size %s" % size
343 if coredump_size is None:
344 coredump_size = "coredump-size unlimited"
346 cpu_core_number = cls.get_least_used_cpu()
348 cls.vpp_cmdline = [cls.vpp_bin, "unix",
349 "{", "nodaemon", debug_cli, "full-coredump",
350 coredump_size, "runtime-dir", cls.tempdir, "}",
351 "api-trace", "{", "on", "}", "api-segment", "{",
352 "prefix", cls.shm_prefix, "}", "cpu", "{",
353 "main-core", str(cpu_core_number), "}",
354 "statseg", "{", "socket-name", cls.stats_sock, "}",
355 "socksvr", "{", "socket-name", cls.api_sock, "}",
357 "{", "plugin", "dpdk_plugin.so", "{", "disable",
358 "}", "plugin", "rdma_plugin.so", "{", "disable",
359 "}", "plugin", "unittest_plugin.so", "{", "enable",
360 "}"] + cls.extra_vpp_plugin_config + ["}", ]
361 if cls.extra_vpp_punt_config is not None:
362 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
363 if plugin_path is not None:
364 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
365 if cls.test_plugin_path is not None:
366 cls.vpp_cmdline.extend(["test_plugin_path", cls.test_plugin_path])
368 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
369 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
372 def wait_for_enter(cls):
373 if cls.debug_gdbserver:
374 print(double_line_delim)
375 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
377 print(double_line_delim)
378 print("Spawned VPP with PID: %d" % cls.vpp.pid)
380 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
382 print(single_line_delim)
383 print("You can debug the VPP using e.g.:")
384 if cls.debug_gdbserver:
385 print("sudo gdb " + cls.vpp_bin +
386 " -ex 'target remote localhost:7777'")
387 print("Now is the time to attach a gdb by running the above "
388 "command, set up breakpoints etc. and then resume VPP from "
389 "within gdb by issuing the 'continue' command")
391 print("sudo gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
392 print("Now is the time to attach a gdb by running the above "
393 "command and set up breakpoints etc.")
394 print(single_line_delim)
395 input("Press ENTER to continue running the testcase...")
399 cmdline = cls.vpp_cmdline
401 if cls.debug_gdbserver:
402 gdbserver = '/usr/bin/gdbserver'
403 if not os.path.isfile(gdbserver) or \
404 not os.access(gdbserver, os.X_OK):
405 raise Exception("gdbserver binary '%s' does not exist or is "
406 "not executable" % gdbserver)
408 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
409 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
412 cls.vpp = subprocess.Popen(cmdline,
413 stdout=subprocess.PIPE,
414 stderr=subprocess.PIPE,
416 except subprocess.CalledProcessError as e:
417 cls.logger.critical("Subprocess returned with non-0 return code: ("
421 cls.logger.critical("Subprocess returned with OS error: "
422 "(%s) %s", e.errno, e.strerror)
424 except Exception as e:
425 cls.logger.exception("Subprocess returned unexpected from "
432 def wait_for_stats_socket(cls):
433 deadline = time.time() + 3
435 while time.time() < deadline or \
436 cls.debug_gdb or cls.debug_gdbserver:
437 if os.path.exists(cls.stats_sock):
442 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
445 def wait_for_coredump(cls):
446 corefile = cls.tempdir + "/core"
447 if os.path.isfile(corefile):
448 cls.logger.error("Waiting for coredump to complete: %s", corefile)
449 curr_size = os.path.getsize(corefile)
450 deadline = time.time() + 60
452 while time.time() < deadline:
455 curr_size = os.path.getsize(corefile)
456 if size == curr_size:
460 cls.logger.error("Timed out waiting for coredump to complete:"
463 cls.logger.error("Coredump complete: %s, size %d",
469 Perform class setup before running the testcase
470 Remove shared memory files, start vpp and connect the vpp-api
472 super(VppTestCase, cls).setUpClass()
473 gc.collect() # run garbage collection first
475 cls.logger = get_logger(cls.__name__)
476 if hasattr(cls, 'parallel_handler'):
477 cls.logger.addHandler(cls.parallel_handler)
478 cls.logger.propagate = False
480 cls.tempdir = tempfile.mkdtemp(
481 prefix='vpp-unittest-%s-' % cls.__name__)
482 cls.stats_sock = "%s/stats.sock" % cls.tempdir
483 cls.api_sock = "%s/api.sock" % cls.tempdir
484 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
485 cls.file_handler.setFormatter(
486 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
488 cls.file_handler.setLevel(DEBUG)
489 cls.logger.addHandler(cls.file_handler)
490 cls.logger.debug("--- setUpClass() for %s called ---" %
492 cls.shm_prefix = os.path.basename(cls.tempdir)
493 os.chdir(cls.tempdir)
494 cls.logger.info("Temporary dir is %s, shm prefix is %s",
495 cls.tempdir, cls.shm_prefix)
497 cls.reset_packet_infos()
499 cls._zombie_captures = []
502 cls.registry = VppObjectRegistry()
503 cls.vpp_startup_failed = False
504 cls.reporter = KeepAliveReporter()
505 # need to catch exceptions here because if we raise, then the cleanup
506 # doesn't get called and we might end with a zombie vpp
509 cls.reporter.send_keep_alive(cls, 'setUpClass')
510 VppTestResult.current_test_case_info = TestCaseInfo(
511 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
512 cls.vpp_stdout_deque = deque()
513 cls.vpp_stderr_deque = deque()
514 cls.pump_thread_stop_flag = Event()
515 cls.pump_thread_wakeup_pipe = os.pipe()
516 cls.pump_thread = Thread(target=pump_output, args=(cls,))
517 cls.pump_thread.daemon = True
518 cls.pump_thread.start()
519 if cls.debug_gdb or cls.debug_gdbserver:
523 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
526 hook = hookmodule.StepHook(cls)
528 hook = hookmodule.PollHook(cls)
529 cls.vapi.register_hook(hook)
530 cls.wait_for_stats_socket()
531 cls.statistics = VPPStats(socketname=cls.stats_sock)
535 cls.vpp_startup_failed = True
537 "VPP died shortly after startup, check the"
538 " output to standard error for possible cause")
544 cls.vapi.disconnect()
547 if cls.debug_gdbserver:
548 print(colorize("You're running VPP inside gdbserver but "
549 "VPP-API connection failed, did you forget "
550 "to 'continue' VPP from within gdb?", RED))
560 Disconnect vpp-api, kill vpp and cleanup shared memory files
562 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
564 if cls.vpp.returncode is None:
565 print(double_line_delim)
566 print("VPP or GDB server is still running")
567 print(single_line_delim)
568 input("When done debugging, press ENTER to kill the "
569 "process and finish running the testcase...")
571 # first signal that we want to stop the pump thread, then wake it up
572 if hasattr(cls, 'pump_thread_stop_flag'):
573 cls.pump_thread_stop_flag.set()
574 if hasattr(cls, 'pump_thread_wakeup_pipe'):
575 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
576 if hasattr(cls, 'pump_thread'):
577 cls.logger.debug("Waiting for pump thread to stop")
578 cls.pump_thread.join()
579 if hasattr(cls, 'vpp_stderr_reader_thread'):
580 cls.logger.debug("Waiting for stdderr pump to stop")
581 cls.vpp_stderr_reader_thread.join()
583 if hasattr(cls, 'vpp'):
584 if hasattr(cls, 'vapi'):
585 cls.logger.debug("Disconnecting class vapi client on %s",
587 cls.vapi.disconnect()
588 cls.logger.debug("Deleting class vapi attribute on %s",
592 if cls.vpp.returncode is None:
593 cls.wait_for_coredump()
594 cls.logger.debug("Sending TERM to vpp")
596 cls.logger.debug("Waiting for vpp to die")
597 cls.vpp.communicate()
598 cls.logger.debug("Deleting class vpp attribute on %s",
602 if cls.vpp_startup_failed:
603 stdout_log = cls.logger.info
604 stderr_log = cls.logger.critical
606 stdout_log = cls.logger.info
607 stderr_log = cls.logger.info
609 if hasattr(cls, 'vpp_stdout_deque'):
610 stdout_log(single_line_delim)
611 stdout_log('VPP output to stdout while running %s:', cls.__name__)
612 stdout_log(single_line_delim)
613 vpp_output = "".join(cls.vpp_stdout_deque)
614 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
616 stdout_log('\n%s', vpp_output)
617 stdout_log(single_line_delim)
619 if hasattr(cls, 'vpp_stderr_deque'):
620 stderr_log(single_line_delim)
621 stderr_log('VPP output to stderr while running %s:', cls.__name__)
622 stderr_log(single_line_delim)
623 vpp_output = "".join(cls.vpp_stderr_deque)
624 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
626 stderr_log('\n%s', vpp_output)
627 stderr_log(single_line_delim)
630 def tearDownClass(cls):
631 """ Perform final cleanup after running all tests in this test-case """
632 cls.logger.debug("--- tearDownClass() for %s called ---" %
634 cls.reporter.send_keep_alive(cls, 'tearDownClass')
636 cls.file_handler.close()
637 cls.reset_packet_infos()
639 debug_internal.on_tear_down_class(cls)
641 def show_commands_at_teardown(self):
642 """ Allow subclass specific teardown logging additions."""
643 self.logger.info("--- No test specific show commands provided. ---")
646 """ Show various debug prints after each test """
647 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
648 (self.__class__.__name__, self._testMethodName,
649 self._testMethodDoc))
652 if not self.vpp_dead:
653 self.logger.debug(self.vapi.cli("show trace max 1000"))
654 self.logger.info(self.vapi.ppcli("show interface"))
655 self.logger.info(self.vapi.ppcli("show hardware"))
656 self.logger.info(self.statistics.set_errors_str())
657 self.logger.info(self.vapi.ppcli("show run"))
658 self.logger.info(self.vapi.ppcli("show log"))
659 self.logger.info("Logging testcase specific show commands.")
660 self.show_commands_at_teardown()
661 self.registry.remove_vpp_config(self.logger)
662 # Save/Dump VPP api trace log
663 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
664 tmp_api_trace = "/tmp/%s" % api_trace
665 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
666 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
667 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
669 os.rename(tmp_api_trace, vpp_api_trace_log)
670 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
672 except VppTransportShmemIOError:
673 self.logger.debug("VppTransportShmemIOError: Vpp dead. "
674 "Cannot log show commands.")
677 self.registry.unregister_all(self.logger)
680 """ Clear trace before running each test"""
681 super(VppTestCase, self).setUp()
682 self.reporter.send_keep_alive(self)
684 raise VppDiedError(self.__class__.__name__, self._testMethodName,
685 "VPP is dead when setting up the test "
686 "(%s.%s)." % (self.__class__.__name__,
687 self._testMethodName))
688 self.sleep(.1, "during setUp")
689 self.vpp_stdout_deque.append(
690 "--- test setUp() for %s.%s(%s) starts here ---\n" %
691 (self.__class__.__name__, self._testMethodName,
692 self._testMethodDoc))
693 self.vpp_stderr_deque.append(
694 "--- test setUp() for %s.%s(%s) starts here ---\n" %
695 (self.__class__.__name__, self._testMethodName,
696 self._testMethodDoc))
697 self.vapi.cli("clear trace")
698 # store the test instance inside the test class - so that objects
699 # holding the class can access instance methods (like assertEqual)
700 type(self).test_instance = self
703 def pg_enable_capture(cls, interfaces=None):
705 Enable capture on packet-generator interfaces
707 :param interfaces: iterable interface indexes (if None,
708 use self.pg_interfaces)
711 if interfaces is None:
712 interfaces = cls.pg_interfaces
717 def register_capture(cls, cap_name):
718 """ Register a capture in the testclass """
719 # add to the list of captures with current timestamp
720 cls._captures.append((time.time(), cap_name))
721 # filter out from zombies
722 cls._zombie_captures = [(stamp, name)
723 for (stamp, name) in cls._zombie_captures
728 """ Remove any zombie captures and enable the packet generator """
729 # how long before capture is allowed to be deleted - otherwise vpp
730 # crashes - 100ms seems enough (this shouldn't be needed at all)
733 for stamp, cap_name in cls._zombie_captures:
734 wait = stamp + capture_ttl - now
736 cls.sleep(wait, "before deleting capture %s" % cap_name)
738 cls.logger.debug("Removing zombie capture %s" % cap_name)
739 cls.vapi.cli('packet-generator delete %s' % cap_name)
741 cls.vapi.cli("trace add pg-input 1000")
742 cls.vapi.cli('packet-generator enable')
743 cls._zombie_captures = cls._captures
747 def create_pg_interfaces(cls, interfaces):
749 Create packet-generator interfaces.
751 :param interfaces: iterable indexes of the interfaces.
752 :returns: List of created interfaces.
757 intf = VppPGInterface(cls, i)
758 setattr(cls, intf.name, intf)
760 cls.pg_interfaces = result
764 def create_loopback_interfaces(cls, count):
766 Create loopback interfaces.
768 :param count: number of interfaces created.
769 :returns: List of created interfaces.
771 result = [VppLoInterface(cls) for i in range(count)]
773 setattr(cls, intf.name, intf)
774 cls.lo_interfaces = result
778 def create_bvi_interfaces(cls, count):
780 Create BVI interfaces.
782 :param count: number of interfaces created.
783 :returns: List of created interfaces.
785 result = [VppBviInterface(cls) for i in range(count)]
787 setattr(cls, intf.name, intf)
788 cls.bvi_interfaces = result
792 def extend_packet(packet, size, padding=' '):
794 Extend packet to given size by padding with spaces or custom padding
795 NOTE: Currently works only when Raw layer is present.
797 :param packet: packet
798 :param size: target size
799 :param padding: padding used to extend the payload
802 packet_len = len(packet) + 4
803 extend = size - packet_len
805 num = (extend // len(padding)) + 1
806 packet[Raw].load += (padding * num)[:extend].encode("ascii")
809 def reset_packet_infos(cls):
810 """ Reset the list of packet info objects and packet counts to zero """
811 cls._packet_infos = {}
812 cls._packet_count_for_dst_if_idx = {}
815 def create_packet_info(cls, src_if, dst_if):
817 Create packet info object containing the source and destination indexes
818 and add it to the testcase's packet info list
820 :param VppInterface src_if: source interface
821 :param VppInterface dst_if: destination interface
823 :returns: _PacketInfo object
827 info.index = len(cls._packet_infos)
828 info.src = src_if.sw_if_index
829 info.dst = dst_if.sw_if_index
830 if isinstance(dst_if, VppSubInterface):
831 dst_idx = dst_if.parent.sw_if_index
833 dst_idx = dst_if.sw_if_index
834 if dst_idx in cls._packet_count_for_dst_if_idx:
835 cls._packet_count_for_dst_if_idx[dst_idx] += 1
837 cls._packet_count_for_dst_if_idx[dst_idx] = 1
838 cls._packet_infos[info.index] = info
842 def info_to_payload(info):
844 Convert _PacketInfo object to packet payload
846 :param info: _PacketInfo object
848 :returns: string containing serialized data from packet info
850 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
854 def payload_to_info(payload, payload_field='load'):
856 Convert packet payload to _PacketInfo object
858 :param payload: packet payload
859 :type payload: <class 'scapy.packet.Raw'>
860 :param payload_field: packet fieldname of payload "load" for
861 <class 'scapy.packet.Raw'>
862 :type payload_field: str
863 :returns: _PacketInfo object containing de-serialized data from payload
866 numbers = getattr(payload, payload_field).split()
868 info.index = int(numbers[0])
869 info.src = int(numbers[1])
870 info.dst = int(numbers[2])
871 info.ip = int(numbers[3])
872 info.proto = int(numbers[4])
875 def get_next_packet_info(self, info):
877 Iterate over the packet info list stored in the testcase
878 Start iteration with first element if info is None
879 Continue based on index in info if info is specified
881 :param info: info or None
882 :returns: next info in list or None if no more infos
887 next_index = info.index + 1
888 if next_index == len(self._packet_infos):
891 return self._packet_infos[next_index]
893 def get_next_packet_info_for_interface(self, src_index, info):
895 Search the packet info list for the next packet info with same source
898 :param src_index: source interface index to search for
899 :param info: packet info - where to start the search
900 :returns: packet info or None
904 info = self.get_next_packet_info(info)
907 if info.src == src_index:
910 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
912 Search the packet info list for the next packet info with same source
913 and destination interface indexes
915 :param src_index: source interface index to search for
916 :param dst_index: destination interface index to search for
917 :param info: packet info - where to start the search
918 :returns: packet info or None
922 info = self.get_next_packet_info_for_interface(src_index, info)
925 if info.dst == dst_index:
928 def assert_equal(self, real_value, expected_value, name_or_class=None):
929 if name_or_class is None:
930 self.assertEqual(real_value, expected_value)
933 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
934 msg = msg % (getdoc(name_or_class).strip(),
935 real_value, str(name_or_class(real_value)),
936 expected_value, str(name_or_class(expected_value)))
938 msg = "Invalid %s: %s does not match expected value %s" % (
939 name_or_class, real_value, expected_value)
941 self.assertEqual(real_value, expected_value, msg)
943 def assert_in_range(self,
951 msg = "Invalid %s: %s out of range <%s,%s>" % (
952 name, real_value, expected_min, expected_max)
953 self.assertTrue(expected_min <= real_value <= expected_max, msg)
955 def assert_packet_checksums_valid(self, packet,
956 ignore_zero_udp_checksums=True):
957 received = packet.__class__(scapy.compat.raw(packet))
959 ppp("Verifying packet checksums for packet:", received))
960 udp_layers = ['UDP', 'UDPerror']
961 checksum_fields = ['cksum', 'chksum']
964 temp = received.__class__(scapy.compat.raw(received))
966 layer = temp.getlayer(counter)
968 for cf in checksum_fields:
969 if hasattr(layer, cf):
970 if ignore_zero_udp_checksums and \
971 0 == getattr(layer, cf) and \
972 layer.name in udp_layers:
975 checksums.append((counter, cf))
978 counter = counter + 1
979 if 0 == len(checksums):
981 temp = temp.__class__(scapy.compat.raw(temp))
982 for layer, cf in checksums:
983 calc_sum = getattr(temp[layer], cf)
985 getattr(received[layer], cf), calc_sum,
986 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
988 "Checksum field `%s` on `%s` layer has correct value `%s`" %
989 (cf, temp[layer].name, calc_sum))
991 def assert_checksum_valid(self, received_packet, layer,
993 ignore_zero_checksum=False):
994 """ Check checksum of received packet on given layer """
995 received_packet_checksum = getattr(received_packet[layer], field_name)
996 if ignore_zero_checksum and 0 == received_packet_checksum:
998 recalculated = received_packet.__class__(
999 scapy.compat.raw(received_packet))
1000 delattr(recalculated[layer], field_name)
1001 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1002 self.assert_equal(received_packet_checksum,
1003 getattr(recalculated[layer], field_name),
1004 "packet checksum on layer: %s" % layer)
1006 def assert_ip_checksum_valid(self, received_packet,
1007 ignore_zero_checksum=False):
1008 self.assert_checksum_valid(received_packet, 'IP',
1009 ignore_zero_checksum=ignore_zero_checksum)
1011 def assert_tcp_checksum_valid(self, received_packet,
1012 ignore_zero_checksum=False):
1013 self.assert_checksum_valid(received_packet, 'TCP',
1014 ignore_zero_checksum=ignore_zero_checksum)
1016 def assert_udp_checksum_valid(self, received_packet,
1017 ignore_zero_checksum=True):
1018 self.assert_checksum_valid(received_packet, 'UDP',
1019 ignore_zero_checksum=ignore_zero_checksum)
1021 def assert_embedded_icmp_checksum_valid(self, received_packet):
1022 if received_packet.haslayer(IPerror):
1023 self.assert_checksum_valid(received_packet, 'IPerror')
1024 if received_packet.haslayer(TCPerror):
1025 self.assert_checksum_valid(received_packet, 'TCPerror')
1026 if received_packet.haslayer(UDPerror):
1027 self.assert_checksum_valid(received_packet, 'UDPerror',
1028 ignore_zero_checksum=True)
1029 if received_packet.haslayer(ICMPerror):
1030 self.assert_checksum_valid(received_packet, 'ICMPerror')
1032 def assert_icmp_checksum_valid(self, received_packet):
1033 self.assert_checksum_valid(received_packet, 'ICMP')
1034 self.assert_embedded_icmp_checksum_valid(received_packet)
1036 def assert_icmpv6_checksum_valid(self, pkt):
1037 if pkt.haslayer(ICMPv6DestUnreach):
1038 self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
1039 self.assert_embedded_icmp_checksum_valid(pkt)
1040 if pkt.haslayer(ICMPv6EchoRequest):
1041 self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
1042 if pkt.haslayer(ICMPv6EchoReply):
1043 self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
1045 def get_packet_counter(self, counter):
1046 if counter.startswith("/"):
1047 counter_value = self.statistics.get_counter(counter)
1049 counters = self.vapi.cli("sh errors").split('\n')
1051 for i in range(1, len(counters) - 1):
1052 results = counters[i].split()
1053 if results[1] == counter:
1054 counter_value = int(results[0])
1056 return counter_value
1058 def assert_packet_counter_equal(self, counter, expected_value):
1059 counter_value = self.get_packet_counter(counter)
1060 self.assert_equal(counter_value, expected_value,
1061 "packet counter `%s'" % counter)
1063 def assert_error_counter_equal(self, counter, expected_value):
1064 counter_value = self.statistics.get_err_counter(counter)
1065 self.assert_equal(counter_value, expected_value,
1066 "error counter `%s'" % counter)
1069 def sleep(cls, timeout, remark=None):
1071 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1072 # * by Guido, only the main thread can be interrupted.
1074 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
1077 if hasattr(os, 'sched_yield'):
1083 if hasattr(cls, 'logger'):
1084 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1085 before = time.time()
1088 if hasattr(cls, 'logger') and after - before > 2 * timeout:
1089 cls.logger.error("unexpected self.sleep() result - "
1090 "slept for %es instead of ~%es!",
1091 after - before, timeout)
1092 if hasattr(cls, 'logger'):
1094 "Finished sleep (%s) - slept %es (wanted %es)",
1095 remark, after - before, timeout)
1097 def pg_send(self, intf, pkts):
1098 self.vapi.cli("clear trace")
1099 intf.add_stream(pkts)
1100 self.pg_enable_capture(self.pg_interfaces)
1103 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
1104 self.pg_send(intf, pkts)
1107 for i in self.pg_interfaces:
1108 i.get_capture(0, timeout=timeout)
1109 i.assert_nothing_captured(remark=remark)
1112 def send_and_expect(self, intf, pkts, output, n_rx=None):
1115 self.pg_send(intf, pkts)
1116 rx = output.get_capture(n_rx)
1119 def send_and_expect_only(self, intf, pkts, output, timeout=None):
1120 self.pg_send(intf, pkts)
1121 rx = output.get_capture(len(pkts))
1125 for i in self.pg_interfaces:
1126 if i not in outputs:
1127 i.get_capture(0, timeout=timeout)
1128 i.assert_nothing_captured()
1134 """ unittest calls runTest when TestCase is instantiated without a
1135 test case. Use case: Writing unittests against VppTestCase"""
1139 def get_testcase_doc_name(test):
1140 return getdoc(test.__class__).splitlines()[0]
1143 def get_test_description(descriptions, test):
1144 short_description = test.shortDescription()
1145 if descriptions and short_description:
1146 return short_description
1151 class TestCaseInfo(object):
1152 def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
1153 self.logger = logger
1154 self.tempdir = tempdir
1155 self.vpp_pid = vpp_pid
1156 self.vpp_bin_path = vpp_bin_path
1157 self.core_crash_test = None
1160 class VppTestResult(unittest.TestResult):
1162 @property result_string
1163 String variable to store the test case result string.
1165 List variable containing 2-tuples of TestCase instances and strings
1166 holding formatted tracebacks. Each tuple represents a test which
1167 raised an unexpected exception.
1169 List variable containing 2-tuples of TestCase instances and strings
1170 holding formatted tracebacks. Each tuple represents a test where
1171 a failure was explicitly signalled using the TestCase.assert*()
1175 failed_test_cases_info = set()
1176 core_crash_test_cases_info = set()
1177 current_test_case_info = None
1179 def __init__(self, stream=None, descriptions=None, verbosity=None,
1182 :param stream File descriptor to store where to report test results.
1183 Set to the standard error stream by default.
1184 :param descriptions Boolean variable to store information if to use
1185 test case descriptions.
1186 :param verbosity Integer variable to store required verbosity level.
1188 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1189 self.stream = stream
1190 self.descriptions = descriptions
1191 self.verbosity = verbosity
1192 self.result_string = None
1193 self.runner = runner
1195 def addSuccess(self, test):
1197 Record a test succeeded result
1202 if self.current_test_case_info:
1203 self.current_test_case_info.logger.debug(
1204 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1205 test._testMethodName,
1206 test._testMethodDoc))
1207 unittest.TestResult.addSuccess(self, test)
1208 self.result_string = colorize("OK", GREEN)
1210 self.send_result_through_pipe(test, PASS)
1212 def addSkip(self, test, reason):
1214 Record a test skipped.
1220 if self.current_test_case_info:
1221 self.current_test_case_info.logger.debug(
1222 "--- addSkip() %s.%s(%s) called, reason is %s" %
1223 (test.__class__.__name__, test._testMethodName,
1224 test._testMethodDoc, reason))
1225 unittest.TestResult.addSkip(self, test, reason)
1226 self.result_string = colorize("SKIP", YELLOW)
1228 self.send_result_through_pipe(test, SKIP)
1230 def symlink_failed(self):
1231 if self.current_test_case_info:
1233 failed_dir = os.getenv('FAILED_DIR')
1234 link_path = os.path.join(
1237 os.path.basename(self.current_test_case_info.tempdir))
1238 if self.current_test_case_info.logger:
1239 self.current_test_case_info.logger.debug(
1240 "creating a link to the failed test")
1241 self.current_test_case_info.logger.debug(
1242 "os.symlink(%s, %s)" %
1243 (self.current_test_case_info.tempdir, link_path))
1244 if os.path.exists(link_path):
1245 if self.current_test_case_info.logger:
1246 self.current_test_case_info.logger.debug(
1247 'symlink already exists')
1249 os.symlink(self.current_test_case_info.tempdir, link_path)
1251 except Exception as e:
1252 if self.current_test_case_info.logger:
1253 self.current_test_case_info.logger.error(e)
1255 def send_result_through_pipe(self, test, result):
1256 if hasattr(self, 'test_framework_result_pipe'):
1257 pipe = self.test_framework_result_pipe
1259 pipe.send((test.id(), result))
1261 def log_error(self, test, err, fn_name):
1262 if self.current_test_case_info:
1263 if isinstance(test, unittest.suite._ErrorHolder):
1264 test_name = test.description
1266 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1267 test._testMethodName,
1268 test._testMethodDoc)
1269 self.current_test_case_info.logger.debug(
1270 "--- %s() %s called, err is %s" %
1271 (fn_name, test_name, err))
1272 self.current_test_case_info.logger.debug(
1273 "formatted exception is:\n%s" %
1274 "".join(format_exception(*err)))
1276 def add_error(self, test, err, unittest_fn, error_type):
1277 if error_type == FAIL:
1278 self.log_error(test, err, 'addFailure')
1279 error_type_str = colorize("FAIL", RED)
1280 elif error_type == ERROR:
1281 self.log_error(test, err, 'addError')
1282 error_type_str = colorize("ERROR", RED)
1284 raise Exception('Error type %s cannot be used to record an '
1285 'error or a failure' % error_type)
1287 unittest_fn(self, test, err)
1288 if self.current_test_case_info:
1289 self.result_string = "%s [ temp dir used by test case: %s ]" % \
1291 self.current_test_case_info.tempdir)
1292 self.symlink_failed()
1293 self.failed_test_cases_info.add(self.current_test_case_info)
1294 if is_core_present(self.current_test_case_info.tempdir):
1295 if not self.current_test_case_info.core_crash_test:
1296 if isinstance(test, unittest.suite._ErrorHolder):
1297 test_name = str(test)
1299 test_name = "'{!s}' ({!s})".format(
1300 get_testcase_doc_name(test), test.id())
1301 self.current_test_case_info.core_crash_test = test_name
1302 self.core_crash_test_cases_info.add(
1303 self.current_test_case_info)
1305 self.result_string = '%s [no temp dir]' % error_type_str
1307 self.send_result_through_pipe(test, error_type)
1309 def addFailure(self, test, err):
1311 Record a test failed result
1314 :param err: error message
1317 self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1319 def addError(self, test, err):
1321 Record a test error result
1324 :param err: error message
1327 self.add_error(test, err, unittest.TestResult.addError, ERROR)
1329 def getDescription(self, test):
1331 Get test description
1334 :returns: test description
1337 return get_test_description(self.descriptions, test)
1339 def startTest(self, test):
1347 def print_header(test):
1348 if not hasattr(test.__class__, '_header_printed'):
1349 print(double_line_delim)
1350 print(colorize(getdoc(test).splitlines()[0], GREEN))
1351 print(double_line_delim)
1352 test.__class__._header_printed = True
1356 unittest.TestResult.startTest(self, test)
1357 if self.verbosity > 0:
1358 self.stream.writeln(
1359 "Starting " + self.getDescription(test) + " ...")
1360 self.stream.writeln(single_line_delim)
1362 def stopTest(self, test):
1364 Called when the given test has been run
1369 unittest.TestResult.stopTest(self, test)
1370 if self.verbosity > 0:
1371 self.stream.writeln(single_line_delim)
1372 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1373 self.result_string))
1374 self.stream.writeln(single_line_delim)
1376 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1377 self.result_string))
1379 self.send_result_through_pipe(test, TEST_RUN)
1381 def printErrors(self):
1383 Print errors from running the test case
1385 if len(self.errors) > 0 or len(self.failures) > 0:
1386 self.stream.writeln()
1387 self.printErrorList('ERROR', self.errors)
1388 self.printErrorList('FAIL', self.failures)
1390 # ^^ that is the last output from unittest before summary
1391 if not self.runner.print_summary:
1392 devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
1393 self.stream = devnull
1394 self.runner.stream = devnull
1396 def printErrorList(self, flavour, errors):
1398 Print error list to the output stream together with error type
1399 and test case description.
1401 :param flavour: error type
1402 :param errors: iterable errors
1405 for test, err in errors:
1406 self.stream.writeln(double_line_delim)
1407 self.stream.writeln("%s: %s" %
1408 (flavour, self.getDescription(test)))
1409 self.stream.writeln(single_line_delim)
1410 self.stream.writeln("%s" % err)
1413 class VppTestRunner(unittest.TextTestRunner):
1415 A basic test runner implementation which prints results to standard error.
1419 def resultclass(self):
1420 """Class maintaining the results of the tests"""
1421 return VppTestResult
1423 def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1424 result_pipe=None, failfast=False, buffer=False,
1425 resultclass=None, print_summary=True, **kwargs):
1426 # ignore stream setting here, use hard-coded stdout to be in sync
1427 # with prints from VppTestCase methods ...
1428 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1429 verbosity, failfast, buffer,
1430 resultclass, **kwargs)
1431 KeepAliveReporter.pipe = keep_alive_pipe
1433 self.orig_stream = self.stream
1434 self.resultclass.test_framework_result_pipe = result_pipe
1436 self.print_summary = print_summary
1438 def _makeResult(self):
1439 return self.resultclass(self.stream,
1444 def run(self, test):
1451 faulthandler.enable() # emit stack trace to stderr if killed by signal
1453 result = super(VppTestRunner, self).run(test)
1454 if not self.print_summary:
1455 self.stream = self.orig_stream
1456 result.stream = self.orig_stream
1460 class Worker(Thread):
1461 def __init__(self, args, logger, env={}):
1462 self.logger = logger
1465 self.env = copy.deepcopy(env)
1466 super(Worker, self).__init__()
1469 executable = self.args[0]
1470 self.logger.debug("Running executable w/args `%s'" % self.args)
1471 env = os.environ.copy()
1472 env.update(self.env)
1473 env["CK_LOG_FILE_NAME"] = "-"
1474 self.process = subprocess.Popen(
1475 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1476 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1477 out, err = self.process.communicate()
1478 self.logger.debug("Finished running `%s'" % executable)
1479 self.logger.info("Return code is `%s'" % self.process.returncode)
1480 self.logger.info(single_line_delim)
1481 self.logger.info("Executable `%s' wrote to stdout:" % executable)
1482 self.logger.info(single_line_delim)
1483 self.logger.info(out)
1484 self.logger.info(single_line_delim)
1485 self.logger.info("Executable `%s' wrote to stderr:" % executable)
1486 self.logger.info(single_line_delim)
1487 self.logger.info(err)
1488 self.logger.info(single_line_delim)
1489 self.result = self.process.returncode
1492 if __name__ == '__main__':