3 from __future__ import print_function
16 from collections import deque
17 from threading import Thread, Event
18 from inspect import getdoc, isclass
19 from traceback import format_exception
20 from logging import FileHandler, DEBUG, Formatter
23 from scapy.packet import Raw
24 from hook import StepHook, PollHook, VppDiedError
25 from vpp_pg_interface import VppPGInterface
26 from vpp_sub_interface import VppSubInterface
27 from vpp_lo_interface import VppLoInterface
28 from vpp_papi_provider import VppPapiProvider
29 from vpp_papi.vpp_stats import VPPStats
30 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
32 from vpp_object import VppObjectRegistry
33 from util import ppp, is_core_present
34 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
35 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
36 from scapy.layers.inet6 import ICMPv6EchoReply
38 if os.name == 'posix' and sys.version_info[0] < 3:
39 # using subprocess32 is recommended by python official documentation
40 # @ https://docs.python.org/2/library/subprocess.html
41 import subprocess32 as subprocess
45 # Python2/3 compatible
# Framework debugging is switched on only when TEST_DEBUG is exactly "1".
debug_framework = os.getenv('TEST_DEBUG', "0") == "1"
63 Test framework module.
65 The module provides a set of tools for constructing and running tests and
66 representing the results.
70 class _PacketInfo(object):
71 """Private class to create packet info object.
73 Help process information about the next packet.
74 Set variables to default values.
76 #: Store the index of the packet.
78 #: Store the index of the source packet generator interface of the packet.
80 #: Store the index of the destination packet generator interface
83 #: Store expected ip version
85 #: Store expected upper protocol
87 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare two packet infos by index, src, dst and data.

    All four attribute comparisons are evaluated before they are combined.
    """
    same_index, same_src, same_dst, same_data = (
        self.index == other.index,
        self.src == other.src,
        self.dst == other.dst,
        self.data == other.data,
    )
    return same_index and same_src and same_dst and same_data
