3 from __future__ import print_function
16 from collections import deque
17 from threading import Thread, Event
18 from inspect import getdoc, isclass
19 from traceback import format_exception
20 from logging import FileHandler, DEBUG, Formatter
23 from scapy.packet import Raw
24 from hook import StepHook, PollHook, VppDiedError
25 from vpp_pg_interface import VppPGInterface
26 from vpp_sub_interface import VppSubInterface
27 from vpp_lo_interface import VppLoInterface
28 from vpp_papi_provider import VppPapiProvider
29 from vpp_papi.vpp_stats import VPPStats
30 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
32 from vpp_object import VppObjectRegistry
33 from util import ppp, is_core_present
34 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
35 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
36 from scapy.layers.inet6 import ICMPv6EchoReply
38 if os.name == 'posix' and sys.version_info[0] < 3:
39 # using subprocess32 is recommended by python official documentation
40 # @ https://docs.python.org/2/library/subprocess.html
41 import subprocess32 as subprocess
45 # Python2/3 compatible
57 debug_framework = False
58 if os.getenv('TEST_DEBUG', "0") == "1":
59 debug_framework = True
63 Test framework module.
65 The module provides a set of tools for constructing and running tests and
66 representing the results.
70 class _PacketInfo(object):
71 """Private class to create packet info object.
73 Help process information about the next packet.
74 Set variables to default values.
76 #: Store the index of the packet.
78 #: Store the index of the source packet generator interface of the packet.
80 #: Store the index of the destination packet generator interface
83 #: Store expected ip version
85 #: Store expected upper protocol
87 #: Store the copy of the former packet.
90 def __eq__(self, other):
91 index = self.index == other.index
92 src = self.src == other.src
93 dst = self.dst == other.dst
94 data = self.data == other.data
95 return index and src and dst and data
98 def pump_output(testclass):
99 """ pump output from vpp stdout/stderr to proper queues """
102 while not testclass.pump_thread_stop_flag.is_set():
103 readable = select.select([testclass.vpp.stdout.fileno(),
104 testclass.vpp.stderr.fileno(),
105 testclass.pump_thread_wakeup_pipe[0]],
107 if testclass.vpp.stdout.fileno() in readable:
108 read = os.read(testclass.vpp.stdout.fileno(), 102400)
110 split = read.splitlines(True)
111 if len(stdout_fragment) > 0:
112 split[0] = "%s%s" % (stdout_fragment, split[0])
113 if len(split) > 0 and split[-1].endswith("\n"):
117 stdout_fragment = split[-1]
118 testclass.vpp_stdout_deque.extend(split[:limit])
119 if not testclass.cache_vpp_output:
120 for line in split[:limit]:
121 testclass.logger.debug(
122 "VPP STDOUT: %s" % line.rstrip("\n"))
123 if testclass.vpp.stderr.fileno() in readable:
124 read = os.read(testclass.vpp.stderr.fileno(), 102400)
126 split = read.splitlines(True)
127 if len(stderr_fragment) > 0:
128 split[0] = "%s%s" % (stderr_fragment, split[0])
129 if len(split) > 0 and split[-1].endswith(b"\n"):
133 stderr_fragment = split[-1]
134 testclass.vpp_stderr_deque.extend(split[:limit])
135 if not testclass.cache_vpp_output:
136 for line in split[:limit]:
137 testclass.logger.debug(
138 "VPP STDERR: %s" % line.rstrip("\n"))
139 # ignoring the dummy pipe here intentionally - the
140 # flag will take care of properly terminating the loop
143 def _is_skip_aarch64_set():
144 return os.getenv('SKIP_AARCH64', 'n').lower() in ('yes', 'y', '1')
146 is_skip_aarch64_set = _is_skip_aarch64_set()
149 def _is_platform_aarch64():
150 return platform.machine() == 'aarch64'
152 is_platform_aarch64 = _is_platform_aarch64()
155 def _running_extended_tests():
156 s = os.getenv("EXTENDED_TESTS", "n")
157 return True if s.lower() in ("y", "yes", "1") else False
159 running_extended_tests = _running_extended_tests()
162 def _running_on_centos():
163 os_id = os.getenv("OS_ID", "")
164 return True if "centos" in os_id.lower() else False
166 running_on_centos = _running_on_centos
169 class KeepAliveReporter(object):
171 Singleton object which reports test start to parent process
176 self.__dict__ = self._shared_state
184 def pipe(self, pipe):
185 if self._pipe is not None:
186 raise Exception("Internal error - pipe should only be set once.")
189 def send_keep_alive(self, test, desc=None):
191 Write current test tmpdir & desc to keep-alive pipe to signal liveness
193 if self.pipe is None:
194 # if not running forked..
198 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
202 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
205 class VppTestCase(unittest.TestCase):
206 """This subclass is a base class for VPP test cases that are implemented as
207 classes. It provides methods to create and run test case.
210 extra_vpp_punt_config = []
211 extra_vpp_plugin_config = []
214 def packet_infos(self):
215 """List of packet infos"""
216 return self._packet_infos
219 def get_packet_count_for_if_idx(cls, dst_if_index):
220 """Get the number of packet info for specified destination if index"""
221 if dst_if_index in cls._packet_count_for_dst_if_idx:
222 return cls._packet_count_for_dst_if_idx[dst_if_index]
228 """Return the instance of this testcase"""
229 return cls.test_instance
232 def set_debug_flags(cls, d):
233 cls.debug_core = False
234 cls.debug_gdb = False
235 cls.debug_gdbserver = False
240 cls.debug_core = True
243 elif dl == "gdbserver":
244 cls.debug_gdbserver = True
246 raise Exception("Unrecognized DEBUG option: '%s'" % d)
249 def get_least_used_cpu():
250 cpu_usage_list = [set(range(psutil.cpu_count()))]
251 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
252 if 'vpp_main' == p.info['name']]
253 for vpp_process in vpp_processes:
254 for cpu_usage_set in cpu_usage_list:
256 cpu_num = vpp_process.cpu_num()
257 if cpu_num in cpu_usage_set:
258 cpu_usage_set_index = cpu_usage_list.index(
260 if cpu_usage_set_index == len(cpu_usage_list) - 1:
261 cpu_usage_list.append({cpu_num})
263 cpu_usage_list[cpu_usage_set_index + 1].add(
265 cpu_usage_set.remove(cpu_num)
267 except psutil.NoSuchProcess:
270 for cpu_usage_set in cpu_usage_list:
271 if len(cpu_usage_set) > 0:
272 min_usage_set = cpu_usage_set
275 return random.choice(tuple(min_usage_set))
278 def setUpConstants(cls):
279 """ Set-up the test case class based on environment variables """
280 s = os.getenv("STEP", "n")
281 cls.step = True if s.lower() in ("y", "yes", "1") else False
282 d = os.getenv("DEBUG", None)
283 c = os.getenv("CACHE_OUTPUT", "1")
284 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
285 cls.set_debug_flags(d)
286 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
287 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
288 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
290 if cls.plugin_path is not None:
291 if cls.extern_plugin_path is not None:
292 plugin_path = "%s:%s" % (
293 cls.plugin_path, cls.extern_plugin_path)
295 plugin_path = cls.plugin_path
296 elif cls.extern_plugin_path is not None:
297 plugin_path = cls.extern_plugin_path
299 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
300 debug_cli = "cli-listen localhost:5002"
302 size = os.getenv("COREDUMP_SIZE")
304 coredump_size = "coredump-size %s" % size
305 if coredump_size is None:
306 coredump_size = "coredump-size unlimited"
308 cpu_core_number = cls.get_least_used_cpu()
310 cls.vpp_cmdline = [cls.vpp_bin, "unix",
311 "{", "nodaemon", debug_cli, "full-coredump",
312 coredump_size, "runtime-dir", cls.tempdir, "}",
313 "api-trace", "{", "on", "}", "api-segment", "{",
314 "prefix", cls.shm_prefix, "}", "cpu", "{",
315 "main-core", str(cpu_core_number), "}", "statseg",
316 "{", "socket-name", cls.stats_sock, "}", "plugins",
317 "{", "plugin", "dpdk_plugin.so", "{", "disable",
318 "}", "plugin", "unittest_plugin.so", "{", "enable",
319 "}"] + cls.extra_vpp_plugin_config + ["}", ]
320 if cls.extra_vpp_punt_config is not None:
321 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
322 if plugin_path is not None:
323 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
324 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
325 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
328 def wait_for_enter(cls):
329 if cls.debug_gdbserver:
330 print(double_line_delim)
331 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
333 print(double_line_delim)
334 print("Spawned VPP with PID: %d" % cls.vpp.pid)
336 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
338 print(single_line_delim)
339 print("You can debug the VPP using e.g.:")
340 if cls.debug_gdbserver:
341 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
342 print("Now is the time to attach a gdb by running the above "
343 "command, set up breakpoints etc. and then resume VPP from "
344 "within gdb by issuing the 'continue' command")
346 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
347 print("Now is the time to attach a gdb by running the above "
348 "command and set up breakpoints etc.")
349 print(single_line_delim)
350 input("Press ENTER to continue running the testcase...")
354 cmdline = cls.vpp_cmdline
356 if cls.debug_gdbserver:
357 gdbserver = '/usr/bin/gdbserver'
358 if not os.path.isfile(gdbserver) or \
359 not os.access(gdbserver, os.X_OK):
360 raise Exception("gdbserver binary '%s' does not exist or is "
361 "not executable" % gdbserver)
363 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
364 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
367 cls.vpp = subprocess.Popen(cmdline,
368 stdout=subprocess.PIPE,
369 stderr=subprocess.PIPE,
371 except subprocess.CalledProcessError as e:
372 cls.logger.critical("Subprocess returned with non-0 return code: ("
376 cls.logger.critical("Subprocess returned with OS error: "
377 "(%s) %s", e.errno, e.strerror)
379 except Exception as e:
380 cls.logger.exception("Subprocess returned unexpected from "
387 def wait_for_stats_socket(cls):
388 deadline = time.time() + 3
390 while time.time() < deadline or \
391 cls.debug_gdb or cls.debug_gdbserver:
392 if os.path.exists(cls.stats_sock):
397 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
402 Perform class setup before running the testcase
403 Remove shared memory files, start vpp and connect the vpp-api
405 super(VppTestCase, cls).setUpClass()
406 gc.collect() # run garbage collection first
408 cls.logger = get_logger(cls.__name__)
409 if hasattr(cls, 'parallel_handler'):
410 cls.logger.addHandler(cls.parallel_handler)
411 cls.logger.propagate = False
412 cls.tempdir = tempfile.mkdtemp(
413 prefix='vpp-unittest-%s-' % cls.__name__)
414 cls.stats_sock = "%s/stats.sock" % cls.tempdir
415 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
416 cls.file_handler.setFormatter(
417 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
419 cls.file_handler.setLevel(DEBUG)
420 cls.logger.addHandler(cls.file_handler)
421 cls.shm_prefix = os.path.basename(cls.tempdir)
422 os.chdir(cls.tempdir)
423 cls.logger.info("Temporary dir is %s, shm prefix is %s",
424 cls.tempdir, cls.shm_prefix)
426 cls.reset_packet_infos()
428 cls._zombie_captures = []
431 cls.registry = VppObjectRegistry()
432 cls.vpp_startup_failed = False
433 cls.reporter = KeepAliveReporter()
434 # need to catch exceptions here because if we raise, then the cleanup
435 # doesn't get called and we might end with a zombie vpp
438 cls.reporter.send_keep_alive(cls, 'setUpClass')
439 VppTestResult.current_test_case_info = TestCaseInfo(
440 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
441 cls.vpp_stdout_deque = deque()
442 cls.vpp_stderr_deque = deque()
443 cls.pump_thread_stop_flag = Event()
444 cls.pump_thread_wakeup_pipe = os.pipe()
445 cls.pump_thread = Thread(target=pump_output, args=(cls,))
446 cls.pump_thread.daemon = True
447 cls.pump_thread.start()
448 if cls.debug_gdb or cls.debug_gdbserver:
452 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
458 cls.vapi.register_hook(hook)
459 cls.wait_for_stats_socket()
460 cls.statistics = VPPStats(socketname=cls.stats_sock)
464 cls.vpp_startup_failed = True
466 "VPP died shortly after startup, check the"
467 " output to standard error for possible cause")
473 cls.vapi.disconnect()
476 if cls.debug_gdbserver:
477 print(colorize("You're running VPP inside gdbserver but "
478 "VPP-API connection failed, did you forget "
479 "to 'continue' VPP from within gdb?", RED))
491 Disconnect vpp-api, kill vpp and cleanup shared memory files
493 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
495 if cls.vpp.returncode is None:
496 print(double_line_delim)
497 print("VPP or GDB server is still running")
498 print(single_line_delim)
499 input("When done debugging, press ENTER to kill the "
500 "process and finish running the testcase...")
502 # first signal that we want to stop the pump thread, then wake it up
503 if hasattr(cls, 'pump_thread_stop_flag'):
504 cls.pump_thread_stop_flag.set()
505 if hasattr(cls, 'pump_thread_wakeup_pipe'):
506 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
507 if hasattr(cls, 'pump_thread'):
508 cls.logger.debug("Waiting for pump thread to stop")
509 cls.pump_thread.join()
510 if hasattr(cls, 'vpp_stderr_reader_thread'):
511 cls.logger.debug("Waiting for stdderr pump to stop")
512 cls.vpp_stderr_reader_thread.join()
514 if hasattr(cls, 'vpp'):
515 if hasattr(cls, 'vapi'):
516 cls.logger.debug("Disconnecting class vapi client on %s",
518 cls.vapi.disconnect()
519 cls.logger.debug("Deleting class vapi attribute on %s",
523 if cls.vpp.returncode is None:
524 cls.logger.debug("Sending TERM to vpp")
526 cls.logger.debug("Waiting for vpp to die")
527 cls.vpp.communicate()
528 cls.logger.debug("Deleting class vpp attribute on %s",
532 if cls.vpp_startup_failed:
533 stdout_log = cls.logger.info
534 stderr_log = cls.logger.critical
536 stdout_log = cls.logger.info
537 stderr_log = cls.logger.info
539 if hasattr(cls, 'vpp_stdout_deque'):
540 stdout_log(single_line_delim)
541 stdout_log('VPP output to stdout while running %s:', cls.__name__)
542 stdout_log(single_line_delim)
543 vpp_output = "".join(cls.vpp_stdout_deque)
544 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
546 stdout_log('\n%s', vpp_output)
547 stdout_log(single_line_delim)
549 if hasattr(cls, 'vpp_stderr_deque'):
550 stderr_log(single_line_delim)
551 stderr_log('VPP output to stderr while running %s:', cls.__name__)
552 stderr_log(single_line_delim)
553 vpp_output = "".join(cls.vpp_stderr_deque)
554 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
556 stderr_log('\n%s', vpp_output)
557 stderr_log(single_line_delim)
560 def tearDownClass(cls):
561 """ Perform final cleanup after running all tests in this test-case """
562 cls.reporter.send_keep_alive(cls, 'tearDownClass')
564 cls.file_handler.close()
565 cls.reset_packet_infos()
567 debug_internal.on_tear_down_class(cls)
570 """ Show various debug prints after each test """
571 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
572 (self.__class__.__name__, self._testMethodName,
573 self._testMethodDoc))
574 if not self.vpp_dead:
575 self.logger.debug(self.vapi.cli("show trace max 1000"))
576 self.logger.info(self.vapi.ppcli("show interface"))
577 self.logger.info(self.vapi.ppcli("show hardware"))
578 self.logger.info(self.statistics.set_errors_str())
579 self.logger.info(self.vapi.ppcli("show run"))
580 self.logger.info(self.vapi.ppcli("show log"))
581 self.registry.remove_vpp_config(self.logger)
582 # Save/Dump VPP api trace log
583 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
584 tmp_api_trace = "/tmp/%s" % api_trace
585 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
586 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
587 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
589 os.rename(tmp_api_trace, vpp_api_trace_log)
590 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
593 self.registry.unregister_all(self.logger)
596 """ Clear trace before running each test"""
597 super(VppTestCase, self).setUp()
598 self.reporter.send_keep_alive(self)
599 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
600 (self.__class__.__name__, self._testMethodName,
601 self._testMethodDoc))
603 raise Exception("VPP is dead when setting up the test")
604 self.sleep(.1, "during setUp")
605 self.vpp_stdout_deque.append(
606 "--- test setUp() for %s.%s(%s) starts here ---\n" %
607 (self.__class__.__name__, self._testMethodName,
608 self._testMethodDoc))
609 self.vpp_stderr_deque.append(
610 "--- test setUp() for %s.%s(%s) starts here ---\n" %
611 (self.__class__.__name__, self._testMethodName,
612 self._testMethodDoc))
613 self.vapi.cli("clear trace")
614 # store the test instance inside the test class - so that objects
615 # holding the class can access instance methods (like assertEqual)
616 type(self).test_instance = self
619 def pg_enable_capture(cls, interfaces=None):
621 Enable capture on packet-generator interfaces
623 :param interfaces: iterable interface indexes (if None,
624 use self.pg_interfaces)
627 if interfaces is None:
628 interfaces = cls.pg_interfaces
633 def register_capture(cls, cap_name):
634 """ Register a capture in the testclass """
635 # add to the list of captures with current timestamp
636 cls._captures.append((time.time(), cap_name))
637 # filter out from zombies
638 cls._zombie_captures = [(stamp, name)
639 for (stamp, name) in cls._zombie_captures
644 """ Remove any zombie captures and enable the packet generator """
645 # how long before capture is allowed to be deleted - otherwise vpp
646 # crashes - 100ms seems enough (this shouldn't be needed at all)
649 for stamp, cap_name in cls._zombie_captures:
650 wait = stamp + capture_ttl - now
652 cls.sleep(wait, "before deleting capture %s" % cap_name)
654 cls.logger.debug("Removing zombie capture %s" % cap_name)
655 cls.vapi.cli('packet-generator delete %s' % cap_name)
657 cls.vapi.cli("trace add pg-input 1000")
658 cls.vapi.cli('packet-generator enable')
659 cls._zombie_captures = cls._captures
663 def create_pg_interfaces(cls, interfaces):
665 Create packet-generator interfaces.
667 :param interfaces: iterable indexes of the interfaces.
668 :returns: List of created interfaces.
673 intf = VppPGInterface(cls, i)
674 setattr(cls, intf.name, intf)
676 cls.pg_interfaces = result
680 def create_loopback_interfaces(cls, count):
682 Create loopback interfaces.
684 :param count: number of interfaces created.
685 :returns: List of created interfaces.
687 result = [VppLoInterface(cls) for i in range(count)]
689 setattr(cls, intf.name, intf)
690 cls.lo_interfaces = result
694 def extend_packet(packet, size, padding=' '):
696 Extend packet to given size by padding with spaces or custom padding
697 NOTE: Currently works only when Raw layer is present.
699 :param packet: packet
700 :param size: target size
701 :param padding: padding used to extend the payload
704 packet_len = len(packet) + 4
705 extend = size - packet_len
707 num = (extend / len(padding)) + 1
708 packet[Raw].load += (padding * num)[:extend]
711 def reset_packet_infos(cls):
712 """ Reset the list of packet info objects and packet counts to zero """
713 cls._packet_infos = {}
714 cls._packet_count_for_dst_if_idx = {}
717 def create_packet_info(cls, src_if, dst_if):
719 Create packet info object containing the source and destination indexes
720 and add it to the testcase's packet info list
722 :param VppInterface src_if: source interface
723 :param VppInterface dst_if: destination interface
725 :returns: _PacketInfo object
729 info.index = len(cls._packet_infos)
730 info.src = src_if.sw_if_index
731 info.dst = dst_if.sw_if_index
732 if isinstance(dst_if, VppSubInterface):
733 dst_idx = dst_if.parent.sw_if_index
735 dst_idx = dst_if.sw_if_index
736 if dst_idx in cls._packet_count_for_dst_if_idx:
737 cls._packet_count_for_dst_if_idx[dst_idx] += 1
739 cls._packet_count_for_dst_if_idx[dst_idx] = 1
740 cls._packet_infos[info.index] = info
744 def info_to_payload(info):
746 Convert _PacketInfo object to packet payload
748 :param info: _PacketInfo object
750 :returns: string containing serialized data from packet info
752 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
756 def payload_to_info(payload, payload_field='load'):
758 Convert packet payload to _PacketInfo object
760 :param payload: packet payload
761 :type payload: <class 'scapy.packet.Raw'>
762 :param payload_field: packet fieldname of payload "load" for
763 <class 'scapy.packet.Raw'>
764 :type payload_field: str
765 :returns: _PacketInfo object containing de-serialized data from payload
768 numbers = getattr(payload, payload_field).split()
770 info.index = int(numbers[0])
771 info.src = int(numbers[1])
772 info.dst = int(numbers[2])
773 info.ip = int(numbers[3])
774 info.proto = int(numbers[4])
777 def get_next_packet_info(self, info):
779 Iterate over the packet info list stored in the testcase
780 Start iteration with first element if info is None
781 Continue based on index in info if info is specified
783 :param info: info or None
784 :returns: next info in list or None if no more infos
789 next_index = info.index + 1
790 if next_index == len(self._packet_infos):
793 return self._packet_infos[next_index]
795 def get_next_packet_info_for_interface(self, src_index, info):
797 Search the packet info list for the next packet info with same source
800 :param src_index: source interface index to search for
801 :param info: packet info - where to start the search
802 :returns: packet info or None
806 info = self.get_next_packet_info(info)
809 if info.src == src_index:
812 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
814 Search the packet info list for the next packet info with same source
815 and destination interface indexes
817 :param src_index: source interface index to search for
818 :param dst_index: destination interface index to search for
819 :param info: packet info - where to start the search
820 :returns: packet info or None
824 info = self.get_next_packet_info_for_interface(src_index, info)
827 if info.dst == dst_index:
830 def assert_equal(self, real_value, expected_value, name_or_class=None):
831 if name_or_class is None:
832 self.assertEqual(real_value, expected_value)
835 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
836 msg = msg % (getdoc(name_or_class).strip(),
837 real_value, str(name_or_class(real_value)),
838 expected_value, str(name_or_class(expected_value)))
840 msg = "Invalid %s: %s does not match expected value %s" % (
841 name_or_class, real_value, expected_value)
843 self.assertEqual(real_value, expected_value, msg)
845 def assert_in_range(self,
853 msg = "Invalid %s: %s out of range <%s,%s>" % (
854 name, real_value, expected_min, expected_max)
855 self.assertTrue(expected_min <= real_value <= expected_max, msg)
857 def assert_packet_checksums_valid(self, packet,
858 ignore_zero_udp_checksums=True):
859 received = packet.__class__(scapy.compat.raw(packet))
861 ppp("Verifying packet checksums for packet:", received))
862 udp_layers = ['UDP', 'UDPerror']
863 checksum_fields = ['cksum', 'chksum']
866 temp = received.__class__(scapy.compat.raw(received))
868 layer = temp.getlayer(counter)
870 for cf in checksum_fields:
871 if hasattr(layer, cf):
872 if ignore_zero_udp_checksums and \
873 0 == getattr(layer, cf) and \
874 layer.name in udp_layers:
877 checksums.append((counter, cf))
880 counter = counter + 1
881 if 0 == len(checksums):
883 temp = temp.__class__(scapy.compat.raw(temp))
884 for layer, cf in checksums:
885 calc_sum = getattr(temp[layer], cf)
887 getattr(received[layer], cf), calc_sum,
888 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
890 "Checksum field `%s` on `%s` layer has correct value `%s`" %
891 (cf, temp[layer].name, calc_sum))
893 def assert_checksum_valid(self, received_packet, layer,
895 ignore_zero_checksum=False):
896 """ Check checksum of received packet on given layer """
897 received_packet_checksum = getattr(received_packet[layer], field_name)
898 if ignore_zero_checksum and 0 == received_packet_checksum:
900 recalculated = received_packet.__class__(
901 scapy.compat.raw(received_packet))
902 delattr(recalculated[layer], field_name)
903 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
904 self.assert_equal(received_packet_checksum,
905 getattr(recalculated[layer], field_name),
906 "packet checksum on layer: %s" % layer)
908 def assert_ip_checksum_valid(self, received_packet,
909 ignore_zero_checksum=False):
910 self.assert_checksum_valid(received_packet, 'IP',
911 ignore_zero_checksum=ignore_zero_checksum)
913 def assert_tcp_checksum_valid(self, received_packet,
914 ignore_zero_checksum=False):
915 self.assert_checksum_valid(received_packet, 'TCP',
916 ignore_zero_checksum=ignore_zero_checksum)
918 def assert_udp_checksum_valid(self, received_packet,
919 ignore_zero_checksum=True):
920 self.assert_checksum_valid(received_packet, 'UDP',
921 ignore_zero_checksum=ignore_zero_checksum)
923 def assert_embedded_icmp_checksum_valid(self, received_packet):
924 if received_packet.haslayer(IPerror):
925 self.assert_checksum_valid(received_packet, 'IPerror')
926 if received_packet.haslayer(TCPerror):
927 self.assert_checksum_valid(received_packet, 'TCPerror')
928 if received_packet.haslayer(UDPerror):
929 self.assert_checksum_valid(received_packet, 'UDPerror',
930 ignore_zero_checksum=True)
931 if received_packet.haslayer(ICMPerror):
932 self.assert_checksum_valid(received_packet, 'ICMPerror')
934 def assert_icmp_checksum_valid(self, received_packet):
935 self.assert_checksum_valid(received_packet, 'ICMP')
936 self.assert_embedded_icmp_checksum_valid(received_packet)
938 def assert_icmpv6_checksum_valid(self, pkt):
939 if pkt.haslayer(ICMPv6DestUnreach):
940 self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
941 self.assert_embedded_icmp_checksum_valid(pkt)
942 if pkt.haslayer(ICMPv6EchoRequest):
943 self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
944 if pkt.haslayer(ICMPv6EchoReply):
945 self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
947 def assert_packet_counter_equal(self, counter, expected_value):
948 if counter.startswith("/"):
949 counter_value = self.statistics.get_counter(counter)
950 self.assert_equal(counter_value, expected_value,
951 "packet counter `%s'" % counter)
953 counters = self.vapi.cli("sh errors").split('\n')
955 for i in range(1, len(counters) - 1):
956 results = counters[i].split()
957 if results[1] == counter:
958 counter_value = int(results[0])
962 def sleep(cls, timeout, remark=None):
964 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
965 # * by Guido, only the main thread can be interrupted.
967 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
970 if hasattr(os, 'sched_yield'):
976 if hasattr(cls, 'logger'):
977 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
981 if hasattr(cls, 'logger') and after - before > 2 * timeout:
982 cls.logger.error("unexpected self.sleep() result - "
983 "slept for %es instead of ~%es!",
984 after - before, timeout)
985 if hasattr(cls, 'logger'):
987 "Finished sleep (%s) - slept %es (wanted %es)",
988 remark, after - before, timeout)
990 def pg_send(self, intf, pkts):
991 self.vapi.cli("clear trace")
992 intf.add_stream(pkts)
993 self.pg_enable_capture(self.pg_interfaces)
996 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
997 self.pg_send(intf, pkts)
1000 for i in self.pg_interfaces:
1001 i.get_capture(0, timeout=timeout)
1002 i.assert_nothing_captured(remark=remark)
1005 def send_and_expect(self, intf, pkts, output):
1006 self.pg_send(intf, pkts)
1007 rx = output.get_capture(len(pkts))
1010 def send_and_expect_only(self, intf, pkts, output, timeout=None):
1011 self.pg_send(intf, pkts)
1012 rx = output.get_capture(len(pkts))
1016 for i in self.pg_interfaces:
1017 if i not in outputs:
1018 i.get_capture(0, timeout=timeout)
1019 i.assert_nothing_captured()
1025 """ unittest calls runTest when TestCase is instantiated without a
1026 test case. Use case: Writing unittests against VppTestCase"""
1030 def get_testcase_doc_name(test):
1031 return getdoc(test.__class__).splitlines()[0]
1034 def get_test_description(descriptions, test):
1035 short_description = test.shortDescription()
1036 if descriptions and short_description:
1037 return short_description
1042 class TestCaseInfo(object):
1043 def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
1044 self.logger = logger
1045 self.tempdir = tempdir
1046 self.vpp_pid = vpp_pid
1047 self.vpp_bin_path = vpp_bin_path
1048 self.core_crash_test = None
1051 class VppTestResult(unittest.TestResult):
1053 @property result_string
1054 String variable to store the test case result string.
1056 List variable containing 2-tuples of TestCase instances and strings
1057 holding formatted tracebacks. Each tuple represents a test which
1058 raised an unexpected exception.
1060 List variable containing 2-tuples of TestCase instances and strings
1061 holding formatted tracebacks. Each tuple represents a test where
1062 a failure was explicitly signalled using the TestCase.assert*()
1066 failed_test_cases_info = set()
1067 core_crash_test_cases_info = set()
1068 current_test_case_info = None
1070 def __init__(self, stream=None, descriptions=None, verbosity=None,
1073 :param stream File descriptor to store where to report test results.
1074 Set to the standard error stream by default.
1075 :param descriptions Boolean variable to store information if to use
1076 test case descriptions.
1077 :param verbosity Integer variable to store required verbosity level.
1079 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1080 self.stream = stream
1081 self.descriptions = descriptions
1082 self.verbosity = verbosity
1083 self.result_string = None
1084 self.runner = runner
1086 def addSuccess(self, test):
1088 Record a test succeeded result
1093 if self.current_test_case_info:
1094 self.current_test_case_info.logger.debug(
1095 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1096 test._testMethodName,
1097 test._testMethodDoc))
1098 unittest.TestResult.addSuccess(self, test)
1099 self.result_string = colorize("OK", GREEN)
1101 self.send_result_through_pipe(test, PASS)
1103 def addSkip(self, test, reason):
1105 Record a test skipped.
1111 if self.current_test_case_info:
1112 self.current_test_case_info.logger.debug(
1113 "--- addSkip() %s.%s(%s) called, reason is %s" %
1114 (test.__class__.__name__, test._testMethodName,
1115 test._testMethodDoc, reason))
1116 unittest.TestResult.addSkip(self, test, reason)
1117 self.result_string = colorize("SKIP", YELLOW)
1119 self.send_result_through_pipe(test, SKIP)
1121 def symlink_failed(self):
1122 if self.current_test_case_info:
1124 failed_dir = os.getenv('FAILED_DIR')
1125 link_path = os.path.join(
1128 os.path.basename(self.current_test_case_info.tempdir))
1129 if self.current_test_case_info.logger:
1130 self.current_test_case_info.logger.debug(
1131 "creating a link to the failed test")
1132 self.current_test_case_info.logger.debug(
1133 "os.symlink(%s, %s)" %
1134 (self.current_test_case_info.tempdir, link_path))
1135 if os.path.exists(link_path):
1136 if self.current_test_case_info.logger:
1137 self.current_test_case_info.logger.debug(
1138 'symlink already exists')
1140 os.symlink(self.current_test_case_info.tempdir, link_path)
1142 except Exception as e:
1143 if self.current_test_case_info.logger:
1144 self.current_test_case_info.logger.error(e)
1146 def send_result_through_pipe(self, test, result):
1147 if hasattr(self, 'test_framework_result_pipe'):
1148 pipe = self.test_framework_result_pipe
1150 pipe.send((test.id(), result))
1152 def log_error(self, test, err, fn_name):
1153 if self.current_test_case_info:
1154 if isinstance(test, unittest.suite._ErrorHolder):
1155 test_name = test.description
1157 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1158 test._testMethodName,
1159 test._testMethodDoc)
1160 self.current_test_case_info.logger.debug(
1161 "--- %s() %s called, err is %s" %
1162 (fn_name, test_name, err))
1163 self.current_test_case_info.logger.debug(
1164 "formatted exception is:\n%s" %
1165 "".join(format_exception(*err)))
1167 def add_error(self, test, err, unittest_fn, error_type):
1168 if error_type == FAIL:
1169 self.log_error(test, err, 'addFailure')
1170 error_type_str = colorize("FAIL", RED)
1171 elif error_type == ERROR:
1172 self.log_error(test, err, 'addError')
1173 error_type_str = colorize("ERROR", RED)
1175 raise Exception('Error type %s cannot be used to record an '
1176 'error or a failure' % error_type)
1178 unittest_fn(self, test, err)
1179 if self.current_test_case_info:
1180 self.result_string = "%s [ temp dir used by test case: %s ]" % \
1182 self.current_test_case_info.tempdir)
1183 self.symlink_failed()
1184 self.failed_test_cases_info.add(self.current_test_case_info)
1185 if is_core_present(self.current_test_case_info.tempdir):
1186 if not self.current_test_case_info.core_crash_test:
1187 if isinstance(test, unittest.suite._ErrorHolder):
1188 test_name = str(test)
1190 test_name = "'{!s}' ({!s})".format(
1191 get_testcase_doc_name(test), test.id())
1192 self.current_test_case_info.core_crash_test = test_name
1193 self.core_crash_test_cases_info.add(
1194 self.current_test_case_info)
1196 self.result_string = '%s [no temp dir]' % error_type_str
1198 self.send_result_through_pipe(test, error_type)
1200 def addFailure(self, test, err):
1202 Record a test failed result
1205 :param err: error message
1208 self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1210 def addError(self, test, err):
1212 Record a test error result
1215 :param err: error message
1218 self.add_error(test, err, unittest.TestResult.addError, ERROR)
1220 def getDescription(self, test):
1222 Get test description
1225 :returns: test description
1228 return get_test_description(self.descriptions, test)
1230 def startTest(self, test):
1238 def print_header(test):
1239 if not hasattr(test.__class__, '_header_printed'):
1240 print(double_line_delim)
1241 print(colorize(getdoc(test).splitlines()[0], GREEN))
1242 print(double_line_delim)
1243 test.__class__._header_printed = True
1247 unittest.TestResult.startTest(self, test)
1248 if self.verbosity > 0:
1249 self.stream.writeln(
1250 "Starting " + self.getDescription(test) + " ...")
1251 self.stream.writeln(single_line_delim)
1253 def stopTest(self, test):
1255 Called when the given test has been run
1260 unittest.TestResult.stopTest(self, test)
1261 if self.verbosity > 0:
1262 self.stream.writeln(single_line_delim)
1263 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1264 self.result_string))
1265 self.stream.writeln(single_line_delim)
1267 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1268 self.result_string))
1270 self.send_result_through_pipe(test, TEST_RUN)
1272 def printErrors(self):
1274 Print errors from running the test case
1276 if len(self.errors) > 0 or len(self.failures) > 0:
1277 self.stream.writeln()
1278 self.printErrorList('ERROR', self.errors)
1279 self.printErrorList('FAIL', self.failures)
1281 # ^^ that is the last output from unittest before summary
1282 if not self.runner.print_summary:
1283 devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
1284 self.stream = devnull
1285 self.runner.stream = devnull
1287 def printErrorList(self, flavour, errors):
1289 Print error list to the output stream together with error type
1290 and test case description.
1292 :param flavour: error type
1293 :param errors: iterable errors
1296 for test, err in errors:
1297 self.stream.writeln(double_line_delim)
1298 self.stream.writeln("%s: %s" %
1299 (flavour, self.getDescription(test)))
1300 self.stream.writeln(single_line_delim)
1301 self.stream.writeln("%s" % err)
1304 class VppTestRunner(unittest.TextTestRunner):
1306 A basic test runner implementation which prints results to standard error.
1310 def resultclass(self):
1311 """Class maintaining the results of the tests"""
1312 return VppTestResult
1314 def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1315 result_pipe=None, failfast=False, buffer=False,
1316 resultclass=None, print_summary=True, **kwargs):
1317 # ignore stream setting here, use hard-coded stdout to be in sync
1318 # with prints from VppTestCase methods ...
1319 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1320 verbosity, failfast, buffer,
1321 resultclass, **kwargs)
1322 KeepAliveReporter.pipe = keep_alive_pipe
1324 self.orig_stream = self.stream
1325 self.resultclass.test_framework_result_pipe = result_pipe
1327 self.print_summary = print_summary
1329 def _makeResult(self):
1330 return self.resultclass(self.stream,
1335 def run(self, test):
1342 faulthandler.enable() # emit stack trace to stderr if killed by signal
1344 result = super(VppTestRunner, self).run(test)
1345 if not self.print_summary:
1346 self.stream = self.orig_stream
1347 result.stream = self.orig_stream
1351 class Worker(Thread):
1352 def __init__(self, args, logger, env={}):
1353 self.logger = logger
1356 self.env = copy.deepcopy(env)
1357 super(Worker, self).__init__()
1360 executable = self.args[0]
1361 self.logger.debug("Running executable w/args `%s'" % self.args)
1362 env = os.environ.copy()
1363 env.update(self.env)
1364 env["CK_LOG_FILE_NAME"] = "-"
1365 self.process = subprocess.Popen(
1366 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1367 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1368 out, err = self.process.communicate()
1369 self.logger.debug("Finished running `%s'" % executable)
1370 self.logger.info("Return code is `%s'" % self.process.returncode)
1371 self.logger.info(single_line_delim)
1372 self.logger.info("Executable `%s' wrote to stdout:" % executable)
1373 self.logger.info(single_line_delim)
1374 self.logger.info(out)
1375 self.logger.info(single_line_delim)
1376 self.logger.info("Executable `%s' wrote to stderr:" % executable)
1377 self.logger.info(single_line_delim)
1378 self.logger.info(err)
1379 self.logger.info(single_line_delim)
1380 self.result = self.process.returncode
1382 if __name__ == '__main__':