3 from __future__ import print_function
16 from collections import deque
17 from threading import Thread, Event
18 from inspect import getdoc, isclass
19 from traceback import format_exception
20 from logging import FileHandler, DEBUG, Formatter
21 from scapy.packet import Raw
22 from hook import StepHook, PollHook, VppDiedError
23 from vpp_pg_interface import VppPGInterface
24 from vpp_sub_interface import VppSubInterface
25 from vpp_lo_interface import VppLoInterface
26 from vpp_papi_provider import VppPapiProvider
27 from vpp_papi.vpp_stats import VPPStats
28 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
30 from vpp_object import VppObjectRegistry
31 from util import ppp, is_core_present
32 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
33 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
34 from scapy.layers.inet6 import ICMPv6EchoReply
36 if os.name == 'posix' and sys.version_info[0] < 3:
37 # using subprocess32 is recommended by python official documentation
38 # @ https://docs.python.org/2/library/subprocess.html
39 import subprocess32 as subprocess
49 debug_framework = False
50 if os.getenv('TEST_DEBUG', "0") == "1":
51 debug_framework = True
55 Test framework module.
57 The module provides a set of tools for constructing and running tests and
58 representing the results.
62 class _PacketInfo(object):
63 """Private class to create packet info object.
65 Help process information about the next packet.
66 Set variables to default values.
68 #: Store the index of the packet.
70 #: Store the index of the source packet generator interface of the packet.
72 #: Store the index of the destination packet generator interface
75 #: Store expected ip version
77 #: Store expected upper protocol
79 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare two packet info objects attribute by attribute.

    The ``index``, ``src``, ``dst`` and ``data`` attributes of both
    objects are compared, in that order.  Every comparison is evaluated
    before the results are combined, so a missing attribute on either side
    raises AttributeError regardless of the other results.

    :param other: object to compare with
    :returns: truthy when all four attributes compare equal
    """
    compared = ("index", "src", "dst", "data")
    # evaluate all comparisons up front - no short-circuiting here
    matches = [getattr(self, name) == getattr(other, name)
               for name in compared]
    return matches[0] and matches[1] and matches[2] and matches[3]