98 def pump_output(testclass):
99 """ pump output from vpp stdout/stderr to proper queues """
102 while not testclass.pump_thread_stop_flag.is_set():
103 readable = select.select([testclass.vpp.stdout.fileno(),
104 testclass.vpp.stderr.fileno(),
105 testclass.pump_thread_wakeup_pipe[0]],
107 if testclass.vpp.stdout.fileno() in readable:
108 read = os.read(testclass.vpp.stdout.fileno(), 102400)
110 split = read.splitlines(True)
111 if len(stdout_fragment) > 0:
112 split[0] = "%s%s" % (stdout_fragment, split[0])
113 if len(split) > 0 and split[-1].endswith("\n"):
117 stdout_fragment = split[-1]
118 testclass.vpp_stdout_deque.extend(split[:limit])
119 if not testclass.cache_vpp_output:
120 for line in split[:limit]:
121 testclass.logger.debug(
122 "VPP STDOUT: %s" % line.rstrip("\n"))
123 if testclass.vpp.stderr.fileno() in readable:
124 read = os.read(testclass.vpp.stderr.fileno(), 102400)
126 split = read.splitlines(True)
127 if len(stderr_fragment) > 0:
128 split[0] = "%s%s" % (stderr_fragment, split[0])
129 if len(split) > 0 and split[-1].endswith(b"\n"):
133 stderr_fragment = split[-1]
134 testclass.vpp_stderr_deque.extend(split[:limit])
135 if not testclass.cache_vpp_output:
136 for line in split[:limit]:
137 testclass.logger.debug(
138 "VPP STDERR: %s" % line.rstrip("\n"))
139 # ignoring the dummy pipe here intentionally - the
140 # flag will take care of properly terminating the loop
def _is_skip_aarch64_set():
    """Tell whether the SKIP_AARCH64 environment variable is switched on.

    The value is compared case-insensitively against 'yes', 'y' and '1';
    an unset variable is treated as 'n'.
    """
    setting = os.getenv('SKIP_AARCH64', 'n')
    enabled_values = ('yes', 'y', '1')
    return setting.lower() in enabled_values


is_skip_aarch64_set = _is_skip_aarch64_set()
def _is_platform_aarch64():
    """Tell whether platform.machine() reports 'aarch64'."""
    machine = platform.machine()
    return machine == 'aarch64'


is_platform_aarch64 = _is_platform_aarch64()
def _running_extended_tests():
    """Tell whether the EXTENDED_TESTS environment variable is switched on.

    The value is compared case-insensitively against 'y', 'yes' and '1';
    an unset variable is treated as 'n'.
    """
    setting = os.getenv("EXTENDED_TESTS", "n").lower()
    if setting in ("y", "yes", "1"):
        return True
    return False


running_extended_tests = _running_extended_tests()
def _running_on_centos():
    """Tell whether the OS_ID environment variable names a CentOS system.

    :returns: True when "centos" occurs (case-insensitively) in OS_ID,
        False otherwise (including when OS_ID is unset).
    """
    os_id = os.getenv("OS_ID", "")
    return "centos" in os_id.lower()


# Evaluate the helper once at import time, like the other module-level
# environment flags.  Binding the function object itself (without calling
# it) would make this flag truthy on every platform.
running_on_centos = _running_on_centos()
169 class KeepAliveReporter(object):
171 Singleton object which reports test start to parent process
176 self.__dict__ = self._shared_state
184 def pipe(self, pipe):
185 if self._pipe is not None:
186 raise Exception("Internal error - pipe should only be set once.")
189 def send_keep_alive(self, test, desc=None):
191 Write current test tmpdir & desc to keep-alive pipe to signal liveness
193 if self.pipe is None:
194 # if not running forked..
198 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
202 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
205 class VppTestCase(unittest.TestCase):
206 """This subclass is a base class for VPP test cases that are implemented as
207 classes. It provides methods to create and run test case.
210 extra_vpp_punt_config = []
211 extra_vpp_plugin_config = []
def packet_infos(self):
    """Return the packet infos stored in ``_packet_infos``."""
    infos = self._packet_infos
    return infos
219 def get_packet_count_for_if_idx(cls, dst_if_index):
220 """Get the number of packet info for specified destination if index"""
221 if dst_if_index in cls._packet_count_for_dst_if_idx:
222 return cls._packet_count_for_dst_if_idx[dst_if_index]
228 """Return the instance of this testcase"""
229 return cls.test_instance
232 def set_debug_flags(cls, d):
233 cls.debug_core = False
234 cls.debug_gdb = False
235 cls.debug_gdbserver = False
240 cls.debug_core = True
243 elif dl == "gdbserver":
244 cls.debug_gdbserver = True
246 raise Exception("Unrecognized DEBUG option: '%s'" % d)
249 def get_least_used_cpu():
250 cpu_usage_list = [set(range(psutil.cpu_count()))]
251 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
252 if 'vpp_main' == p.info['name']]
253 for vpp_process in vpp_processes:
254 for cpu_usage_set in cpu_usage_list:
256 cpu_num = vpp_process.cpu_num()
257 if cpu_num in cpu_usage_set:
258 cpu_usage_set_index = cpu_usage_list.index(
260 if cpu_usage_set_index == len(cpu_usage_list) - 1:
261 cpu_usage_list.append({cpu_num})
263 cpu_usage_list[cpu_usage_set_index + 1].add(
265 cpu_usage_set.remove(cpu_num)
267 except psutil.NoSuchProcess:
270 for cpu_usage_set in cpu_usage_list:
271 if len(cpu_usage_set) > 0:
272 min_usage_set = cpu_usage_set
275 return random.choice(tuple(min_usage_set))
278 def setUpConstants(cls):
279 """ Set-up the test case class based on environment variables """
280 s = os.getenv("STEP", "n")
281 cls.step = True if s.lower() in ("y", "yes", "1") else False
282 d = os.getenv("DEBUG", None)
283 c = os.getenv("CACHE_OUTPUT", "1")
284 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
285 cls.set_debug_flags(d)
286 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
287 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
288 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
290 if cls.plugin_path is not None:
291 if cls.extern_plugin_path is not None:
292 plugin_path = "%s:%s" % (
293 cls.plugin_path, cls.extern_plugin_path)
295 plugin_path = cls.plugin_path
296 elif cls.extern_plugin_path is not None:
297 plugin_path = cls.extern_plugin_path
299 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
300 debug_cli = "cli-listen localhost:5002"
302 size = os.getenv("COREDUMP_SIZE")
304 coredump_size = "coredump-size %s" % size
305 if coredump_size is None:
306 coredump_size = "coredump-size unlimited"
308 cpu_core_number = cls.get_least_used_cpu()
310 cls.vpp_cmdline = [cls.vpp_bin, "unix",
311 "{", "nodaemon", debug_cli, "full-coredump",
312 coredump_size, "runtime-dir", cls.tempdir, "}",
313 "api-trace", "{", "on", "}", "api-segment", "{",
314 "prefix", cls.shm_prefix, "}", "cpu", "{",
315 "main-core", str(cpu_core_number), "}", "statseg",
316 "{", "socket-name", cls.stats_sock, "}", "plugins",
317 "{", "plugin", "dpdk_plugin.so", "{", "disable",
318 "}", "plugin", "unittest_plugin.so", "{", "enable",
319 "}"] + cls.extra_vpp_plugin_config + ["}", ]
320 if cls.extra_vpp_punt_config is not None:
321 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
322 if plugin_path is not None:
323 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
324 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
325 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
328 def wait_for_enter(cls):
329 if cls.debug_gdbserver:
330 print(double_line_delim)
331 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
333 print(double_line_delim)
334 print("Spawned VPP with PID: %d" % cls.vpp.pid)
336 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
338 print(single_line_delim)
339 print("You can debug the VPP using e.g.:")
340 if cls.debug_gdbserver:
341 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
342 print("Now is the time to attach a gdb by running the above "
343 "command, set up breakpoints etc. and then resume VPP from "
344 "within gdb by issuing the 'continue' command")
346 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
347 print("Now is the time to attach a gdb by running the above "
348 "command and set up breakpoints etc.")
349 print(single_line_delim)
350 input("Press ENTER to continue running the testcase...")
354 cmdline = cls.vpp_cmdline
356 if cls.debug_gdbserver:
357 gdbserver = '/usr/bin/gdbserver'
358 if not os.path.isfile(gdbserver) or \
359 not os.access(gdbserver, os.X_OK):
360 raise Exception("gdbserver binary '%s' does not exist or is "
361 "not executable" % gdbserver)
363 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
364 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
367 cls.vpp = subprocess.Popen(cmdline,
368 stdout=subprocess.PIPE,
369 stderr=subprocess.PIPE,
371 except subprocess.CalledProcessError as e:
372 cls.logger.critical("Subprocess returned with non-0 return code: ("
376 cls.logger.critical("Subprocess returned with OS error: "
377 "(%s) %s", e.errno, e.strerror)
379 except Exception as e:
380 cls.logger.exception("Subprocess returned unexpected from "
387 def wait_for_stats_socket(cls):
388 deadline = time.time() + 3
390 while time.time() < deadline or \
391 cls.debug_gdb or cls.debug_gdbserver:
392 if os.path.exists(cls.stats_sock):
397 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
402 Perform class setup before running the testcase
403 Remove shared memory files, start vpp and connect the vpp-api
405 super(VppTestCase, cls).setUpClass()
406 gc.collect() # run garbage collection first
408 cls.logger = get_logger(cls.__name__)
409 if hasattr(cls, 'parallel_handler'):
410 cls.logger.addHandler(cls.parallel_handler)
411 cls.logger.propagate = False
412 cls.tempdir = tempfile.mkdtemp(
413 prefix='vpp-unittest-%s-' % cls.__name__)
414 cls.stats_sock = "%s/stats.sock" % cls.tempdir
415 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
416 cls.file_handler.setFormatter(
417 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
419 cls.file_handler.setLevel(DEBUG)
420 cls.logger.addHandler(cls.file_handler)
421 cls.shm_prefix = os.path.basename(cls.tempdir)
422 os.chdir(cls.tempdir)
423 cls.logger.info("Temporary dir is %s, shm prefix is %s",
424 cls.tempdir, cls.shm_prefix)
426 cls.reset_packet_infos()
428 cls._zombie_captures = []
431 cls.registry = VppObjectRegistry()
432 cls.vpp_startup_failed = False
433 cls.reporter = KeepAliveReporter()
434 # need to catch exceptions here because if we raise, then the cleanup
435 # doesn't get called and we might end with a zombie vpp
438 cls.reporter.send_keep_alive(cls, 'setUpClass')
439 VppTestResult.current_test_case_info = TestCaseInfo(
440 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
441 cls.vpp_stdout_deque = deque()
442 cls.vpp_stderr_deque = deque()
443 cls.pump_thread_stop_flag = Event()
444 cls.pump_thread_wakeup_pipe = os.pipe()
445 cls.pump_thread = Thread(target=pump_output, args=(cls,))
446 cls.pump_thread.daemon = True
447 cls.pump_thread.start()
448 if cls.debug_gdb or cls.debug_gdbserver:
452 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
458 cls.vapi.register_hook(hook)
459 cls.wait_for_stats_socket()
460 cls.statistics = VPPStats(socketname=cls.stats_sock)
464 cls.vpp_startup_failed = True
466 "VPP died shortly after startup, check the"
467 " output to standard error for possible cause")
473 cls.vapi.disconnect()
476 if cls.debug_gdbserver:
477 print(colorize("You're running VPP inside gdbserver but "
478 "VPP-API connection failed, did you forget "
479 "to 'continue' VPP from within gdb?", RED))
491 Disconnect vpp-api, kill vpp and cleanup shared memory files
493 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
495 if cls.vpp.returncode is None:
496 print(double_line_delim)
497 print("VPP or GDB server is still running")
498 print(single_line_delim)
499 input("When done debugging, press ENTER to kill the "
500 "process and finish running the testcase...")
502 # first signal that we want to stop the pump thread, then wake it up
503 if hasattr(cls, 'pump_thread_stop_flag'):
504 cls.pump_thread_stop_flag.set()
505 if hasattr(cls, 'pump_thread_wakeup_pipe'):
506 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
507 if hasattr(cls, 'pump_thread'):
508 cls.logger.debug("Waiting for pump thread to stop")
509 cls.pump_thread.join()
510 if hasattr(cls, 'vpp_stderr_reader_thread'):
511 cls.logger.debug("Waiting for stdderr pump to stop")
512 cls.vpp_stderr_reader_thread.join()
514 if hasattr(cls, 'vpp'):
515 if hasattr(cls, 'vapi'):
516 cls.logger.debug("Disconnecting class vapi client on %s",
518 cls.vapi.disconnect()
519 cls.logger.debug("Deleting class vapi attribute on %s",
523 if cls.vpp.returncode is None:
524 cls.logger.debug("Sending TERM to vpp")
526 cls.logger.debug("Waiting for vpp to die")
527 cls.vpp.communicate()
528 cls.logger.debug("Deleting class vpp attribute on %s",
532 if cls.vpp_startup_failed:
533 stdout_log = cls.logger.info
534 stderr_log = cls.logger.critical
536 stdout_log = cls.logger.info
537 stderr_log = cls.logger.info
539 if hasattr(cls, 'vpp_stdout_deque'):
540 stdout_log(single_line_delim)
541 stdout_log('VPP output to stdout while running %s:', cls.__name__)
542 stdout_log(single_line_delim)
543 vpp_output = "".join(cls.vpp_stdout_deque)
544 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
546 stdout_log('\n%s', vpp_output)
547 stdout_log(single_line_delim)
549 if hasattr(cls, 'vpp_stderr_deque'):
550 stderr_log(single_line_delim)
551 stderr_log('VPP output to stderr while running %s:', cls.__name__)
552 stderr_log(single_line_delim)
553 vpp_output = "".join(cls.vpp_stderr_deque)
554 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
556 stderr_log('\n%s', vpp_output)
557 stderr_log(single_line_delim)
560 def tearDownClass(cls):
561 """ Perform final cleanup after running all tests in this test-case """
562 cls.reporter.send_keep_alive(cls, 'tearDownClass')
564 cls.file_handler.close()
565 cls.reset_packet_infos()
567 debug_internal.on_tear_down_class(cls)
570 """ Show various debug prints after each test """
571 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
572 (self.__class__.__name__, self._testMethodName,
573 self._testMethodDoc))
574 if not self.vpp_dead:
575 self.logger.debug(self.vapi.cli("show trace max 1000"))
576 self.logger.info(self.vapi.ppcli("show interface"))
577 self.logger.info(self.vapi.ppcli("show hardware"))
578 self.logger.info(self.statistics.set_errors_str())
579 self.logger.info(self.vapi.ppcli("show run"))
580 self.logger.info(self.vapi.ppcli("show log"))
581 self.registry.remove_vpp_config(self.logger)
582 # Save/Dump VPP api trace log
583 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
584 tmp_api_trace = "/tmp/%s" % api_trace
585 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
586 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
587 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
589 os.rename(tmp_api_trace, vpp_api_trace_log)
590 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
593 self.registry.unregister_all(self.logger)
596 """ Clear trace before running each test"""
597 super(VppTestCase, self).setUp()
598 self.reporter.send_keep_alive(self)
599 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
600 (self.__class__.__name__, self._testMethodName,
601 self._testMethodDoc))
603 raise Exception("VPP is dead when setting up the test")
604 self.sleep(.1, "during setUp")
605 self.vpp_stdout_deque.append(
606 "--- test setUp() for %s.%s(%s) starts here ---\n" %
607 (self.__class__.__name__, self._testMethodName,
608 self._testMethodDoc))
609 self.vpp_stderr_deque.append(
610 "--- test setUp() for %s.%s(%s) starts here ---\n" %
611 (self.__class__.__name__, self._testMethodName,
612 self._testMethodDoc))
613 self.vapi.cli("clear trace")
614 # store the test instance inside the test class - so that objects
615 # holding the class can access instance methods (like assertEqual)
616 type(self).test_instance = self
619 def pg_enable_capture(cls, interfaces=None):
621 Enable capture on packet-generator interfaces
623 :param interfaces: iterable interface indexes (if None,
624 use self.pg_interfaces)
627 if interfaces is None:
628 interfaces = cls.pg_interfaces
633 def register_capture(cls, cap_name):
634 """ Register a capture in the testclass """
635 # add to the list of captures with current timestamp
636 cls._captures.append((time.time(), cap_name))
637 # filter out from zombies
638 cls._zombie_captures = [(stamp, name)
639 for (stamp, name) in cls._zombie_captures
644 """ Remove any zombie captures and enable the packet generator """
645 # how long before capture is allowed to be deleted - otherwise vpp
646 # crashes - 100ms seems enough (this shouldn't be needed at all)
649 for stamp, cap_name in cls._zombie_captures:
650 wait = stamp + capture_ttl - now
652 cls.sleep(wait, "before deleting capture %s" % cap_name)
654 cls.logger.debug("Removing zombie capture %s" % cap_name)
655 cls.vapi.cli('packet-generator delete %s' % cap_name)
657 cls.vapi.cli("trace add pg-input 1000")
658 cls.vapi.cli('packet-generator enable')
659 cls._zombie_captures = cls._captures
663 def create_pg_interfaces(cls, interfaces):
665 Create packet-generator interfaces.
667 :param interfaces: iterable indexes of the interfaces.
668 :returns: List of created interfaces.
673 intf = VppPGInterface(cls, i)
674 setattr(cls, intf.name, intf)
676 cls.pg_interfaces = result
680 def create_loopback_interfaces(cls, count):
682 Create loopback interfaces.
684 :param count: number of interfaces created.
685 :returns: List of created interfaces.
687 result = [VppLoInterface(cls) for i in range(count)]
689 setattr(cls, intf.name, intf)
690 cls.lo_interfaces = result
694 def extend_packet(packet, size, padding=' '):
696 Extend packet to given size by padding with spaces or custom padding
697 NOTE: Currently works only when Raw layer is present.
699 :param packet: packet
700 :param size: target size
701 :param padding: padding used to extend the payload
704 packet_len = len(packet) + 4
705 extend = size - packet_len
707 num = (extend / len(padding)) + 1
708 packet[Raw].load += (padding * num)[:extend]
def reset_packet_infos(cls):
    """Drop all stored packet infos and per-destination packet counts.

    Both ``_packet_infos`` and ``_packet_count_for_dst_if_idx`` are rebound
    to fresh, empty dictionaries.
    """
    cls._packet_infos, cls._packet_count_for_dst_if_idx = {}, {}
717 def create_packet_info(cls, src_if, dst_if):
719 Create packet info object containing the source and destination indexes
720 and add it to the testcase's packet info list
722 :param VppInterface src_if: source interface
723 :param VppInterface dst_if: destination interface
725 :returns: _PacketInfo object
729 info.index = len(cls._packet_infos)
730 info.src = src_if.sw_if_index
731 info.dst = dst_if.sw_if_index
732 if isinstance(dst_if, VppSubInterface):
733 dst_idx = dst_if.parent.sw_if_index
735 dst_idx = dst_if.sw_if_index
736 if dst_idx in cls._packet_count_for_dst_if_idx:
737 cls._packet_count_for_dst_if_idx[dst_idx] += 1
739 cls._packet_count_for_dst_if_idx[dst_idx] = 1
740 cls._packet_infos[info.index] = info
744 def info_to_payload(info):
746 Convert _PacketInfo object to packet payload
748 :param info: _PacketInfo object
750 :returns: string containing serialized data from packet info
752 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
756 def payload_to_info(payload, payload_field='load'):
758 Convert packet payload to _PacketInfo object
760 :param payload: packet payload
761 :type: <class 'scapy.packet.Raw'>
762 :param: payload_field: packet fieldname of payload "load" for
763 <class 'scapy.packet.Raw'>
764 :returns: _PacketInfo object containing de-serialized data from payload
767 numbers = getattr(payload, payload_field).split()
769 info.index = int(numbers[0])
770 info.src = int(numbers[1])
771 info.dst = int(numbers[2])
772 info.ip = int(numbers[3])
773 info.proto = int(numbers[4])
776 def get_next_packet_info(self, info):
778 Iterate over the packet info list stored in the testcase
779 Start iteration with first element if info is None
780 Continue based on index in info if info is specified
782 :param info: info or None
783 :returns: next info in list or None if no more infos
788 next_index = info.index + 1
789 if next_index == len(self._packet_infos):
792 return self._packet_infos[next_index]
794 def get_next_packet_info_for_interface(self, src_index, info):
796 Search the packet info list for the next packet info with same source
799 :param src_index: source interface index to search for
800 :param info: packet info - where to start the search
801 :returns: packet info or None
805 info = self.get_next_packet_info(info)
808 if info.src == src_index:
811 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
813 Search the packet info list for the next packet info with same source
814 and destination interface indexes
816 :param src_index: source interface index to search for
817 :param dst_index: destination interface index to search for
818 :param info: packet info - where to start the search
819 :returns: packet info or None
823 info = self.get_next_packet_info_for_interface(src_index, info)
826 if info.dst == dst_index:
829 def assert_equal(self, real_value, expected_value, name_or_class=None):
830 if name_or_class is None:
831 self.assertEqual(real_value, expected_value)
834 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
835 msg = msg % (getdoc(name_or_class).strip(),
836 real_value, str(name_or_class(real_value)),
837 expected_value, str(name_or_class(expected_value)))
839 msg = "Invalid %s: %s does not match expected value %s" % (
840 name_or_class, real_value, expected_value)
842 self.assertEqual(real_value, expected_value, msg)
844 def assert_in_range(self,
852 msg = "Invalid %s: %s out of range <%s,%s>" % (
853 name, real_value, expected_min, expected_max)
854 self.assertTrue(expected_min <= real_value <= expected_max, msg)
856 def assert_packet_checksums_valid(self, packet,
857 ignore_zero_udp_checksums=True):
858 received = packet.__class__(scapy.compat.raw(packet))
860 ppp("Verifying packet checksums for packet:", received))
861 udp_layers = ['UDP', 'UDPerror']
862 checksum_fields = ['cksum', 'chksum']
865 temp = received.__class__(scapy.compat.raw(received))
867 layer = temp.getlayer(counter)
869 for cf in checksum_fields:
870 if hasattr(layer, cf):
871 if ignore_zero_udp_checksums and \
872 0 == getattr(layer, cf) and \
873 layer.name in udp_layers:
876 checksums.append((counter, cf))
879 counter = counter + 1
880 if 0 == len(checksums):
882 temp = temp.__class__(scapy.compat.raw(temp))
883 for layer, cf in checksums:
884 calc_sum = getattr(temp[layer], cf)
886 getattr(received[layer], cf), calc_sum,
887 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
889 "Checksum field `%s` on `%s` layer has correct value `%s`" %
890 (cf, temp[layer].name, calc_sum))
892 def assert_checksum_valid(self, received_packet, layer,
894 ignore_zero_checksum=False):
895 """ Check checksum of received packet on given layer """
896 received_packet_checksum = getattr(received_packet[layer], field_name)
897 if ignore_zero_checksum and 0 == received_packet_checksum:
899 recalculated = received_packet.__class__(
900 scapy.compat.raw(received_packet))
901 delattr(recalculated[layer], field_name)
902 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
903 self.assert_equal(received_packet_checksum,
904 getattr(recalculated[layer], field_name),
905 "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """Check the checksum of the 'IP' layer of the received packet.

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded to assert_checksum_valid
    """
    layer_name = 'IP'
    self.assert_checksum_valid(
        received_packet, layer_name,
        ignore_zero_checksum=ignore_zero_checksum)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """Check the checksum of the 'TCP' layer of the received packet.

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded to assert_checksum_valid
    """
    layer_name = 'TCP'
    self.assert_checksum_valid(
        received_packet, layer_name,
        ignore_zero_checksum=ignore_zero_checksum)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """Check the checksum of the 'UDP' layer of the received packet.

    Unlike the IP and TCP variants, zero checksums are ignored by default.

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded to assert_checksum_valid
    """
    layer_name = 'UDP'
    self.assert_checksum_valid(
        received_packet, layer_name,
        ignore_zero_checksum=ignore_zero_checksum)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """Check checksums of the *error layers present in the packet.

    For each of IPerror, TCPerror, UDPerror and ICMPerror found in the
    packet, assert_checksum_valid is called on that layer; for UDPerror a
    zero checksum is ignored.

    :param received_packet: packet to verify
    """
    # (layer class, layer name, extra keyword arguments for the check)
    embedded_layers = (
        (IPerror, 'IPerror', {}),
        (TCPerror, 'TCPerror', {}),
        (UDPerror, 'UDPerror', {'ignore_zero_checksum': True}),
        (ICMPerror, 'ICMPerror', {}),
    )
    for layer_cls, layer_name, extra in embedded_layers:
        if not received_packet.haslayer(layer_cls):
            continue
        self.assert_checksum_valid(received_packet, layer_name, **extra)
def assert_icmp_checksum_valid(self, received_packet):
    """Check the 'ICMP' layer checksum, then the embedded layer checksums.

    :param received_packet: packet to verify
    """
    icmp_layer = 'ICMP'
    self.assert_checksum_valid(received_packet, icmp_layer)
    self.assert_embedded_icmp_checksum_valid(received_packet)
def assert_icmpv6_checksum_valid(self, pkt):
    """Check the 'cksum' field of the ICMPv6 layers present in the packet.

    ICMPv6DestUnreach, ICMPv6EchoRequest and ICMPv6EchoReply are checked in
    that order; a destination-unreachable layer additionally triggers the
    embedded layer checksum verification.

    :param pkt: packet to verify
    """
    # (layer class, layer name, whether to verify embedded layers as well)
    icmpv6_layers = (
        (ICMPv6DestUnreach, 'ICMPv6DestUnreach', True),
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest', False),
        (ICMPv6EchoReply, 'ICMPv6EchoReply', False),
    )
    for layer_cls, layer_name, check_embedded in icmpv6_layers:
        if not pkt.haslayer(layer_cls):
            continue
        self.assert_checksum_valid(pkt, layer_name, 'cksum')
        if check_embedded:
            self.assert_embedded_icmp_checksum_valid(pkt)
946 def assert_packet_counter_equal(self, counter, expected_value):
947 if counter.startswith("/"):
948 counter_value = self.statistics.get_counter(counter)
949 self.assert_equal(counter_value, expected_value,
950 "packet counter `%s'" % counter)
952 counters = self.vapi.cli("sh errors").split('\n')
954 for i in range(1, len(counters) - 1):
955 results = counters[i].split()
956 if results[1] == counter:
957 counter_value = int(results[0])
961 def sleep(cls, timeout, remark=None):
963 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
964 # * by Guido, only the main thread can be interrupted.
966 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
969 if hasattr(os, 'sched_yield'):
975 if hasattr(cls, 'logger'):
976 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
980 if hasattr(cls, 'logger') and after - before > 2 * timeout:
981 cls.logger.error("unexpected self.sleep() result - "
982 "slept for %es instead of ~%es!",
983 after - before, timeout)
984 if hasattr(cls, 'logger'):
986 "Finished sleep (%s) - slept %es (wanted %es)",
987 remark, after - before, timeout)
989 def pg_send(self, intf, pkts):
990 self.vapi.cli("clear trace")
991 intf.add_stream(pkts)
992 self.pg_enable_capture(self.pg_interfaces)
995 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
996 self.pg_send(intf, pkts)
999 for i in self.pg_interfaces:
1000 i.get_capture(0, timeout=timeout)
1001 i.assert_nothing_captured(remark=remark)
1004 def send_and_expect(self, intf, pkts, output):
1005 self.pg_send(intf, pkts)
1006 rx = output.get_capture(len(pkts))
1009 def send_and_expect_only(self, intf, pkts, output, timeout=None):
1010 self.pg_send(intf, pkts)
1011 rx = output.get_capture(len(pkts))
1015 for i in self.pg_interfaces:
1016 if i not in outputs:
1017 i.get_capture(0, timeout=timeout)
1018 i.assert_nothing_captured()
1024 """ unittest calls runTest when TestCase is instantiated without a
1025 test case. Use case: Writing unittests against VppTestCase"""
def get_testcase_doc_name(test):
    """Return the first docstring line of the class of the given test.

    When the class has no docstring (``getdoc`` returns None) or the
    docstring is empty, the class name is returned instead of raising.

    :param test: test case instance
    :returns: first line of the class docstring, or the class name
    """
    test_class = test.__class__
    doc_lines = (getdoc(test_class) or "").splitlines()
    return doc_lines[0] if doc_lines else test_class.__name__
1033 def get_test_description(descriptions, test):
1034 short_description = test.shortDescription()
1035 if descriptions and short_description:
1036 return short_description
class TestCaseInfo(object):
    """Record of a test case's logger, temp dir and VPP process details.

    ``core_crash_test`` starts out as None.
    """

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        """Store the given values on the instance.

        :param logger: logger of the test case
        :param tempdir: temporary directory of the test case
        :param vpp_pid: PID of the VPP process
        :param vpp_bin_path: path to the VPP binary
        """
        self.core_crash_test = None
        self.logger, self.tempdir = logger, tempdir
        self.vpp_pid, self.vpp_bin_path = vpp_pid, vpp_bin_path
1050 class VppTestResult(unittest.TestResult):
1052 @property result_string
1053 String variable to store the test case result string.
1055 List variable containing 2-tuples of TestCase instances and strings
1056 holding formatted tracebacks. Each tuple represents a test which
1057 raised an unexpected exception.
1059 List variable containing 2-tuples of TestCase instances and strings
1060 holding formatted tracebacks. Each tuple represents a test where
1061 a failure was explicitly signalled using the TestCase.assert*()
1065 failed_test_cases_info = set()
1066 core_crash_test_cases_info = set()
1067 current_test_case_info = None
1069 def __init__(self, stream=None, descriptions=None, verbosity=None,
1072 :param stream File descriptor to store where to report test results.
1073 Set to the standard error stream by default.
1074 :param descriptions Boolean variable to store information if to use
1075 test case descriptions.
1076 :param verbosity Integer variable to store required verbosity level.
1078 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1079 self.stream = stream
1080 self.descriptions = descriptions
1081 self.verbosity = verbosity
1082 self.result_string = None
1083 self.runner = runner
1085 def addSuccess(self, test):
1087 Record a test succeeded result
1092 if self.current_test_case_info:
1093 self.current_test_case_info.logger.debug(
1094 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1095 test._testMethodName,
1096 test._testMethodDoc))
1097 unittest.TestResult.addSuccess(self, test)
1098 self.result_string = colorize("OK", GREEN)
1100 self.send_result_through_pipe(test, PASS)
1102 def addSkip(self, test, reason):
1104 Record a test skipped.
1110 if self.current_test_case_info:
1111 self.current_test_case_info.logger.debug(
1112 "--- addSkip() %s.%s(%s) called, reason is %s" %
1113 (test.__class__.__name__, test._testMethodName,
1114 test._testMethodDoc, reason))
1115 unittest.TestResult.addSkip(self, test, reason)
1116 self.result_string = colorize("SKIP", YELLOW)
1118 self.send_result_through_pipe(test, SKIP)
1120 def symlink_failed(self):
1121 if self.current_test_case_info:
1123 failed_dir = os.getenv('FAILED_DIR')
1124 link_path = os.path.join(
1127 os.path.basename(self.current_test_case_info.tempdir))
1128 if self.current_test_case_info.logger:
1129 self.current_test_case_info.logger.debug(
1130 "creating a link to the failed test")
1131 self.current_test_case_info.logger.debug(
1132 "os.symlink(%s, %s)" %
1133 (self.current_test_case_info.tempdir, link_path))
1134 if os.path.exists(link_path):
1135 if self.current_test_case_info.logger:
1136 self.current_test_case_info.logger.debug(
1137 'symlink already exists')
1139 os.symlink(self.current_test_case_info.tempdir, link_path)
1141 except Exception as e:
1142 if self.current_test_case_info.logger:
1143 self.current_test_case_info.logger.error(e)
1145 def send_result_through_pipe(self, test, result):
1146 if hasattr(self, 'test_framework_result_pipe'):
1147 pipe = self.test_framework_result_pipe
1149 pipe.send((test.id(), result))
def log_error(self, test, err, fn_name):
    """
    Log an error/failure report to the current test case logger.

    Two debug messages are written: one naming the reporting function,
    the test and the raw ``err`` value, and one with the formatted
    exception.  Nothing is logged when no test case info is set.

    :param test: test case, or a ``unittest.suite._ErrorHolder``
    :param err: tuple of the form accepted by
        ``traceback.format_exception`` (it is unpacked into that call)
    :param fn_name: name of the reporting function used in the log message

    """
    case_info = self.current_test_case_info
    if not case_info:
        return
    if isinstance(test, unittest.suite._ErrorHolder):
        # error holders have no test method attributes, only a description
        test_name = test.description
    else:
        test_name = '%s.%s(%s)' % (test.__class__.__name__,
                                   test._testMethodName,
                                   test._testMethodDoc)
    debug = case_info.logger.debug
    debug("--- %s() %s called, err is %s" % (fn_name, test_name, err))
    debug("formatted exception is:\n%s" % "".join(format_exception(*err)))
def add_error(self, test, err, unittest_fn, error_type):
    """
    Common implementation for recording a failure or an error.

    Logs the error, calls ``unittest_fn`` to record it, builds the result
    string, links the failed test's temp dir, tracks the failed (and
    possibly core-crashed) test case info and finally sends ``error_type``
    through the result pipe.

    :param test: test case, or a ``unittest.suite._ErrorHolder``
    :param err: error information passed on to ``unittest_fn``
    :param unittest_fn: unbound ``unittest.TestResult`` method used to
        record the error; called as ``unittest_fn(self, test, err)``
    :param error_type: FAIL or ERROR
    :raises Exception: if ``error_type`` is neither FAIL nor ERROR

    """
    if error_type == FAIL:
        fn_name, label = 'addFailure', "FAIL"
    elif error_type == ERROR:
        fn_name, label = 'addError', "ERROR"
    else:
        raise Exception('Error type %s cannot be used to record an '
                        'error or a failure' % error_type)
    self.log_error(test, err, fn_name)
    error_type_str = colorize(label, RED)

    unittest_fn(self, test, err)
    case_info = self.current_test_case_info
    if not case_info:
        self.result_string = '%s [no temp dir]' % error_type_str
    else:
        self.result_string = "%s [ temp dir used by test case: %s ]" % \
                             (error_type_str, case_info.tempdir)
        self.symlink_failed()
        self.failed_test_cases_info.add(case_info)
        if is_core_present(case_info.tempdir):
            # only the first test seen with a core present is recorded
            if not case_info.core_crash_test:
                if isinstance(test, unittest.suite._ErrorHolder):
                    test_name = str(test)
                else:
                    test_name = "'{!s}' ({!s})".format(
                        get_testcase_doc_name(test), test.id())
                case_info.core_crash_test = test_name
            self.core_crash_test_cases_info.add(case_info)

    self.send_result_through_pipe(test, error_type)
def addFailure(self, test, err):
    """
    Record a test failed result

    Delegates to ``add_error`` with ``unittest.TestResult.addFailure`` as
    the recording function and FAIL as the error type.

    :param test: test case which failed
    :param err: error message

    """
    self.add_error(test, err,
                   unittest_fn=unittest.TestResult.addFailure,
                   error_type=FAIL)
def addError(self, test, err):
    """
    Record a test error result

    Delegates to ``add_error`` with ``unittest.TestResult.addError`` as
    the recording function and ERROR as the error type.

    :param test: test case which raised the error
    :param err: error message

    """
    self.add_error(test, err,
                   unittest_fn=unittest.TestResult.addError,
                   error_type=ERROR)
def getDescription(self, test):
    """
    Get test description

    :param test: test to describe
    :returns: test description, as produced by ``get_test_description``
        for this result's ``descriptions`` setting

    """
    description = get_test_description(self.descriptions, test)
    return description
def startTest(self, test):
    """
    Start a test

    Prints a header (first line of the test's doc, between double line
    delimiters) unless the test's class already carries the
    ``_header_printed`` marker, then delegates to
    ``unittest.TestResult.startTest`` and, when verbosity is above zero,
    announces the test on the stream.

    :param test: test which is about to run

    """
    test_class = test.__class__
    if not hasattr(test_class, '_header_printed'):
        print(double_line_delim)
        print(colorize(getdoc(test).splitlines()[0], GREEN))
        print(double_line_delim)
    # mark the class so that the header is not printed again
    test_class._header_printed = True

    unittest.TestResult.startTest(self, test)
    if self.verbosity > 0:
        self.stream.writeln(
            "Starting " + self.getDescription(test) + " ...")
        self.stream.writeln(single_line_delim)
def stopTest(self, test):
    """
    Called when the given test has been run

    Writes the test description together with the stored result string
    to the stream (framed by single line delimiters when verbosity is
    above zero) and sends TEST_RUN through the result pipe.

    :param test: test which has just finished

    """
    unittest.TestResult.stopTest(self, test)
    framed = self.verbosity > 0
    if framed:
        self.stream.writeln(single_line_delim)
    self.stream.writeln("%-73s%s" % (self.getDescription(test),
                                     self.result_string))
    if framed:
        self.stream.writeln(single_line_delim)

    self.send_result_through_pipe(test, TEST_RUN)
def printErrors(self):
    """
    Print errors from running the test case

    When there is at least one error or failure, writes an empty line
    followed by the error list and the failure list.  Afterwards, if the
    runner has ``print_summary`` disabled, both this result's stream and
    the runner's stream are replaced by a writer to ``os.devnull``.
    """
    if self.errors or self.failures:
        self.stream.writeln()
        for flavour, entries in (('ERROR', self.errors),
                                 ('FAIL', self.failures)):
            self.printErrorList(flavour, entries)

    # ^^ that is the last output from unittest before summary
    if not self.runner.print_summary:
        sink = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
        self.stream = self.runner.stream = sink
def printErrorList(self, flavour, errors):
    """
    Print error list to the output stream together with error type
    and test case description.

    For every entry a double line delimiter, a ``flavour: description``
    heading, a single line delimiter and the error text are written.

    :param flavour: error type
    :param errors: iterable of ``(test, error)`` pairs

    """
    for failed_test, details in errors:
        heading = (flavour, self.getDescription(failed_test))
        self.stream.writeln(double_line_delim)
        self.stream.writeln("%s: %s" % heading)
        self.stream.writeln(single_line_delim)
        self.stream.writeln("%s" % details)
class VppTestRunner(unittest.TextTestRunner):
    """
    A basic test runner implementation which prints results to standard error.
    """

    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
                 result_pipe=None, failfast=False, buffer=False,
                 resultclass=None, print_summary=True, **kwargs):
        """
        :param keep_alive_pipe: pipe stored on ``KeepAliveReporter``
        :param descriptions: passed on to ``unittest.TextTestRunner``
        :param verbosity: passed on to ``unittest.TextTestRunner``
        :param result_pipe: pipe stored on the result class as
            ``test_framework_result_pipe``
        :param failfast: passed on to ``unittest.TextTestRunner``
        :param buffer: passed on to ``unittest.TextTestRunner``
        :param resultclass: passed on to ``unittest.TextTestRunner``
        :param print_summary: stored as ``self.print_summary``; when false,
            ``run`` puts the original stream back after the run
        :param kwargs: extra keyword arguments for the base class
        """
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        # NOTE(review): ``resultclass`` is a read-only property on this
        # class, so a non-None ``resultclass`` argument presumably cannot
        # be assigned by the base initializer - confirm callers never
        # pass one.
        super(VppTestRunner, self).__init__(
            sys.stdout, descriptions, verbosity, failfast, buffer,
            resultclass, **kwargs)
        KeepAliveReporter.pipe = keep_alive_pipe

        # remembered so that run() can restore it
        self.orig_stream = self.stream
        self.resultclass.test_framework_result_pipe = result_pipe

        self.print_summary = print_summary

    def _makeResult(self):
        """Create the result object, handing it a reference to this runner"""
        result_cls = self.resultclass
        return result_cls(self.stream,
                          self.descriptions,
                          self.verbosity,
                          self)

    def run(self, test):
        """
        Run the tests

        :param test: test case or suite to run
        :returns: result object produced by the base class ``run``

        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        outcome = super(VppTestRunner, self).run(test)
        if not self.print_summary:
            # put the original stream back on both runner and result
            self.stream = self.orig_stream
            outcome.stream = self.orig_stream
        return outcome
