3 from __future__ import print_function
17 from collections import deque
18 from threading import Thread, Event
19 from inspect import getdoc, isclass
20 from traceback import format_exception
21 from logging import FileHandler, DEBUG, Formatter
24 from scapy.packet import Raw
25 import hook as hookmodule
26 from vpp_pg_interface import VppPGInterface
27 from vpp_sub_interface import VppSubInterface
28 from vpp_lo_interface import VppLoInterface
29 from vpp_bvi_interface import VppBviInterface
30 from vpp_papi_provider import VppPapiProvider
31 from vpp_papi.vpp_stats import VPPStats
32 from vpp_papi.vpp_transport_shmem import VppTransportShmemIOError
33 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
35 from vpp_object import VppObjectRegistry
36 from util import ppp, is_core_present
37 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
38 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
39 from scapy.layers.inet6 import ICMPv6EchoReply
41 if os.name == 'posix' and sys.version_info[0] < 3:
42 # using subprocess32 is recommended by python official documentation
43 # @ https://docs.python.org/2/library/subprocess.html
44 import subprocess32 as subprocess
48 # Python2/3 compatible
60 debug_framework = False
61 if os.getenv('TEST_DEBUG', "0") == "1":
62 debug_framework = True
66 Test framework module.
68 The module provides a set of tools for constructing and running tests and
69 representing the results.
73 class VppDiedError(Exception):
74 """ exception for reporting that the subprocess has died."""
76 signals_by_value = {v: k for k, v in signal.__dict__.items() if
77 k.startswith('SIG') and not k.startswith('SIG_')}
79 def __init__(self, rv=None):
81 self.signal_name = None
83 self.signal_name = VppDiedError.signals_by_value[-rv]
87 msg = "VPP subprocess died unexpectedly with return code: %d%s." % (
89 ' [%s]' % self.signal_name if
90 self.signal_name is not None else '')
91 super(VppDiedError, self).__init__(msg)
94 class _PacketInfo(object):
95 """Private class to create packet info object.
97 Help process information about the next packet.
98 Set variables to default values.
100 #: Store the index of the packet.
102 #: Store the index of the source packet generator interface of the packet.
104 #: Store the index of the destination packet generator interface
107 #: Store expected ip version
109 #: Store expected upper protocol
111 #: Store the copy of the former packet.
114 def __eq__(self, other):
115 index = self.index == other.index
116 src = self.src == other.src
117 dst = self.dst == other.dst
118 data = self.data == other.data
119 return index and src and dst and data
122 def pump_output(testclass):
123 """ pump output from vpp stdout/stderr to proper queues """
126 while not testclass.pump_thread_stop_flag.is_set():
127 readable = select.select([testclass.vpp.stdout.fileno(),
128 testclass.vpp.stderr.fileno(),
129 testclass.pump_thread_wakeup_pipe[0]],
131 if testclass.vpp.stdout.fileno() in readable:
132 read = os.read(testclass.vpp.stdout.fileno(), 102400)
134 split = read.splitlines(True)
135 if len(stdout_fragment) > 0:
136 split[0] = "%s%s" % (stdout_fragment, split[0])
137 if len(split) > 0 and split[-1].endswith("\n"):
141 stdout_fragment = split[-1]
142 testclass.vpp_stdout_deque.extend(split[:limit])
143 if not testclass.cache_vpp_output:
144 for line in split[:limit]:
145 testclass.logger.debug(
146 "VPP STDOUT: %s" % line.rstrip("\n"))
147 if testclass.vpp.stderr.fileno() in readable:
148 read = os.read(testclass.vpp.stderr.fileno(), 102400)
150 split = read.splitlines(True)
151 if len(stderr_fragment) > 0:
152 split[0] = "%s%s" % (stderr_fragment, split[0])
153 if len(split) > 0 and split[-1].endswith(b"\n"):
157 stderr_fragment = split[-1]
158 testclass.vpp_stderr_deque.extend(split[:limit])
159 if not testclass.cache_vpp_output:
160 for line in split[:limit]:
161 testclass.logger.debug(
162 "VPP STDERR: %s" % line.rstrip("\n"))
163 # ignoring the dummy pipe here intentionally - the
164 # flag will take care of properly terminating the loop
167 def _is_skip_aarch64_set():
168 return os.getenv('SKIP_AARCH64', 'n').lower() in ('yes', 'y', '1')
171 is_skip_aarch64_set = _is_skip_aarch64_set()
174 def _is_platform_aarch64():
175 return platform.machine() == 'aarch64'
178 is_platform_aarch64 = _is_platform_aarch64()
181 def _running_extended_tests():
182 s = os.getenv("EXTENDED_TESTS", "n")
183 return True if s.lower() in ("y", "yes", "1") else False
186 running_extended_tests = _running_extended_tests()
189 def _running_on_centos():
190 os_id = os.getenv("OS_ID", "")
191 return True if "centos" in os_id.lower() else False
194 running_on_centos = _running_on_centos
197 class KeepAliveReporter(object):
199 Singleton object which reports test start to parent process
204 self.__dict__ = self._shared_state
212 def pipe(self, pipe):
213 if self._pipe is not None:
214 raise Exception("Internal error - pipe should only be set once.")
217 def send_keep_alive(self, test, desc=None):
219 Write current test tmpdir & desc to keep-alive pipe to signal liveness
221 if self.pipe is None:
222 # if not running forked..
226 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
230 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
233 class VppTestCase(unittest.TestCase):
234 """This subclass is a base class for VPP test cases that are implemented as
235 classes. It provides methods to create and run test case.
238 extra_vpp_punt_config = []
239 extra_vpp_plugin_config = []
242 def packet_infos(self):
243 """List of packet infos"""
244 return self._packet_infos
247 def get_packet_count_for_if_idx(cls, dst_if_index):
248 """Get the number of packet info for specified destination if index"""
249 if dst_if_index in cls._packet_count_for_dst_if_idx:
250 return cls._packet_count_for_dst_if_idx[dst_if_index]
256 """Return the instance of this testcase"""
257 return cls.test_instance
260 def set_debug_flags(cls, d):
261 cls.debug_core = False
262 cls.debug_gdb = False
263 cls.debug_gdbserver = False
268 cls.debug_core = True
271 elif dl == "gdbserver":
272 cls.debug_gdbserver = True
274 raise Exception("Unrecognized DEBUG option: '%s'" % d)
277 def get_least_used_cpu():
278 cpu_usage_list = [set(range(psutil.cpu_count()))]
279 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
280 if 'vpp_main' == p.info['name']]
281 for vpp_process in vpp_processes:
282 for cpu_usage_set in cpu_usage_list:
284 cpu_num = vpp_process.cpu_num()
285 if cpu_num in cpu_usage_set:
286 cpu_usage_set_index = cpu_usage_list.index(
288 if cpu_usage_set_index == len(cpu_usage_list) - 1:
289 cpu_usage_list.append({cpu_num})
291 cpu_usage_list[cpu_usage_set_index + 1].add(
293 cpu_usage_set.remove(cpu_num)
295 except psutil.NoSuchProcess:
298 for cpu_usage_set in cpu_usage_list:
299 if len(cpu_usage_set) > 0:
300 min_usage_set = cpu_usage_set
303 return random.choice(tuple(min_usage_set))
306 def setUpConstants(cls):
307 """ Set-up the test case class based on environment variables """
308 s = os.getenv("STEP", "n")
309 cls.step = True if s.lower() in ("y", "yes", "1") else False
310 d = os.getenv("DEBUG", None)
311 c = os.getenv("CACHE_OUTPUT", "1")
312 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
313 cls.set_debug_flags(d)
314 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
315 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
316 cls.test_plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
317 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
319 if cls.plugin_path is not None:
320 if cls.extern_plugin_path is not None:
321 plugin_path = "%s:%s" % (
322 cls.plugin_path, cls.extern_plugin_path)
324 plugin_path = cls.plugin_path
325 elif cls.extern_plugin_path is not None:
326 plugin_path = cls.extern_plugin_path
328 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
329 debug_cli = "cli-listen localhost:5002"
331 size = os.getenv("COREDUMP_SIZE")
333 coredump_size = "coredump-size %s" % size
334 if coredump_size is None:
335 coredump_size = "coredump-size unlimited"
337 cpu_core_number = cls.get_least_used_cpu()
339 cls.vpp_cmdline = [cls.vpp_bin, "unix",
340 "{", "nodaemon", debug_cli, "full-coredump",
341 coredump_size, "runtime-dir", cls.tempdir, "}",
342 "api-trace", "{", "on", "}", "api-segment", "{",
343 "prefix", cls.shm_prefix, "}", "cpu", "{",
344 "main-core", str(cpu_core_number), "}",
345 "statseg", "{", "socket-name", cls.stats_sock, "}",
346 "socksvr", "{", "socket-name", cls.api_sock, "}",
348 "{", "plugin", "dpdk_plugin.so", "{", "disable",
349 "}", "plugin", "rdma_plugin.so", "{", "disable",
350 "}", "plugin", "unittest_plugin.so", "{", "enable",
351 "}"] + cls.extra_vpp_plugin_config + ["}", ]
352 if cls.extra_vpp_punt_config is not None:
353 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
354 if plugin_path is not None:
355 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
356 if cls.test_plugin_path is not None:
357 cls.vpp_cmdline.extend(["test_plugin_path", cls.test_plugin_path])
359 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
360 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
363 def wait_for_enter(cls):
364 if cls.debug_gdbserver:
365 print(double_line_delim)
366 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
368 print(double_line_delim)
369 print("Spawned VPP with PID: %d" % cls.vpp.pid)
371 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
373 print(single_line_delim)
374 print("You can debug the VPP using e.g.:")
375 if cls.debug_gdbserver:
376 print("sudo gdb " + cls.vpp_bin +
377 " -ex 'target remote localhost:7777'")
378 print("Now is the time to attach a gdb by running the above "
379 "command, set up breakpoints etc. and then resume VPP from "
380 "within gdb by issuing the 'continue' command")
382 print("sudo gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
383 print("Now is the time to attach a gdb by running the above "
384 "command and set up breakpoints etc.")
385 print(single_line_delim)
386 input("Press ENTER to continue running the testcase...")
390 cmdline = cls.vpp_cmdline
392 if cls.debug_gdbserver:
393 gdbserver = '/usr/bin/gdbserver'
394 if not os.path.isfile(gdbserver) or \
395 not os.access(gdbserver, os.X_OK):
396 raise Exception("gdbserver binary '%s' does not exist or is "
397 "not executable" % gdbserver)
399 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
400 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
403 cls.vpp = subprocess.Popen(cmdline,
404 stdout=subprocess.PIPE,
405 stderr=subprocess.PIPE,
407 except subprocess.CalledProcessError as e:
408 cls.logger.critical("Subprocess returned with non-0 return code: ("
412 cls.logger.critical("Subprocess returned with OS error: "
413 "(%s) %s", e.errno, e.strerror)
415 except Exception as e:
416 cls.logger.exception("Subprocess returned unexpected from "
423 def wait_for_stats_socket(cls):
424 deadline = time.time() + 3
426 while time.time() < deadline or \
427 cls.debug_gdb or cls.debug_gdbserver:
428 if os.path.exists(cls.stats_sock):
433 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
436 def wait_for_coredump(cls):
437 corefile = cls.tempdir + "/core"
438 if os.path.isfile(corefile):
439 cls.logger.error("Waiting for coredump to complete: %s", corefile)
440 curr_size = os.path.getsize(corefile)
441 deadline = time.time() + 60
443 while time.time() < deadline:
446 curr_size = os.path.getsize(corefile)
447 if size == curr_size:
451 cls.logger.error("Timed out waiting for coredump to complete:"
454 cls.logger.error("Coredump complete: %s, size %d",
460 Perform class setup before running the testcase
461 Remove shared memory files, start vpp and connect the vpp-api
463 super(VppTestCase, cls).setUpClass()
464 gc.collect() # run garbage collection first
466 cls.logger = get_logger(cls.__name__)
467 if hasattr(cls, 'parallel_handler'):
468 cls.logger.addHandler(cls.parallel_handler)
469 cls.logger.propagate = False
471 cls.tempdir = tempfile.mkdtemp(
472 prefix='vpp-unittest-%s-' % cls.__name__)
473 cls.stats_sock = "%s/stats.sock" % cls.tempdir
474 cls.api_sock = "%s/api.sock" % cls.tempdir
475 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
476 cls.file_handler.setFormatter(
477 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
479 cls.file_handler.setLevel(DEBUG)
480 cls.logger.addHandler(cls.file_handler)
481 cls.logger.debug("--- setUpClass() for %s called ---" %
483 cls.shm_prefix = os.path.basename(cls.tempdir)
484 os.chdir(cls.tempdir)
485 cls.logger.info("Temporary dir is %s, shm prefix is %s",
486 cls.tempdir, cls.shm_prefix)
488 cls.reset_packet_infos()
490 cls._zombie_captures = []
493 cls.registry = VppObjectRegistry()
494 cls.vpp_startup_failed = False
495 cls.reporter = KeepAliveReporter()
496 # need to catch exceptions here because if we raise, then the cleanup
497 # doesn't get called and we might end with a zombie vpp
500 cls.reporter.send_keep_alive(cls, 'setUpClass')
501 VppTestResult.current_test_case_info = TestCaseInfo(
502 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
503 cls.vpp_stdout_deque = deque()
504 cls.vpp_stderr_deque = deque()
505 cls.pump_thread_stop_flag = Event()
506 cls.pump_thread_wakeup_pipe = os.pipe()
507 cls.pump_thread = Thread(target=pump_output, args=(cls,))
508 cls.pump_thread.daemon = True
509 cls.pump_thread.start()
510 if cls.debug_gdb or cls.debug_gdbserver:
514 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
517 hook = hookmodule.StepHook(cls)
519 hook = hookmodule.PollHook(cls)
520 cls.vapi.register_hook(hook)
521 cls.wait_for_stats_socket()
522 cls.statistics = VPPStats(socketname=cls.stats_sock)
526 cls.vpp_startup_failed = True
528 "VPP died shortly after startup, check the"
529 " output to standard error for possible cause")
535 cls.vapi.disconnect()
538 if cls.debug_gdbserver:
539 print(colorize("You're running VPP inside gdbserver but "
540 "VPP-API connection failed, did you forget "
541 "to 'continue' VPP from within gdb?", RED))
551 Disconnect vpp-api, kill vpp and cleanup shared memory files
553 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
555 if cls.vpp.returncode is None:
556 print(double_line_delim)
557 print("VPP or GDB server is still running")
558 print(single_line_delim)
559 input("When done debugging, press ENTER to kill the "
560 "process and finish running the testcase...")
562 # first signal that we want to stop the pump thread, then wake it up
563 if hasattr(cls, 'pump_thread_stop_flag'):
564 cls.pump_thread_stop_flag.set()
565 if hasattr(cls, 'pump_thread_wakeup_pipe'):
566 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
567 if hasattr(cls, 'pump_thread'):
568 cls.logger.debug("Waiting for pump thread to stop")
569 cls.pump_thread.join()
570 if hasattr(cls, 'vpp_stderr_reader_thread'):
571 cls.logger.debug("Waiting for stdderr pump to stop")
572 cls.vpp_stderr_reader_thread.join()
574 if hasattr(cls, 'vpp'):
575 if hasattr(cls, 'vapi'):
576 cls.logger.debug("Disconnecting class vapi client on %s",
578 cls.vapi.disconnect()
579 cls.logger.debug("Deleting class vapi attribute on %s",
583 if cls.vpp.returncode is None:
584 cls.wait_for_coredump()
585 cls.logger.debug("Sending TERM to vpp")
587 cls.logger.debug("Waiting for vpp to die")
588 cls.vpp.communicate()
589 cls.logger.debug("Deleting class vpp attribute on %s",
593 if cls.vpp_startup_failed:
594 stdout_log = cls.logger.info
595 stderr_log = cls.logger.critical
597 stdout_log = cls.logger.info
598 stderr_log = cls.logger.info
600 if hasattr(cls, 'vpp_stdout_deque'):
601 stdout_log(single_line_delim)
602 stdout_log('VPP output to stdout while running %s:', cls.__name__)
603 stdout_log(single_line_delim)
604 vpp_output = "".join(cls.vpp_stdout_deque)
605 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
607 stdout_log('\n%s', vpp_output)
608 stdout_log(single_line_delim)
610 if hasattr(cls, 'vpp_stderr_deque'):
611 stderr_log(single_line_delim)
612 stderr_log('VPP output to stderr while running %s:', cls.__name__)
613 stderr_log(single_line_delim)
614 vpp_output = "".join(cls.vpp_stderr_deque)
615 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
617 stderr_log('\n%s', vpp_output)
618 stderr_log(single_line_delim)
621 def tearDownClass(cls):
622 """ Perform final cleanup after running all tests in this test-case """
623 cls.logger.debug("--- tearDownClass() for %s called ---" %
625 cls.reporter.send_keep_alive(cls, 'tearDownClass')
627 cls.file_handler.close()
628 cls.reset_packet_infos()
630 debug_internal.on_tear_down_class(cls)
632 def show_commands_at_teardown(self):
633 """ Allow subclass specific teardown logging additions."""
634 self.logger.info("--- No test specific show commands provided. ---")
637 """ Show various debug prints after each test """
638 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
639 (self.__class__.__name__, self._testMethodName,
640 self._testMethodDoc))
643 if not self.vpp_dead:
644 self.logger.debug(self.vapi.cli("show trace max 1000"))
645 self.logger.info(self.vapi.ppcli("show interface"))
646 self.logger.info(self.vapi.ppcli("show hardware"))
647 self.logger.info(self.statistics.set_errors_str())
648 self.logger.info(self.vapi.ppcli("show run"))
649 self.logger.info(self.vapi.ppcli("show log"))
650 self.logger.info("Logging testcase specific show commands.")
651 self.show_commands_at_teardown()
652 self.registry.remove_vpp_config(self.logger)
653 # Save/Dump VPP api trace log
654 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
655 tmp_api_trace = "/tmp/%s" % api_trace
656 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
657 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
658 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
660 os.rename(tmp_api_trace, vpp_api_trace_log)
661 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
663 except VppTransportShmemIOError:
664 self.logger.debug("VppTransportShmemIOError: Vpp dead. "
665 "Cannot log show commands.")
668 self.registry.unregister_all(self.logger)
671 """ Clear trace before running each test"""
672 super(VppTestCase, self).setUp()
673 self.reporter.send_keep_alive(self)
675 raise Exception("VPP is dead when setting up the test")
676 self.sleep(.1, "during setUp")
677 self.vpp_stdout_deque.append(
678 "--- test setUp() for %s.%s(%s) starts here ---\n" %
679 (self.__class__.__name__, self._testMethodName,
680 self._testMethodDoc))
681 self.vpp_stderr_deque.append(
682 "--- test setUp() for %s.%s(%s) starts here ---\n" %
683 (self.__class__.__name__, self._testMethodName,
684 self._testMethodDoc))
685 self.vapi.cli("clear trace")
686 # store the test instance inside the test class - so that objects
687 # holding the class can access instance methods (like assertEqual)
688 type(self).test_instance = self
691 def pg_enable_capture(cls, interfaces=None):
693 Enable capture on packet-generator interfaces
695 :param interfaces: iterable interface indexes (if None,
696 use self.pg_interfaces)
699 if interfaces is None:
700 interfaces = cls.pg_interfaces
705 def register_capture(cls, cap_name):
706 """ Register a capture in the testclass """
707 # add to the list of captures with current timestamp
708 cls._captures.append((time.time(), cap_name))
709 # filter out from zombies
710 cls._zombie_captures = [(stamp, name)
711 for (stamp, name) in cls._zombie_captures
716 """ Remove any zombie captures and enable the packet generator """
717 # how long before capture is allowed to be deleted - otherwise vpp
718 # crashes - 100ms seems enough (this shouldn't be needed at all)
721 for stamp, cap_name in cls._zombie_captures:
722 wait = stamp + capture_ttl - now
724 cls.sleep(wait, "before deleting capture %s" % cap_name)
726 cls.logger.debug("Removing zombie capture %s" % cap_name)
727 cls.vapi.cli('packet-generator delete %s' % cap_name)
729 cls.vapi.cli("trace add pg-input 1000")
730 cls.vapi.cli('packet-generator enable')
731 cls._zombie_captures = cls._captures
735 def create_pg_interfaces(cls, interfaces):
737 Create packet-generator interfaces.
739 :param interfaces: iterable indexes of the interfaces.
740 :returns: List of created interfaces.
745 intf = VppPGInterface(cls, i)
746 setattr(cls, intf.name, intf)
748 cls.pg_interfaces = result
752 def create_loopback_interfaces(cls, count):
754 Create loopback interfaces.
756 :param count: number of interfaces created.
757 :returns: List of created interfaces.
759 result = [VppLoInterface(cls) for i in range(count)]
761 setattr(cls, intf.name, intf)
762 cls.lo_interfaces = result
766 def create_bvi_interfaces(cls, count):
768 Create BVI interfaces.
770 :param count: number of interfaces created.
771 :returns: List of created interfaces.
773 result = [VppBviInterface(cls) for i in range(count)]
775 setattr(cls, intf.name, intf)
776 cls.bvi_interfaces = result
780 def extend_packet(packet, size, padding=' '):
782 Extend packet to given size by padding with spaces or custom padding
783 NOTE: Currently works only when Raw layer is present.
785 :param packet: packet
786 :param size: target size
787 :param padding: padding used to extend the payload
790 packet_len = len(packet) + 4
791 extend = size - packet_len
793 num = (extend // len(padding)) + 1
794 packet[Raw].load += (padding * num)[:extend].encode("ascii")
797 def reset_packet_infos(cls):
798 """ Reset the list of packet info objects and packet counts to zero """
799 cls._packet_infos = {}
800 cls._packet_count_for_dst_if_idx = {}
803 def create_packet_info(cls, src_if, dst_if):
805 Create packet info object containing the source and destination indexes
806 and add it to the testcase's packet info list
808 :param VppInterface src_if: source interface
809 :param VppInterface dst_if: destination interface
811 :returns: _PacketInfo object
815 info.index = len(cls._packet_infos)
816 info.src = src_if.sw_if_index
817 info.dst = dst_if.sw_if_index
818 if isinstance(dst_if, VppSubInterface):
819 dst_idx = dst_if.parent.sw_if_index
821 dst_idx = dst_if.sw_if_index
822 if dst_idx in cls._packet_count_for_dst_if_idx:
823 cls._packet_count_for_dst_if_idx[dst_idx] += 1
825 cls._packet_count_for_dst_if_idx[dst_idx] = 1
826 cls._packet_infos[info.index] = info
830 def info_to_payload(info):
832 Convert _PacketInfo object to packet payload
834 :param info: _PacketInfo object
836 :returns: string containing serialized data from packet info
838 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
842 def payload_to_info(payload, payload_field='load'):
844 Convert packet payload to _PacketInfo object
846 :param payload: packet payload
847 :type payload: <class 'scapy.packet.Raw'>
848 :param payload_field: packet fieldname of payload "load" for
849 <class 'scapy.packet.Raw'>
850 :type payload_field: str
851 :returns: _PacketInfo object containing de-serialized data from payload
854 numbers = getattr(payload, payload_field).split()
856 info.index = int(numbers[0])
857 info.src = int(numbers[1])
858 info.dst = int(numbers[2])
859 info.ip = int(numbers[3])
860 info.proto = int(numbers[4])
863 def get_next_packet_info(self, info):
865 Iterate over the packet info list stored in the testcase
866 Start iteration with first element if info is None
867 Continue based on index in info if info is specified
869 :param info: info or None
870 :returns: next info in list or None if no more infos
875 next_index = info.index + 1
876 if next_index == len(self._packet_infos):
879 return self._packet_infos[next_index]
881 def get_next_packet_info_for_interface(self, src_index, info):
883 Search the packet info list for the next packet info with same source
886 :param src_index: source interface index to search for
887 :param info: packet info - where to start the search
888 :returns: packet info or None
892 info = self.get_next_packet_info(info)
895 if info.src == src_index:
898 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
900 Search the packet info list for the next packet info with same source
901 and destination interface indexes
903 :param src_index: source interface index to search for
904 :param dst_index: destination interface index to search for
905 :param info: packet info - where to start the search
906 :returns: packet info or None
910 info = self.get_next_packet_info_for_interface(src_index, info)
913 if info.dst == dst_index:
916 def assert_equal(self, real_value, expected_value, name_or_class=None):
917 if name_or_class is None:
918 self.assertEqual(real_value, expected_value)
921 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
922 msg = msg % (getdoc(name_or_class).strip(),
923 real_value, str(name_or_class(real_value)),
924 expected_value, str(name_or_class(expected_value)))
926 msg = "Invalid %s: %s does not match expected value %s" % (
927 name_or_class, real_value, expected_value)
929 self.assertEqual(real_value, expected_value, msg)
931 def assert_in_range(self,
939 msg = "Invalid %s: %s out of range <%s,%s>" % (
940 name, real_value, expected_min, expected_max)
941 self.assertTrue(expected_min <= real_value <= expected_max, msg)
943 def assert_packet_checksums_valid(self, packet,
944 ignore_zero_udp_checksums=True):
945 received = packet.__class__(scapy.compat.raw(packet))
947 ppp("Verifying packet checksums for packet:", received))
948 udp_layers = ['UDP', 'UDPerror']
949 checksum_fields = ['cksum', 'chksum']
952 temp = received.__class__(scapy.compat.raw(received))
954 layer = temp.getlayer(counter)
956 for cf in checksum_fields:
957 if hasattr(layer, cf):
958 if ignore_zero_udp_checksums and \
959 0 == getattr(layer, cf) and \
960 layer.name in udp_layers:
963 checksums.append((counter, cf))
966 counter = counter + 1
967 if 0 == len(checksums):
969 temp = temp.__class__(scapy.compat.raw(temp))
970 for layer, cf in checksums:
971 calc_sum = getattr(temp[layer], cf)
973 getattr(received[layer], cf), calc_sum,
974 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
976 "Checksum field `%s` on `%s` layer has correct value `%s`" %
977 (cf, temp[layer].name, calc_sum))
979 def assert_checksum_valid(self, received_packet, layer,
981 ignore_zero_checksum=False):
982 """ Check checksum of received packet on given layer """
983 received_packet_checksum = getattr(received_packet[layer], field_name)
984 if ignore_zero_checksum and 0 == received_packet_checksum:
986 recalculated = received_packet.__class__(
987 scapy.compat.raw(received_packet))
988 delattr(recalculated[layer], field_name)
989 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
990 self.assert_equal(received_packet_checksum,
991 getattr(recalculated[layer], field_name),
992 "packet checksum on layer: %s" % layer)
994 def assert_ip_checksum_valid(self, received_packet,
995 ignore_zero_checksum=False):
996 self.assert_checksum_valid(received_packet, 'IP',
997 ignore_zero_checksum=ignore_zero_checksum)
999 def assert_tcp_checksum_valid(self, received_packet,
1000 ignore_zero_checksum=False):
1001 self.assert_checksum_valid(received_packet, 'TCP',
1002 ignore_zero_checksum=ignore_zero_checksum)
1004 def assert_udp_checksum_valid(self, received_packet,
1005 ignore_zero_checksum=True):
1006 self.assert_checksum_valid(received_packet, 'UDP',
1007 ignore_zero_checksum=ignore_zero_checksum)
1009 def assert_embedded_icmp_checksum_valid(self, received_packet):
1010 if received_packet.haslayer(IPerror):
1011 self.assert_checksum_valid(received_packet, 'IPerror')
1012 if received_packet.haslayer(TCPerror):
1013 self.assert_checksum_valid(received_packet, 'TCPerror')
1014 if received_packet.haslayer(UDPerror):
1015 self.assert_checksum_valid(received_packet, 'UDPerror',
1016 ignore_zero_checksum=True)
1017 if received_packet.haslayer(ICMPerror):
1018 self.assert_checksum_valid(received_packet, 'ICMPerror')
1020 def assert_icmp_checksum_valid(self, received_packet):
1021 self.assert_checksum_valid(received_packet, 'ICMP')
1022 self.assert_embedded_icmp_checksum_valid(received_packet)
1024 def assert_icmpv6_checksum_valid(self, pkt):
1025 if pkt.haslayer(ICMPv6DestUnreach):
1026 self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
1027 self.assert_embedded_icmp_checksum_valid(pkt)
1028 if pkt.haslayer(ICMPv6EchoRequest):
1029 self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
1030 if pkt.haslayer(ICMPv6EchoReply):
1031 self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
1033 def get_packet_counter(self, counter):
1034 if counter.startswith("/"):
1035 counter_value = self.statistics.get_counter(counter)
1037 counters = self.vapi.cli("sh errors").split('\n')
1039 for i in range(1, len(counters) - 1):
1040 results = counters[i].split()
1041 if results[1] == counter:
1042 counter_value = int(results[0])
1044 return counter_value
1046 def assert_packet_counter_equal(self, counter, expected_value):
1047 counter_value = self.get_packet_counter(counter)
1048 self.assert_equal(counter_value, expected_value,
1049 "packet counter `%s'" % counter)
1051 def assert_error_counter_equal(self, counter, expected_value):
1052 counter_value = self.statistics.get_err_counter(counter)
1053 self.assert_equal(counter_value, expected_value,
1054 "error counter `%s'" % counter)
1057 def sleep(cls, timeout, remark=None):
1059 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1060 # * by Guido, only the main thread can be interrupted.
1062 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
1065 if hasattr(os, 'sched_yield'):
1071 if hasattr(cls, 'logger'):
1072 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1073 before = time.time()
1076 if hasattr(cls, 'logger') and after - before > 2 * timeout:
1077 cls.logger.error("unexpected self.sleep() result - "
1078 "slept for %es instead of ~%es!",
1079 after - before, timeout)
1080 if hasattr(cls, 'logger'):
1082 "Finished sleep (%s) - slept %es (wanted %es)",
1083 remark, after - before, timeout)
1085 def pg_send(self, intf, pkts):
1086 self.vapi.cli("clear trace")
1087 intf.add_stream(pkts)
1088 self.pg_enable_capture(self.pg_interfaces)
1091 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
1092 self.pg_send(intf, pkts)
1095 for i in self.pg_interfaces:
1096 i.get_capture(0, timeout=timeout)
1097 i.assert_nothing_captured(remark=remark)
1100 def send_and_expect(self, intf, pkts, output, n_rx=None):
1103 self.pg_send(intf, pkts)
1104 rx = output.get_capture(n_rx)
1107 def send_and_expect_only(self, intf, pkts, output, timeout=None):
1108 self.pg_send(intf, pkts)
1109 rx = output.get_capture(len(pkts))
1113 for i in self.pg_interfaces:
1114 if i not in outputs:
1115 i.get_capture(0, timeout=timeout)
1116 i.assert_nothing_captured()
1122 """ unittest calls runTest when TestCase is instantiated without a
1123 test case. Use case: Writing unittests against VppTestCase"""
1127 def get_testcase_doc_name(test):
1128 return getdoc(test.__class__).splitlines()[0]
1131 def get_test_description(descriptions, test):
1132 short_description = test.shortDescription()
1133 if descriptions and short_description:
1134 return short_description
1139 class TestCaseInfo(object):
1140 def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
1141 self.logger = logger
1142 self.tempdir = tempdir
1143 self.vpp_pid = vpp_pid
1144 self.vpp_bin_path = vpp_bin_path
1145 self.core_crash_test = None
1148 class VppTestResult(unittest.TestResult):
1150 @property result_string
1151 String variable to store the test case result string.
1153 List variable containing 2-tuples of TestCase instances and strings
1154 holding formatted tracebacks. Each tuple represents a test which
1155 raised an unexpected exception.
1157 List variable containing 2-tuples of TestCase instances and strings
1158 holding formatted tracebacks. Each tuple represents a test where
1159 a failure was explicitly signalled using the TestCase.assert*()
1163 failed_test_cases_info = set()
1164 core_crash_test_cases_info = set()
1165 current_test_case_info = None
1167 def __init__(self, stream=None, descriptions=None, verbosity=None,
1170 :param stream File descriptor to store where to report test results.
1171 Set to the standard error stream by default.
1172 :param descriptions Boolean variable to store information if to use
1173 test case descriptions.
1174 :param verbosity Integer variable to store required verbosity level.
1176 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1177 self.stream = stream
1178 self.descriptions = descriptions
1179 self.verbosity = verbosity
1180 self.result_string = None
1181 self.runner = runner
1183 def addSuccess(self, test):
1185 Record a test succeeded result
1190 if self.current_test_case_info:
1191 self.current_test_case_info.logger.debug(
1192 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1193 test._testMethodName,
1194 test._testMethodDoc))
1195 unittest.TestResult.addSuccess(self, test)
1196 self.result_string = colorize("OK", GREEN)
1198 self.send_result_through_pipe(test, PASS)
1200 def addSkip(self, test, reason):
1202 Record a test skipped.
1208 if self.current_test_case_info:
1209 self.current_test_case_info.logger.debug(
1210 "--- addSkip() %s.%s(%s) called, reason is %s" %
1211 (test.__class__.__name__, test._testMethodName,
1212 test._testMethodDoc, reason))
1213 unittest.TestResult.addSkip(self, test, reason)
1214 self.result_string = colorize("SKIP", YELLOW)
1216 self.send_result_through_pipe(test, SKIP)
1218 def symlink_failed(self):
1219 if self.current_test_case_info:
1221 failed_dir = os.getenv('FAILED_DIR')
1222 link_path = os.path.join(
1225 os.path.basename(self.current_test_case_info.tempdir))
1226 if self.current_test_case_info.logger:
1227 self.current_test_case_info.logger.debug(
1228 "creating a link to the failed test")
1229 self.current_test_case_info.logger.debug(
1230 "os.symlink(%s, %s)" %
1231 (self.current_test_case_info.tempdir, link_path))
1232 if os.path.exists(link_path):
1233 if self.current_test_case_info.logger:
1234 self.current_test_case_info.logger.debug(
1235 'symlink already exists')
1237 os.symlink(self.current_test_case_info.tempdir, link_path)
1239 except Exception as e:
1240 if self.current_test_case_info.logger:
1241 self.current_test_case_info.logger.error(e)
1243 def send_result_through_pipe(self, test, result):
1244 if hasattr(self, 'test_framework_result_pipe'):
1245 pipe = self.test_framework_result_pipe
1247 pipe.send((test.id(), result))
1249 def log_error(self, test, err, fn_name):
1250 if self.current_test_case_info:
1251 if isinstance(test, unittest.suite._ErrorHolder):
1252 test_name = test.description
1254 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1255 test._testMethodName,
1256 test._testMethodDoc)
1257 self.current_test_case_info.logger.debug(
1258 "--- %s() %s called, err is %s" %
1259 (fn_name, test_name, err))
1260 self.current_test_case_info.logger.debug(
1261 "formatted exception is:\n%s" %
1262 "".join(format_exception(*err)))
1264 def add_error(self, test, err, unittest_fn, error_type):
1265 if error_type == FAIL:
1266 self.log_error(test, err, 'addFailure')
1267 error_type_str = colorize("FAIL", RED)
1268 elif error_type == ERROR:
1269 self.log_error(test, err, 'addError')
1270 error_type_str = colorize("ERROR", RED)
1272 raise Exception('Error type %s cannot be used to record an '
1273 'error or a failure' % error_type)
1275 unittest_fn(self, test, err)
1276 if self.current_test_case_info:
1277 self.result_string = "%s [ temp dir used by test case: %s ]" % \
1279 self.current_test_case_info.tempdir)
1280 self.symlink_failed()
1281 self.failed_test_cases_info.add(self.current_test_case_info)
1282 if is_core_present(self.current_test_case_info.tempdir):
1283 if not self.current_test_case_info.core_crash_test:
1284 if isinstance(test, unittest.suite._ErrorHolder):
1285 test_name = str(test)
1287 test_name = "'{!s}' ({!s})".format(
1288 get_testcase_doc_name(test), test.id())
1289 self.current_test_case_info.core_crash_test = test_name
1290 self.core_crash_test_cases_info.add(
1291 self.current_test_case_info)
1293 self.result_string = '%s [no temp dir]' % error_type_str
1295 self.send_result_through_pipe(test, error_type)
1297 def addFailure(self, test, err):
1299 Record a test failed result
1302 :param err: error message
1305 self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1307 def addError(self, test, err):
1309 Record a test error result
1312 :param err: error message
1315 self.add_error(test, err, unittest.TestResult.addError, ERROR)
1317 def getDescription(self, test):
1319 Get test description
1322 :returns: test description
1325 return get_test_description(self.descriptions, test)
1327 def startTest(self, test):
1335 def print_header(test):
1336 if not hasattr(test.__class__, '_header_printed'):
1337 print(double_line_delim)
1338 print(colorize(getdoc(test).splitlines()[0], GREEN))
1339 print(double_line_delim)
1340 test.__class__._header_printed = True
1344 unittest.TestResult.startTest(self, test)
1345 if self.verbosity > 0:
1346 self.stream.writeln(
1347 "Starting " + self.getDescription(test) + " ...")
1348 self.stream.writeln(single_line_delim)
1350 def stopTest(self, test):
1352 Called when the given test has been run
1357 unittest.TestResult.stopTest(self, test)
1358 if self.verbosity > 0:
1359 self.stream.writeln(single_line_delim)
1360 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1361 self.result_string))
1362 self.stream.writeln(single_line_delim)
1364 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1365 self.result_string))
1367 self.send_result_through_pipe(test, TEST_RUN)
1369 def printErrors(self):
1371 Print errors from running the test case
1373 if len(self.errors) > 0 or len(self.failures) > 0:
1374 self.stream.writeln()
1375 self.printErrorList('ERROR', self.errors)
1376 self.printErrorList('FAIL', self.failures)
1378 # ^^ that is the last output from unittest before summary
1379 if not self.runner.print_summary:
1380 devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
1381 self.stream = devnull
1382 self.runner.stream = devnull
1384 def printErrorList(self, flavour, errors):
1386 Print error list to the output stream together with error type
1387 and test case description.
1389 :param flavour: error type
1390 :param errors: iterable errors
1393 for test, err in errors:
1394 self.stream.writeln(double_line_delim)
1395 self.stream.writeln("%s: %s" %
1396 (flavour, self.getDescription(test)))
1397 self.stream.writeln(single_line_delim)
1398 self.stream.writeln("%s" % err)
1401 class VppTestRunner(unittest.TextTestRunner):
1403 A basic test runner implementation which prints results to standard error.
1407 def resultclass(self):
1408 """Class maintaining the results of the tests"""
1409 return VppTestResult
1411 def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1412 result_pipe=None, failfast=False, buffer=False,
1413 resultclass=None, print_summary=True, **kwargs):
1414 # ignore stream setting here, use hard-coded stdout to be in sync
1415 # with prints from VppTestCase methods ...
1416 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1417 verbosity, failfast, buffer,
1418 resultclass, **kwargs)
1419 KeepAliveReporter.pipe = keep_alive_pipe
1421 self.orig_stream = self.stream
1422 self.resultclass.test_framework_result_pipe = result_pipe
1424 self.print_summary = print_summary
1426 def _makeResult(self):
1427 return self.resultclass(self.stream,
1432 def run(self, test):
1439 faulthandler.enable() # emit stack trace to stderr if killed by signal
1441 result = super(VppTestRunner, self).run(test)
1442 if not self.print_summary:
1443 self.stream = self.orig_stream
1444 result.stream = self.orig_stream
1448 class Worker(Thread):
1449 def __init__(self, args, logger, env={}):
1450 self.logger = logger
1453 self.env = copy.deepcopy(env)
1454 super(Worker, self).__init__()
1457 executable = self.args[0]
1458 self.logger.debug("Running executable w/args `%s'" % self.args)
1459 env = os.environ.copy()
1460 env.update(self.env)
1461 env["CK_LOG_FILE_NAME"] = "-"
1462 self.process = subprocess.Popen(
1463 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1464 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1465 out, err = self.process.communicate()
1466 self.logger.debug("Finished running `%s'" % executable)
1467 self.logger.info("Return code is `%s'" % self.process.returncode)
1468 self.logger.info(single_line_delim)
1469 self.logger.info("Executable `%s' wrote to stdout:" % executable)
1470 self.logger.info(single_line_delim)
1471 self.logger.info(out)
1472 self.logger.info(single_line_delim)
1473 self.logger.info("Executable `%s' wrote to stderr:" % executable)
1474 self.logger.info(single_line_delim)
1475 self.logger.info(err)
1476 self.logger.info(single_line_delim)
1477 self.result = self.process.returncode
1480 if __name__ == '__main__':