90 def pump_output(testclass):
91 """ pump output from vpp stdout/stderr to proper queues """
94 while not testclass.pump_thread_stop_flag.is_set():
95 readable = select.select([testclass.vpp.stdout.fileno(),
96 testclass.vpp.stderr.fileno(),
97 testclass.pump_thread_wakeup_pipe[0]],
99 if testclass.vpp.stdout.fileno() in readable:
100 read = os.read(testclass.vpp.stdout.fileno(), 102400)
102 split = read.splitlines(True)
103 if len(stdout_fragment) > 0:
104 split[0] = "%s%s" % (stdout_fragment, split[0])
105 if len(split) > 0 and split[-1].endswith("\n"):
109 stdout_fragment = split[-1]
110 testclass.vpp_stdout_deque.extend(split[:limit])
111 if not testclass.cache_vpp_output:
112 for line in split[:limit]:
113 testclass.logger.debug(
114 "VPP STDOUT: %s" % line.rstrip("\n"))
115 if testclass.vpp.stderr.fileno() in readable:
116 read = os.read(testclass.vpp.stderr.fileno(), 102400)
118 split = read.splitlines(True)
119 if len(stderr_fragment) > 0:
120 split[0] = "%s%s" % (stderr_fragment, split[0])
121 if len(split) > 0 and split[-1].endswith(b"\n"):
125 stderr_fragment = split[-1]
126 testclass.vpp_stderr_deque.extend(split[:limit])
127 if not testclass.cache_vpp_output:
128 for line in split[:limit]:
129 testclass.logger.debug(
130 "VPP STDERR: %s" % line.rstrip("\n"))
131 # ignoring the dummy pipe here intentionally - the
132 # flag will take care of properly terminating the loop
def _is_skip_aarch64_set():
    """Check whether the SKIP_AARCH64 environment variable is switched on.

    :returns: True when SKIP_AARCH64 is 'yes', 'y' or '1' (compared
              case-insensitively), False otherwise - also when the
              variable is not set at all.
    """
    setting = os.getenv('SKIP_AARCH64', 'n')
    accepted = ('yes', 'y', '1')
    return setting.lower() in accepted


# evaluated once, when this module is imported
is_skip_aarch64_set = _is_skip_aarch64_set()
def _is_platform_aarch64():
    """Check whether the machine type reported by `platform` is aarch64.

    :returns: True if platform.machine() is exactly 'aarch64'
    """
    machine = platform.machine()
    return machine == 'aarch64'


# evaluated once, when this module is imported
is_platform_aarch64 = _is_platform_aarch64()
def _running_extended_tests():
    """Check whether the EXTENDED_TESTS environment variable is switched on.

    :returns: True when EXTENDED_TESTS is 'y', 'yes' or '1' (compared
              case-insensitively), False otherwise - also when the
              variable is not set at all.
    """
    setting = os.getenv("EXTENDED_TESTS", "n")
    return setting.lower() in ("y", "yes", "1")


# evaluated once, when this module is imported
running_extended_tests = _running_extended_tests()
def _running_on_centos():
    """Check whether the OS_ID environment variable names a CentOS system.

    :returns: True when OS_ID contains 'centos' (compared
              case-insensitively), False otherwise - also when the
              variable is not set at all.
    """
    os_id = os.getenv("OS_ID", "")
    return "centos" in os_id.lower()


# Store the *result* of the check, like the other module-level flags do.
# Binding the function object itself would make the flag always truthy.
running_on_centos = _running_on_centos()
161 class KeepAliveReporter(object):
163 Singleton object which reports test start to parent process
168 self.__dict__ = self._shared_state
176 def pipe(self, pipe):
177 if self._pipe is not None:
178 raise Exception("Internal error - pipe should only be set once.")
181 def send_keep_alive(self, test, desc=None):
183 Write current test tmpdir & desc to keep-alive pipe to signal liveness
185 if self.pipe is None:
186 # if not running forked..
190 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
194 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
197 class VppTestCase(unittest.TestCase):
198 """This subclass is a base class for VPP test cases that are implemented as
199 classes. It provides methods to create and run test case.
202 extra_vpp_punt_config = []
203 extra_vpp_plugin_config = []
def packet_infos(self):
    """Packet infos registered so far (the `_packet_infos` container)."""
    infos = self._packet_infos
    return infos
211 def get_packet_count_for_if_idx(cls, dst_if_index):
212 """Get the number of packet info for specified destination if index"""
213 if dst_if_index in cls._packet_count_for_dst_if_idx:
214 return cls._packet_count_for_dst_if_idx[dst_if_index]
220 """Return the instance of this testcase"""
221 return cls.test_instance
224 def set_debug_flags(cls, d):
225 cls.debug_core = False
226 cls.debug_gdb = False
227 cls.debug_gdbserver = False
232 cls.debug_core = True
235 elif dl == "gdbserver":
236 cls.debug_gdbserver = True
238 raise Exception("Unrecognized DEBUG option: '%s'" % d)
241 def get_least_used_cpu():
242 cpu_usage_list = [set(range(psutil.cpu_count()))]
243 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
244 if 'vpp_main' == p.info['name']]
245 for vpp_process in vpp_processes:
246 for cpu_usage_set in cpu_usage_list:
248 cpu_num = vpp_process.cpu_num()
249 if cpu_num in cpu_usage_set:
250 cpu_usage_set_index = cpu_usage_list.index(
252 if cpu_usage_set_index == len(cpu_usage_list) - 1:
253 cpu_usage_list.append({cpu_num})
255 cpu_usage_list[cpu_usage_set_index + 1].add(
257 cpu_usage_set.remove(cpu_num)
259 except psutil.NoSuchProcess:
262 for cpu_usage_set in cpu_usage_list:
263 if len(cpu_usage_set) > 0:
264 min_usage_set = cpu_usage_set
267 return random.choice(tuple(min_usage_set))
def print_header(cls):
    """Print the banner of the test case class, at most once per class.

    The banner is the first line of the class docstring, colorized green
    and framed by double-line delimiters.  The `_header_printed` attribute
    set on the class marks the banner as already printed.
    """
    if hasattr(cls, '_header_printed'):
        # banner was already shown for this class
        return
    print(double_line_delim)
    title = getdoc(cls).splitlines()[0]
    print(colorize(title, GREEN))
    print(double_line_delim)
    cls._header_printed = True
278 def setUpConstants(cls):
279 """ Set-up the test case class based on environment variables """
280 s = os.getenv("STEP", "n")
281 cls.step = True if s.lower() in ("y", "yes", "1") else False
282 d = os.getenv("DEBUG", None)
283 c = os.getenv("CACHE_OUTPUT", "1")
284 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
285 cls.set_debug_flags(d)
286 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
287 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
288 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
290 if cls.plugin_path is not None:
291 if cls.extern_plugin_path is not None:
292 plugin_path = "%s:%s" % (
293 cls.plugin_path, cls.extern_plugin_path)
295 plugin_path = cls.plugin_path
296 elif cls.extern_plugin_path is not None:
297 plugin_path = cls.extern_plugin_path
299 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
300 debug_cli = "cli-listen localhost:5002"
302 size = os.getenv("COREDUMP_SIZE")
304 coredump_size = "coredump-size %s" % size
305 if coredump_size is None:
306 coredump_size = "coredump-size unlimited"
308 cpu_core_number = cls.get_least_used_cpu()
310 cls.vpp_cmdline = [cls.vpp_bin, "unix",
311 "{", "nodaemon", debug_cli, "full-coredump",
312 coredump_size, "runtime-dir", cls.tempdir, "}",
313 "api-trace", "{", "on", "}", "api-segment", "{",
314 "prefix", cls.shm_prefix, "}", "cpu", "{",
315 "main-core", str(cpu_core_number), "}", "statseg",
316 "{", "socket-name", cls.stats_sock, "}", "plugins",
317 "{", "plugin", "dpdk_plugin.so", "{", "disable",
318 "}", "plugin", "unittest_plugin.so", "{", "enable",
319 "}"] + cls.extra_vpp_plugin_config + ["}", ]
320 if cls.extra_vpp_punt_config is not None:
321 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
322 if plugin_path is not None:
323 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
324 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
325 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
328 def wait_for_enter(cls):
329 if cls.debug_gdbserver:
330 print(double_line_delim)
331 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
333 print(double_line_delim)
334 print("Spawned VPP with PID: %d" % cls.vpp.pid)
336 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
338 print(single_line_delim)
339 print("You can debug the VPP using e.g.:")
340 if cls.debug_gdbserver:
341 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
342 print("Now is the time to attach a gdb by running the above "
343 "command, set up breakpoints etc. and then resume VPP from "
344 "within gdb by issuing the 'continue' command")
346 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
347 print("Now is the time to attach a gdb by running the above "
348 "command and set up breakpoints etc.")
349 print(single_line_delim)
350 raw_input("Press ENTER to continue running the testcase...")
354 cmdline = cls.vpp_cmdline
356 if cls.debug_gdbserver:
357 gdbserver = '/usr/bin/gdbserver'
358 if not os.path.isfile(gdbserver) or \
359 not os.access(gdbserver, os.X_OK):
360 raise Exception("gdbserver binary '%s' does not exist or is "
361 "not executable" % gdbserver)
363 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
364 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
367 cls.vpp = subprocess.Popen(cmdline,
368 stdout=subprocess.PIPE,
369 stderr=subprocess.PIPE,
371 except subprocess.CalledProcessError as e:
372 cls.logger.critical("Couldn't start vpp: %s" % e)
378 def wait_for_stats_socket(cls):
379 deadline = time.time() + 3
381 while time.time() < deadline or \
382 cls.debug_gdb or cls.debug_gdbserver:
383 if os.path.exists(cls.stats_sock):
388 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
393 Perform class setup before running the testcase
394 Remove shared memory files, start vpp and connect the vpp-api
396 gc.collect() # run garbage collection first
399 cls.logger = get_logger(cls.__name__)
400 if hasattr(cls, 'parallel_handler'):
401 cls.logger.addHandler(cls.parallel_handler)
402 cls.logger.propagate = False
403 cls.tempdir = tempfile.mkdtemp(
404 prefix='vpp-unittest-%s-' % cls.__name__)
405 cls.stats_sock = "%s/stats.sock" % cls.tempdir
406 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
407 cls.file_handler.setFormatter(
408 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
410 cls.file_handler.setLevel(DEBUG)
411 cls.logger.addHandler(cls.file_handler)
412 cls.shm_prefix = os.path.basename(cls.tempdir)
413 os.chdir(cls.tempdir)
414 cls.logger.info("Temporary dir is %s, shm prefix is %s",
415 cls.tempdir, cls.shm_prefix)
417 cls.reset_packet_infos()
419 cls._zombie_captures = []
422 cls.registry = VppObjectRegistry()
423 cls.vpp_startup_failed = False
424 cls.reporter = KeepAliveReporter()
425 # need to catch exceptions here because if we raise, then the cleanup
426 # doesn't get called and we might end with a zombie vpp
429 cls.reporter.send_keep_alive(cls, 'setUpClass')
430 VppTestResult.current_test_case_info = TestCaseInfo(
431 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
432 cls.vpp_stdout_deque = deque()
433 cls.vpp_stderr_deque = deque()
434 cls.pump_thread_stop_flag = Event()
435 cls.pump_thread_wakeup_pipe = os.pipe()
436 cls.pump_thread = Thread(target=pump_output, args=(cls,))
437 cls.pump_thread.daemon = True
438 cls.pump_thread.start()
439 if cls.debug_gdb or cls.debug_gdbserver:
443 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
449 cls.vapi.register_hook(hook)
450 cls.wait_for_stats_socket()
451 cls.statistics = VPPStats(socketname=cls.stats_sock)
455 cls.vpp_startup_failed = True
457 "VPP died shortly after startup, check the"
458 " output to standard error for possible cause")
464 cls.vapi.disconnect()
467 if cls.debug_gdbserver:
468 print(colorize("You're running VPP inside gdbserver but "
469 "VPP-API connection failed, did you forget "
470 "to 'continue' VPP from within gdb?", RED))
482 Disconnect vpp-api, kill vpp and cleanup shared memory files
484 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
486 if cls.vpp.returncode is None:
487 print(double_line_delim)
488 print("VPP or GDB server is still running")
489 print(single_line_delim)
490 raw_input("When done debugging, press ENTER to kill the "
491 "process and finish running the testcase...")
493 # first signal that we want to stop the pump thread, then wake it up
494 if hasattr(cls, 'pump_thread_stop_flag'):
495 cls.pump_thread_stop_flag.set()
496 if hasattr(cls, 'pump_thread_wakeup_pipe'):
497 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
498 if hasattr(cls, 'pump_thread'):
499 cls.logger.debug("Waiting for pump thread to stop")
500 cls.pump_thread.join()
501 if hasattr(cls, 'vpp_stderr_reader_thread'):
502 cls.logger.debug("Waiting for stdderr pump to stop")
503 cls.vpp_stderr_reader_thread.join()
505 if hasattr(cls, 'vpp'):
506 if hasattr(cls, 'vapi'):
507 cls.vapi.disconnect()
510 if cls.vpp.returncode is None:
511 cls.logger.debug("Sending TERM to vpp")
513 cls.logger.debug("Waiting for vpp to die")
514 cls.vpp.communicate()
517 if cls.vpp_startup_failed:
518 stdout_log = cls.logger.info
519 stderr_log = cls.logger.critical
521 stdout_log = cls.logger.info
522 stderr_log = cls.logger.info
524 if hasattr(cls, 'vpp_stdout_deque'):
525 stdout_log(single_line_delim)
526 stdout_log('VPP output to stdout while running %s:', cls.__name__)
527 stdout_log(single_line_delim)
528 vpp_output = "".join(cls.vpp_stdout_deque)
529 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
531 stdout_log('\n%s', vpp_output)
532 stdout_log(single_line_delim)
534 if hasattr(cls, 'vpp_stderr_deque'):
535 stderr_log(single_line_delim)
536 stderr_log('VPP output to stderr while running %s:', cls.__name__)
537 stderr_log(single_line_delim)
538 vpp_output = "".join(str(cls.vpp_stderr_deque))
539 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
541 stderr_log('\n%s', vpp_output)
542 stderr_log(single_line_delim)
545 def tearDownClass(cls):
546 """ Perform final cleanup after running all tests in this test-case """
547 cls.reporter.send_keep_alive(cls, 'tearDownClass')
549 cls.file_handler.close()
550 cls.reset_packet_infos()
552 debug_internal.on_tear_down_class(cls)
555 """ Show various debug prints after each test """
556 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
557 (self.__class__.__name__, self._testMethodName,
558 self._testMethodDoc))
559 if not self.vpp_dead:
560 self.logger.debug(self.vapi.cli("show trace"))
561 self.logger.info(self.vapi.ppcli("show interface"))
562 self.logger.info(self.vapi.ppcli("show hardware"))
563 self.logger.info(self.statistics.set_errors_str())
564 self.logger.info(self.vapi.ppcli("show run"))
565 self.logger.info(self.vapi.ppcli("show log"))
566 self.registry.remove_vpp_config(self.logger)
567 # Save/Dump VPP api trace log
568 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
569 tmp_api_trace = "/tmp/%s" % api_trace
570 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
571 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
572 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
574 os.rename(tmp_api_trace, vpp_api_trace_log)
575 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
578 self.registry.unregister_all(self.logger)
581 """ Clear trace before running each test"""
582 self.reporter.send_keep_alive(self)
583 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
584 (self.__class__.__name__, self._testMethodName,
585 self._testMethodDoc))
587 raise Exception("VPP is dead when setting up the test")
588 self.sleep(.1, "during setUp")
589 self.vpp_stdout_deque.append(
590 "--- test setUp() for %s.%s(%s) starts here ---\n" %
591 (self.__class__.__name__, self._testMethodName,
592 self._testMethodDoc))
593 self.vpp_stderr_deque.append(
594 "--- test setUp() for %s.%s(%s) starts here ---\n" %
595 (self.__class__.__name__, self._testMethodName,
596 self._testMethodDoc))
597 self.vapi.cli("clear trace")
598 # store the test instance inside the test class - so that objects
599 # holding the class can access instance methods (like assertEqual)
600 type(self).test_instance = self
603 def pg_enable_capture(cls, interfaces=None):
605 Enable capture on packet-generator interfaces
607 :param interfaces: iterable interface indexes (if None,
608 use self.pg_interfaces)
611 if interfaces is None:
612 interfaces = cls.pg_interfaces
617 def register_capture(cls, cap_name):
618 """ Register a capture in the testclass """
619 # add to the list of captures with current timestamp
620 cls._captures.append((time.time(), cap_name))
621 # filter out from zombies
622 cls._zombie_captures = [(stamp, name)
623 for (stamp, name) in cls._zombie_captures
628 """ Remove any zombie captures and enable the packet generator """
629 # how long before capture is allowed to be deleted - otherwise vpp
630 # crashes - 100ms seems enough (this shouldn't be needed at all)
633 for stamp, cap_name in cls._zombie_captures:
634 wait = stamp + capture_ttl - now
636 cls.sleep(wait, "before deleting capture %s" % cap_name)
638 cls.logger.debug("Removing zombie capture %s" % cap_name)
639 cls.vapi.cli('packet-generator delete %s' % cap_name)
641 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
642 cls.vapi.cli('packet-generator enable')
643 cls._zombie_captures = cls._captures
647 def create_pg_interfaces(cls, interfaces):
649 Create packet-generator interfaces.
651 :param interfaces: iterable indexes of the interfaces.
652 :returns: List of created interfaces.
657 intf = VppPGInterface(cls, i)
658 setattr(cls, intf.name, intf)
660 cls.pg_interfaces = result
664 def create_loopback_interfaces(cls, count):
666 Create loopback interfaces.
668 :param count: number of interfaces created.
669 :returns: List of created interfaces.
671 result = [VppLoInterface(cls) for i in range(count)]
673 setattr(cls, intf.name, intf)
674 cls.lo_interfaces = result
678 def extend_packet(packet, size, padding=' '):
680 Extend packet to given size by padding with spaces or custom padding
681 NOTE: Currently works only when Raw layer is present.
683 :param packet: packet
684 :param size: target size
685 :param padding: padding used to extend the payload
688 packet_len = len(packet) + 4
689 extend = size - packet_len
691 num = (extend / len(padding)) + 1
692 packet[Raw].load += (padding * num)[:extend]
def reset_packet_infos(cls):
    """ Start over with empty packet infos and empty per-destination
    packet counts """
    # two distinct, fresh dictionaries - assigned in the original order
    cls._packet_infos, cls._packet_count_for_dst_if_idx = {}, {}
701 def create_packet_info(cls, src_if, dst_if):
703 Create packet info object containing the source and destination indexes
704 and add it to the testcase's packet info list
706 :param VppInterface src_if: source interface
707 :param VppInterface dst_if: destination interface
709 :returns: _PacketInfo object
713 info.index = len(cls._packet_infos)
714 info.src = src_if.sw_if_index
715 info.dst = dst_if.sw_if_index
716 if isinstance(dst_if, VppSubInterface):
717 dst_idx = dst_if.parent.sw_if_index
719 dst_idx = dst_if.sw_if_index
720 if dst_idx in cls._packet_count_for_dst_if_idx:
721 cls._packet_count_for_dst_if_idx[dst_idx] += 1
723 cls._packet_count_for_dst_if_idx[dst_idx] = 1
724 cls._packet_infos[info.index] = info
728 def info_to_payload(info):
730 Convert _PacketInfo object to packet payload
732 :param info: _PacketInfo object
734 :returns: string containing serialized data from packet info
736 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
740 def payload_to_info(payload):
742 Convert packet payload to _PacketInfo object
744 :param payload: packet payload
746 :returns: _PacketInfo object containing de-serialized data from payload
749 numbers = payload.split()
751 info.index = int(numbers[0])
752 info.src = int(numbers[1])
753 info.dst = int(numbers[2])
754 info.ip = int(numbers[3])
755 info.proto = int(numbers[4])
758 def get_next_packet_info(self, info):
760 Iterate over the packet info list stored in the testcase
761 Start iteration with first element if info is None
762 Continue based on index in info if info is specified
764 :param info: info or None
765 :returns: next info in list or None if no more infos
770 next_index = info.index + 1
771 if next_index == len(self._packet_infos):
774 return self._packet_infos[next_index]
776 def get_next_packet_info_for_interface(self, src_index, info):
778 Search the packet info list for the next packet info with same source
781 :param src_index: source interface index to search for
782 :param info: packet info - where to start the search
783 :returns: packet info or None
787 info = self.get_next_packet_info(info)
790 if info.src == src_index:
793 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
795 Search the packet info list for the next packet info with same source
796 and destination interface indexes
798 :param src_index: source interface index to search for
799 :param dst_index: destination interface index to search for
800 :param info: packet info - where to start the search
801 :returns: packet info or None
805 info = self.get_next_packet_info_for_interface(src_index, info)
808 if info.dst == dst_index:
811 def assert_equal(self, real_value, expected_value, name_or_class=None):
812 if name_or_class is None:
813 self.assertEqual(real_value, expected_value)
816 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
817 msg = msg % (getdoc(name_or_class).strip(),
818 real_value, str(name_or_class(real_value)),
819 expected_value, str(name_or_class(expected_value)))
821 msg = "Invalid %s: %s does not match expected value %s" % (
822 name_or_class, real_value, expected_value)
824 self.assertEqual(real_value, expected_value, msg)
826 def assert_in_range(self,
834 msg = "Invalid %s: %s out of range <%s,%s>" % (
835 name, real_value, expected_min, expected_max)
836 self.assertTrue(expected_min <= real_value <= expected_max, msg)
838 def assert_packet_checksums_valid(self, packet,
839 ignore_zero_udp_checksums=True):
840 received = packet.__class__(str(packet))
842 ppp("Verifying packet checksums for packet:", received))
843 udp_layers = ['UDP', 'UDPerror']
844 checksum_fields = ['cksum', 'chksum']
847 temp = received.__class__(str(received))
849 layer = temp.getlayer(counter)
851 for cf in checksum_fields:
852 if hasattr(layer, cf):
853 if ignore_zero_udp_checksums and \
854 0 == getattr(layer, cf) and \
855 layer.name in udp_layers:
858 checksums.append((counter, cf))
861 counter = counter + 1
862 if 0 == len(checksums):
864 temp = temp.__class__(str(temp))
865 for layer, cf in checksums:
866 calc_sum = getattr(temp[layer], cf)
868 getattr(received[layer], cf), calc_sum,
869 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
871 "Checksum field `%s` on `%s` layer has correct value `%s`" %
872 (cf, temp[layer].name, calc_sum))
874 def assert_checksum_valid(self, received_packet, layer,
876 ignore_zero_checksum=False):
877 """ Check checksum of received packet on given layer """
878 received_packet_checksum = getattr(received_packet[layer], field_name)
879 if ignore_zero_checksum and 0 == received_packet_checksum:
881 recalculated = received_packet.__class__(str(received_packet))
882 delattr(recalculated[layer], field_name)
883 recalculated = recalculated.__class__(str(recalculated))
884 self.assert_equal(received_packet_checksum,
885 getattr(recalculated[layer], field_name),
886 "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """ Check checksum of the 'IP' layer of the received packet

    :param received_packet: packet to verify
    :param ignore_zero_checksum: passed on to assert_checksum_valid()
    """
    verify = self.assert_checksum_valid
    verify(received_packet, 'IP',
           ignore_zero_checksum=ignore_zero_checksum)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """ Check checksum of the 'TCP' layer of the received packet

    :param received_packet: packet to verify
    :param ignore_zero_checksum: passed on to assert_checksum_valid()
    """
    verify = self.assert_checksum_valid
    verify(received_packet, 'TCP',
           ignore_zero_checksum=ignore_zero_checksum)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """ Check checksum of the 'UDP' layer of the received packet

    :param received_packet: packet to verify
    :param ignore_zero_checksum: passed on to assert_checksum_valid();
                                 unlike the IP/TCP variants it defaults
                                 to True here
    """
    verify = self.assert_checksum_valid
    verify(received_packet, 'UDP',
           ignore_zero_checksum=ignore_zero_checksum)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """ Check checksums of the error layers present in the packet

    IPerror, TCPerror, UDPerror and ICMPerror are examined in this order;
    a layer is verified only if the packet has it.

    :param received_packet: packet to verify
    """
    # (layer class, layer name, extra keyword arguments for the check)
    checks = (
        (IPerror, 'IPerror', {}),
        (TCPerror, 'TCPerror', {}),
        # UDPerror is the only layer checked with zero checksums ignored
        (UDPerror, 'UDPerror', {'ignore_zero_checksum': True}),
        (ICMPerror, 'ICMPerror', {}),
    )
    for layer_class, layer_name, options in checks:
        if received_packet.haslayer(layer_class):
            self.assert_checksum_valid(received_packet, layer_name,
                                       **options)
def assert_icmp_checksum_valid(self, received_packet):
    """ Check checksum of the 'ICMP' layer, then of any embedded error
    layers, of the received packet

    :param received_packet: packet to verify
    """
    packet = received_packet
    self.assert_checksum_valid(packet, 'ICMP')
    self.assert_embedded_icmp_checksum_valid(packet)
918 def assert_icmpv6_checksum_valid(self, pkt):
919 if pkt.haslayer(ICMPv6DestUnreach):
920 self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
921 self.assert_embedded_icmp_checksum_valid(pkt)
922 if pkt.haslayer(ICMPv6EchoRequest):
923 self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
924 if pkt.haslayer(ICMPv6EchoReply):
925 self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
927 def assert_packet_counter_equal(self, counter, expected_value):
928 if counter.startswith("/"):
929 counter_value = self.statistics.get_counter(counter)
930 self.assert_equal(counter_value, expected_value,
931 "packet counter `%s'" % counter)
933 counters = self.vapi.cli("sh errors").split('\n')
935 for i in range(1, len(counters) - 1):
936 results = counters[i].split()
937 if results[1] == counter:
938 counter_value = int(results[0])
942 def sleep(cls, timeout, remark=None):
943 if hasattr(cls, 'logger'):
944 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
948 if hasattr(cls, 'logger') and after - before > 2 * timeout:
949 cls.logger.error("unexpected time.sleep() result - "
950 "slept for %es instead of ~%es!",
951 after - before, timeout)
952 if hasattr(cls, 'logger'):
954 "Finished sleep (%s) - slept %es (wanted %es)",
955 remark, after - before, timeout)
957 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
958 self.vapi.cli("clear trace")
959 intf.add_stream(pkts)
960 self.pg_enable_capture(self.pg_interfaces)
964 for i in self.pg_interfaces:
965 i.get_capture(0, timeout=timeout)
966 i.assert_nothing_captured(remark=remark)
969 def send_and_expect(self, input, pkts, output):
970 self.vapi.cli("clear trace")
971 input.add_stream(pkts)
972 self.pg_enable_capture(self.pg_interfaces)
974 if isinstance(object, (list,)):
977 rx.append(output.get_capture(len(pkts)))
979 rx = output.get_capture(len(pkts))
982 def send_and_expect_only(self, input, pkts, output, timeout=None):
983 self.vapi.cli("clear trace")
984 input.add_stream(pkts)
985 self.pg_enable_capture(self.pg_interfaces)
987 if isinstance(object, (list,)):
991 rx.append(output.get_capture(len(pkts)))
993 rx = output.get_capture(len(pkts))
997 for i in self.pg_interfaces:
999 i.get_capture(0, timeout=timeout)
1000 i.assert_nothing_captured()
def get_testcase_doc_name(test):
    """Return the first line of the docstring of the test's class.

    :param test: test case instance
    :returns: first docstring line of ``test.__class__``
    """
    class_doc = getdoc(test.__class__)
    first_line = class_doc.splitlines()[0]
    return first_line
1010 def get_test_description(descriptions, test):
1011 short_description = test.shortDescription()
1012 if descriptions and short_description:
1013 return short_description
class TestCaseInfo(object):
    """Plain record of what is known about one running test case."""

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        """
        :param logger: logger of the test case
        :param tempdir: temporary directory used by the test case
        :param vpp_pid: PID of the vpp process
        :param vpp_bin_path: path to the vpp binary
        """
        self.logger, self.tempdir = logger, tempdir
        self.vpp_pid, self.vpp_bin_path = vpp_pid, vpp_bin_path
        # no test is associated with a core crash at creation time
        self.core_crash_test = None
1027 class VppTestResult(unittest.TestResult):
1029 @property result_string
1030 String variable to store the test case result string.
1032 List variable containing 2-tuples of TestCase instances and strings
1033 holding formatted tracebacks. Each tuple represents a test which
1034 raised an unexpected exception.
1036 List variable containing 2-tuples of TestCase instances and strings
1037 holding formatted tracebacks. Each tuple represents a test where
1038 a failure was explicitly signalled using the TestCase.assert*()
1042 failed_test_cases_info = set()
1043 core_crash_test_cases_info = set()
1044 current_test_case_info = None
1046 def __init__(self, stream, descriptions, verbosity, runner):
1048 :param stream File descriptor to store where to report test results.
1049 Set to the standard error stream by default.
1050 :param descriptions Boolean variable to store information if to use
1051 test case descriptions.
1052 :param verbosity Integer variable to store required verbosity level.
1054 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
1055 self.stream = stream
1056 self.descriptions = descriptions
1057 self.verbosity = verbosity
1058 self.result_string = None
1059 self.runner = runner
1061 def addSuccess(self, test):
1063 Record a test succeeded result
1068 if self.current_test_case_info:
1069 self.current_test_case_info.logger.debug(
1070 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1071 test._testMethodName,
1072 test._testMethodDoc))
1073 unittest.TestResult.addSuccess(self, test)
1074 self.result_string = colorize("OK", GREEN)
1076 self.send_result_through_pipe(test, PASS)
1078 def addSkip(self, test, reason):
1080 Record a test skipped.
1086 if self.current_test_case_info:
1087 self.current_test_case_info.logger.debug(
1088 "--- addSkip() %s.%s(%s) called, reason is %s" %
1089 (test.__class__.__name__, test._testMethodName,
1090 test._testMethodDoc, reason))
1091 unittest.TestResult.addSkip(self, test, reason)
1092 self.result_string = colorize("SKIP", YELLOW)
1094 self.send_result_through_pipe(test, SKIP)
1096 def symlink_failed(self):
1097 if self.current_test_case_info:
1099 failed_dir = os.getenv('FAILED_DIR')
1100 link_path = os.path.join(
1103 os.path.basename(self.current_test_case_info.tempdir))
1104 if self.current_test_case_info.logger:
1105 self.current_test_case_info.logger.debug(
1106 "creating a link to the failed test")
1107 self.current_test_case_info.logger.debug(
1108 "os.symlink(%s, %s)" %
1109 (self.current_test_case_info.tempdir, link_path))
1110 if os.path.exists(link_path):
1111 if self.current_test_case_info.logger:
1112 self.current_test_case_info.logger.debug(
1113 'symlink already exists')
1115 os.symlink(self.current_test_case_info.tempdir, link_path)
1117 except Exception as e:
1118 if self.current_test_case_info.logger:
1119 self.current_test_case_info.logger.error(e)
1121 def send_result_through_pipe(self, test, result):
1122 if hasattr(self, 'test_framework_result_pipe'):
1123 pipe = self.test_framework_result_pipe
1125 pipe.send((test.id(), result))
1127 def log_error(self, test, err, fn_name):
1128 if self.current_test_case_info:
1129 if isinstance(test, unittest.suite._ErrorHolder):
1130 test_name = test.description
1132 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1133 test._testMethodName,
1134 test._testMethodDoc)
1135 self.current_test_case_info.logger.debug(
1136 "--- %s() %s called, err is %s" %
1137 (fn_name, test_name, err))
1138 self.current_test_case_info.logger.debug(
1139 "formatted exception is:\n%s" %
1140 "".join(format_exception(*err)))
def add_error(self, test, err, unittest_fn, error_type):
    """
    Record a failure or an error together with the related bookkeeping.

    Logs the error, forwards it to ``unittest_fn``, builds the result
    string, links the failed test's temp dir, tracks failed/core-crashed
    test cases and finally sends the result through the result pipe.

    :param test: test case, or a unittest ``_ErrorHolder``
    :param err: error information forwarded to ``unittest_fn``
    :param unittest_fn: callable invoked as ``unittest_fn(self, test, err)``
    :param error_type: either ``FAIL`` or ``ERROR``
    :raises Exception: when ``error_type`` is neither ``FAIL`` nor ``ERROR``
    """
    if error_type == FAIL:
        fn_name, label = 'addFailure', "FAIL"
    elif error_type == ERROR:
        fn_name, label = 'addError', "ERROR"
    else:
        raise Exception('Error type %s cannot be used to record an '
                        'error or a failure' % error_type)
    self.log_error(test, err, fn_name)
    error_type_str = colorize(label, RED)

    unittest_fn(self, test, err)

    info = self.current_test_case_info
    if not info:
        self.result_string = '%s [no temp dir]' % error_type_str
    else:
        self.result_string = "%s [ temp dir used by test case: %s ]" % (
            error_type_str, info.tempdir)
        self.symlink_failed()
        self.failed_test_cases_info.add(info)
        if is_core_present(info.tempdir):
            # only the first test seen with a core is remembered by name
            if not info.core_crash_test:
                if isinstance(test, unittest.suite._ErrorHolder):
                    crashed = str(test)
                else:
                    crashed = "'{}' ({})".format(
                        get_testcase_doc_name(test), test.id())
                info.core_crash_test = crashed
            self.core_crash_test_cases_info.add(info)

    self.send_result_through_pipe(test, error_type)
def addFailure(self, test, err):
    """
    Record a test failed result

    Delegates to ``add_error`` with the ``FAIL`` error type.

    :param test: test which failed
    :param err: error message

    """
    self.add_error(test, err,
                   unittest_fn=unittest.TestResult.addFailure,
                   error_type=FAIL)
def addError(self, test, err):
    """
    Record a test error result

    Delegates to ``add_error`` with the ``ERROR`` error type.

    :param test: test which ended with an error
    :param err: error message

    """
    self.add_error(test, err,
                   unittest_fn=unittest.TestResult.addError,
                   error_type=ERROR)
def getDescription(self, test):
    """
    Get test description

    :param test: test to describe
    :returns: test description

    """
    description = get_test_description(self.descriptions, test)
    return description
def startTest(self, test):
    """
    Start a test

    Registers the test start with ``unittest.TestResult`` and, when
    verbosity is above zero, announces the test on the output stream.

    :param test: test which is about to run

    """
    # NOTE(review): confirm the header call against the test case class
    test.print_header()

    unittest.TestResult.startTest(self, test)
    if not self.verbosity > 0:
        return
    announcement = "Starting " + self.getDescription(test) + " ..."
    self.stream.writeln(announcement)
    self.stream.writeln(single_line_delim)
def stopTest(self, test):
    """
    Called when the given test has been run

    Writes the test description followed by the result string (framed by
    delimiter lines when verbosity is above zero) and then reports
    ``TEST_RUN`` through the result pipe.

    :param test: test which has finished

    """
    unittest.TestResult.stopTest(self, test)
    framed = self.verbosity > 0
    if framed:
        self.stream.writeln(single_line_delim)
    self.stream.writeln(
        "%-73s%s" % (self.getDescription(test), self.result_string))
    if framed:
        self.stream.writeln(single_line_delim)

    self.send_result_through_pipe(test, TEST_RUN)
def printErrors(self):
    """
    Print errors from running the test case

    When the runner has ``print_summary`` disabled, the stream of both
    this result and the runner is afterwards replaced by one writing to
    ``os.devnull``.
    """
    if self.errors or self.failures:
        self.stream.writeln()
        for flavour, entries in (('ERROR', self.errors),
                                 ('FAIL', self.failures)):
            self.printErrorList(flavour, entries)

    # ^^ that is the last output from unittest before summary
    if self.runner.print_summary:
        return
    sink = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
    self.stream = sink
    self.runner.stream = sink
def printErrorList(self, flavour, errors):
    """
    Print error list to the output stream together with error type
    and test case description.

    :param flavour: error type
    :param errors: iterable of ``(test, err)`` pairs

    """
    for test, err in errors:
        heading = "%s: %s" % (flavour, self.getDescription(test))
        for line in (double_line_delim, heading, single_line_delim,
                     "%s" % err):
            self.stream.writeln(line)
class VppTestRunner(unittest.TextTestRunner):
    """
    A basic test runner implementation which prints results to standard
    output (the stream is hard-coded to ``sys.stdout``).
    """

    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
                 result_pipe=None, failfast=False, buffer=False,
                 resultclass=None, print_summary=True):
        """
        Set up the runner.

        :param keep_alive_pipe: pipe stored on ``KeepAliveReporter``
        :param descriptions: forwarded to ``unittest.TextTestRunner``
        :param verbosity: forwarded to ``unittest.TextTestRunner``
        :param result_pipe: pipe stored on the result class as
            ``test_framework_result_pipe``
        :param failfast: forwarded to ``unittest.TextTestRunner``
        :param buffer: forwarded to ``unittest.TextTestRunner``
        :param resultclass: forwarded to ``unittest.TextTestRunner``
        :param print_summary: stored on the runner and consulted by
            ``run``
        """
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        super(VppTestRunner, self).__init__(
            sys.stdout, descriptions, verbosity, failfast, buffer,
            resultclass)
        KeepAliveReporter.pipe = keep_alive_pipe

        # remembered so that run() can restore it afterwards
        self.orig_stream = self.stream
        self.resultclass.test_framework_result_pipe = result_pipe

        self.print_summary = print_summary

    def _makeResult(self):
        """Create the result object, handing it a reference to this runner"""
        return self.resultclass(
            self.stream, self.descriptions, self.verbosity, self)

    def run(self, test):
        """
        Run the tests

        :param test: test or test suite to run
        :returns: result object produced by ``unittest.TextTestRunner.run``

        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        result = super(VppTestRunner, self).run(test)
        if not self.print_summary:
            # put back the stream saved in __init__ on runner and result
            original = self.orig_stream
            self.stream = original
            result.stream = original
        return result
class Worker(Thread):
    """
    Thread which runs an external executable, logs what it wrote to stdout
    and stderr and stores its return code in ``result``.
    """

    def __init__(self, args, logger, env=None):
        """
        :param args: command line as a list, ``args[0]`` is the executable
        :param logger: logger used to report progress and output
        :param env: extra environment variables for the executable; the
            mapping is deep-copied, so later changes made by the caller
            have no effect (default: no extra variables)
        """
        self.logger = logger
        self.args = args
        self.result = None
        # None instead of a mutable {} default; behaviour is unchanged
        self.env = {} if env is None else copy.deepcopy(env)
        super(Worker, self).__init__()

    def run(self):
        """
        Run the executable, wait for it to finish and log its output.

        The process gets the current environment updated with ``self.env``
        and with ``CK_LOG_FILE_NAME`` set to ``-``; ``os.setpgrp`` is run in
        the child before exec.
        """
        executable = self.args[0]
        self.logger.debug("Running executable w/args `%s'" % self.args)
        env = os.environ.copy()
        env.update(self.env)
        env["CK_LOG_FILE_NAME"] = "-"
        self.process = subprocess.Popen(
            self.args, shell=False, env=env, preexec_fn=os.setpgrp,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = self.process.communicate()
        self.logger.debug("Finished running `%s'" % executable)
        self.logger.info("Return code is `%s'" % self.process.returncode)
        self.logger.info(single_line_delim)
        for stream_name, content in (("stdout", out), ("stderr", err)):
            self.logger.info(
                "Executable `%s' wrote to %s:" % (executable, stream_name))
            self.logger.info(single_line_delim)
            self.logger.info(content)
            self.logger.info(single_line_delim)
        self.result = self.process.returncode