class Worker(Thread):
    """
    Thread which runs an external executable and logs what it printed.

    Once ``run`` has finished, ``result`` holds the return code of the
    process; until then it is ``None``.
    """

    def __init__(self, args, logger, env=None):
        """
        :param args: argument list of the process; ``args[0]`` is the
            executable
        :param logger: logger used to report progress and process output
        :param env: optional dict of extra environment variables layered
            over ``os.environ``; it is deep-copied, so the caller's dict
            is never shared with the worker
        """
        self.logger = logger
        self.args = args
        self.result = None
        # ``None`` replaces the former mutable ``{}`` default; each worker
        # still gets its own private mapping
        self.env = {} if env is None else copy.deepcopy(env)
        super(Worker, self).__init__()

    def run(self):
        """
        Run the executable, wait for it to finish and log its return
        code, stdout and stderr.  The return code is stored in
        ``self.result``.
        """
        executable = self.args[0]
        self.logger.debug("Running executable w/args `%s'" % self.args)
        env = os.environ.copy()
        env.update(self.env)
        # presumably redirects the log file of the "check" test library
        # to stdout - TODO confirm
        env["CK_LOG_FILE_NAME"] = "-"
        # os.setpgrp puts the child into its own process group
        self.process = subprocess.Popen(
            self.args, shell=False, env=env, preexec_fn=os.setpgrp,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = self.process.communicate()
        self.logger.debug("Finished running `%s'" % executable)
        self.logger.info("Return code is `%s'" % self.process.returncode)
        for stream_name, captured in (("stdout", out), ("stderr", err)):
            self.logger.info(single_line_delim)
            self.logger.info("Executable `%s' wrote to %s:" %
                             (executable, stream_name))
            self.logger.info(single_line_delim)
            self.logger.info(captured)
        self.logger.info(single_line_delim)
        self.result = self.process.returncode
1381 if __name__ == '__main__':