3 from __future__ import print_function
16 from collections import deque
17 from threading import Thread, Event
18 from inspect import getdoc, isclass
19 from traceback import format_exception
20 from logging import FileHandler, DEBUG, Formatter
21 from scapy.packet import Raw
22 from hook import StepHook, PollHook, VppDiedError
23 from vpp_pg_interface import VppPGInterface
24 from vpp_sub_interface import VppSubInterface
25 from vpp_lo_interface import VppLoInterface
26 from vpp_papi_provider import VppPapiProvider
27 from vpp_papi.vpp_stats import VPPStats
28 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
30 from vpp_object import VppObjectRegistry
31 from util import ppp, is_core_present
32 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
33 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
34 from scapy.layers.inet6 import ICMPv6EchoReply
36 if os.name == 'posix' and sys.version_info[0] < 3:
37 # using subprocess32 is recommended by python official documentation
38 # @ https://docs.python.org/2/library/subprocess.html
39 import subprocess32 as subprocess
43 # Python2/3 compatible
55 debug_framework = False
56 if os.getenv('TEST_DEBUG', "0") == "1":
57 debug_framework = True
61 Test framework module.
63 The module provides a set of tools for constructing and running tests and
64 representing the results.
68 class _PacketInfo(object):
69 """Private class to create packet info object.
71 Help process information about the next packet.
72 Set variables to default values.
74 #: Store the index of the packet.
76 #: Store the index of the source packet generator interface of the packet.
78 #: Store the index of the destination packet generator interface
81 #: Store expected ip version
83 #: Store expected upper protocol
85 #: Store the copy of the former packet.
88 def __eq__(self, other):
89 index = self.index == other.index
90 src = self.src == other.src
91 dst = self.dst == other.dst
92 data = self.data == other.data
93 return index and src and dst and data
96 def pump_output(testclass):
97 """ pump output from vpp stdout/stderr to proper queues """
100 while not testclass.pump_thread_stop_flag.is_set():
101 readable = select.select([testclass.vpp.stdout.fileno(),
102 testclass.vpp.stderr.fileno(),
103 testclass.pump_thread_wakeup_pipe[0]],
105 if testclass.vpp.stdout.fileno() in readable:
106 read = os.read(testclass.vpp.stdout.fileno(), 102400)
108 split = read.splitlines(True)
109 if len(stdout_fragment) > 0:
110 split[0] = "%s%s" % (stdout_fragment, split[0])
111 if len(split) > 0 and split[-1].endswith("\n"):
115 stdout_fragment = split[-1]
116 testclass.vpp_stdout_deque.extend(split[:limit])
117 if not testclass.cache_vpp_output:
118 for line in split[:limit]:
119 testclass.logger.debug(
120 "VPP STDOUT: %s" % line.rstrip("\n"))
121 if testclass.vpp.stderr.fileno() in readable:
122 read = os.read(testclass.vpp.stderr.fileno(), 102400)
124 split = read.splitlines(True)
125 if len(stderr_fragment) > 0:
126 split[0] = "%s%s" % (stderr_fragment, split[0])
127 if len(split) > 0 and split[-1].endswith(b"\n"):
131 stderr_fragment = split[-1]
132 testclass.vpp_stderr_deque.extend(split[:limit])
133 if not testclass.cache_vpp_output:
134 for line in split[:limit]:
135 testclass.logger.debug(
136 "VPP STDERR: %s" % line.rstrip("\n"))
137 # ignoring the dummy pipe here intentionally - the
138 # flag will take care of properly terminating the loop
141 def _is_skip_aarch64_set():
142 return os.getenv('SKIP_AARCH64', 'n').lower() in ('yes', 'y', '1')
144 is_skip_aarch64_set = _is_skip_aarch64_set()
147 def _is_platform_aarch64():
148 return platform.machine() == 'aarch64'
150 is_platform_aarch64 = _is_platform_aarch64()
153 def _running_extended_tests():
154 s = os.getenv("EXTENDED_TESTS", "n")
155 return True if s.lower() in ("y", "yes", "1") else False
157 running_extended_tests = _running_extended_tests()
160 def _running_on_centos():
161 os_id = os.getenv("OS_ID", "")
162 return True if "centos" in os_id.lower() else False
164 running_on_centos = _running_on_centos
167 class KeepAliveReporter(object):
169 Singleton object which reports test start to parent process
174 self.__dict__ = self._shared_state
182 def pipe(self, pipe):
183 if self._pipe is not None:
184 raise Exception("Internal error - pipe should only be set once.")
187 def send_keep_alive(self, test, desc=None):
189 Write current test tmpdir & desc to keep-alive pipe to signal liveness
191 if self.pipe is None:
192 # if not running forked..
196 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
200 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
203 class VppTestCase(unittest.TestCase):
204 """This subclass is a base class for VPP test cases that are implemented as
205 classes. It provides methods to create and run test case.
208 extra_vpp_punt_config = []
209 extra_vpp_plugin_config = []
212 def packet_infos(self):
213 """List of packet infos"""
214 return self._packet_infos
217 def get_packet_count_for_if_idx(cls, dst_if_index):
218 """Get the number of packet info for specified destination if index"""
219 if dst_if_index in cls._packet_count_for_dst_if_idx:
220 return cls._packet_count_for_dst_if_idx[dst_if_index]
226 """Return the instance of this testcase"""
227 return cls.test_instance
230 def set_debug_flags(cls, d):
231 cls.debug_core = False
232 cls.debug_gdb = False
233 cls.debug_gdbserver = False
238 cls.debug_core = True
241 elif dl == "gdbserver":
242 cls.debug_gdbserver = True
244 raise Exception("Unrecognized DEBUG option: '%s'" % d)
247 def get_least_used_cpu():
248 cpu_usage_list = [set(range(psutil.cpu_count()))]
249 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
250 if 'vpp_main' == p.info['name']]
251 for vpp_process in vpp_processes:
252 for cpu_usage_set in cpu_usage_list:
254 cpu_num = vpp_process.cpu_num()
255 if cpu_num in cpu_usage_set:
256 cpu_usage_set_index = cpu_usage_list.index(
258 if cpu_usage_set_index == len(cpu_usage_list) - 1:
259 cpu_usage_list.append({cpu_num})
261 cpu_usage_list[cpu_usage_set_index + 1].add(
263 cpu_usage_set.remove(cpu_num)
265 except psutil.NoSuchProcess:
268 for cpu_usage_set in cpu_usage_list:
269 if len(cpu_usage_set) > 0:
270 min_usage_set = cpu_usage_set
273 return random.choice(tuple(min_usage_set))
276 def setUpConstants(cls):
277 """ Set-up the test case class based on environment variables """
278 s = os.getenv("STEP", "n")
279 cls.step = True if s.lower() in ("y", "yes", "1") else False
280 d = os.getenv("DEBUG", None)
281 c = os.getenv("CACHE_OUTPUT", "1")
282 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
283 cls.set_debug_flags(d)
284 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
285 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
286 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
288 if cls.plugin_path is not None:
289 if cls.extern_plugin_path is not None:
290 plugin_path = "%s:%s" % (
291 cls.plugin_path, cls.extern_plugin_path)
293 plugin_path = cls.plugin_path
294 elif cls.extern_plugin_path is not None:
295 plugin_path = cls.extern_plugin_path
297 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
298 debug_cli = "cli-listen localhost:5002"
300 size = os.getenv("COREDUMP_SIZE")
302 coredump_size = "coredump-size %s" % size
303 if coredump_size is None:
304 coredump_size = "coredump-size unlimited"
306 cpu_core_number = cls.get_least_used_cpu()
308 cls.vpp_cmdline = [cls.vpp_bin, "unix",
309 "{", "nodaemon", debug_cli, "full-coredump",
310 coredump_size, "runtime-dir", cls.tempdir, "}",
311 "api-trace", "{", "on", "}", "api-segment", "{",
312 "prefix", cls.shm_prefix, "}", "cpu", "{",
313 "main-core", str(cpu_core_number), "}", "statseg",
314 "{", "socket-name", cls.stats_sock, "}", "plugins",
315 "{", "plugin", "dpdk_plugin.so", "{", "disable",
316 "}", "plugin", "unittest_plugin.so", "{", "enable",
317 "}"] + cls.extra_vpp_plugin_config + ["}", ]
318 if cls.extra_vpp_punt_config is not None:
319 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
320 if plugin_path is not None:
321 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
322 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
323 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
326 def wait_for_enter(cls):
327 if cls.debug_gdbserver:
328 print(double_line_delim)
329 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
331 print(double_line_delim)
332 print("Spawned VPP with PID: %d" % cls.vpp.pid)
334 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
336 print(single_line_delim)
337 print("You can debug the VPP using e.g.:")
338 if cls.debug_gdbserver:
339 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
340 print("Now is the time to attach a gdb by running the above "
341 "command, set up breakpoints etc. and then resume VPP from "
342 "within gdb by issuing the 'continue' command")
344 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
345 print("Now is the time to attach a gdb by running the above "
346 "command and set up breakpoints etc.")
347 print(single_line_delim)
348 input("Press ENTER to continue running the testcase...")
352 cmdline = cls.vpp_cmdline
354 if cls.debug_gdbserver:
355 gdbserver = '/usr/bin/gdbserver'
356 if not os.path.isfile(gdbserver) or \
357 not os.access(gdbserver, os.X_OK):
358 raise Exception("gdbserver binary '%s' does not exist or is "
359 "not executable" % gdbserver)
361 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
362 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
365 cls.vpp = subprocess.Popen(cmdline,
366 stdout=subprocess.PIPE,
367 stderr=subprocess.PIPE,
369 except subprocess.CalledProcessError as e:
370 cls.logger.critical("Couldn't start vpp: %s" % e)
376 def wait_for_stats_socket(cls):
377 deadline = time.time() + 3
379 while time.time() < deadline or \
380 cls.debug_gdb or cls.debug_gdbserver:
381 if os.path.exists(cls.stats_sock):
386 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
391 Perform class setup before running the testcase
392 Remove shared memory files, start vpp and connect the vpp-api
394 super(VppTestCase, cls).setUpClass()
395 gc.collect() # run garbage collection first
397 cls.logger = get_logger(cls.__name__)
398 if hasattr(cls, 'parallel_handler'):
399 cls.logger.addHandler(cls.parallel_handler)
400 cls.logger.propagate = False
401 cls.tempdir = tempfile.mkdtemp(
402 prefix='vpp-unittest-%s-' % cls.__name__)
403 cls.stats_sock = "%s/stats.sock" % cls.tempdir
404 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
405 cls.file_handler.setFormatter(
406 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
408 cls.file_handler.setLevel(DEBUG)
409 cls.logger.addHandler(cls.file_handler)
410 cls.shm_prefix = os.path.basename(cls.tempdir)
411 os.chdir(cls.tempdir)
412 cls.logger.info("Temporary dir is %s, shm prefix is %s",
413 cls.tempdir, cls.shm_prefix)
415 cls.reset_packet_infos()
417 cls._zombie_captures = []
420 cls.registry = VppObjectRegistry()
421 cls.vpp_startup_failed = False
422 cls.reporter = KeepAliveReporter()
423 # need to catch exceptions here because if we raise, then the cleanup
424 # doesn't get called and we might end with a zombie vpp
427 cls.reporter.send_keep_alive(cls, 'setUpClass')
428 VppTestResult.current_test_case_info = TestCaseInfo(
429 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
430 cls.vpp_stdout_deque = deque()
431 cls.vpp_stderr_deque = deque()
432 cls.pump_thread_stop_flag = Event()
433 cls.pump_thread_wakeup_pipe = os.pipe()
434 cls.pump_thread = Thread(target=pump_output, args=(cls,))
435 cls.pump_thread.daemon = True
436 cls.pump_thread.start()
437 if cls.debug_gdb or cls.debug_gdbserver:
441 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
447 cls.vapi.register_hook(hook)
448 cls.wait_for_stats_socket()
449 cls.statistics = VPPStats(socketname=cls.stats_sock)
453 cls.vpp_startup_failed = True
455 "VPP died shortly after startup, check the"
456 " output to standard error for possible cause")
462 cls.vapi.disconnect()
465 if cls.debug_gdbserver:
466 print(colorize("You're running VPP inside gdbserver but "
467 "VPP-API connection failed, did you forget "
468 "to 'continue' VPP from within gdb?", RED))
480 Disconnect vpp-api, kill vpp and cleanup shared memory files
482 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
484 if cls.vpp.returncode is None:
485 print(double_line_delim)
486 print("VPP or GDB server is still running")
487 print(single_line_delim)
488 input("When done debugging, press ENTER to kill the "
489 "process and finish running the testcase...")
491 # first signal that we want to stop the pump thread, then wake it up
492 if hasattr(cls, 'pump_thread_stop_flag'):
493 cls.pump_thread_stop_flag.set()
494 if hasattr(cls, 'pump_thread_wakeup_pipe'):
495 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
496 if hasattr(cls, 'pump_thread'):
497 cls.logger.debug("Waiting for pump thread to stop")
498 cls.pump_thread.join()
499 if hasattr(cls, 'vpp_stderr_reader_thread'):
500 cls.logger.debug("Waiting for stdderr pump to stop")
501 cls.vpp_stderr_reader_thread.join()
503 if hasattr(cls, 'vpp'):
504 if hasattr(cls, 'vapi'):
505 cls.vapi.disconnect()
508 if cls.vpp.returncode is None:
509 cls.logger.debug("Sending TERM to vpp")
511 cls.logger.debug("Waiting for vpp to die")
512 cls.vpp.communicate()
515 if cls.vpp_startup_failed:
516 stdout_log = cls.logger.info
517 stderr_log = cls.logger.critical
519 stdout_log = cls.logger.info
520 stderr_log = cls.logger.info
522 if hasattr(cls, 'vpp_stdout_deque'):
523 stdout_log(single_line_delim)
524 stdout_log('VPP output to stdout while running %s:', cls.__name__)
525 stdout_log(single_line_delim)
526 vpp_output = "".join(cls.vpp_stdout_deque)
527 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
529 stdout_log('\n%s', vpp_output)
530 stdout_log(single_line_delim)
532 if hasattr(cls, 'vpp_stderr_deque'):
533 stderr_log(single_line_delim)
534 stderr_log('VPP output to stderr while running %s:', cls.__name__)
535 stderr_log(single_line_delim)
536 vpp_output = "".join(cls.vpp_stderr_deque)
537 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
539 stderr_log('\n%s', vpp_output)
540 stderr_log(single_line_delim)
543 def tearDownClass(cls):
544 """ Perform final cleanup after running all tests in this test-case """
545 cls.reporter.send_keep_alive(cls, 'tearDownClass')
547 cls.file_handler.close()
548 cls.reset_packet_infos()
550 debug_internal.on_tear_down_class(cls)
553 """ Show various debug prints after each test """
554 super(VppTestCase, self).tearDown()
555 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
556 (self.__class__.__name__, self._testMethodName,
557 self._testMethodDoc))
558 if not self.vpp_dead:
559 self.logger.debug(self.vapi.cli("show trace max 1000"))
560 self.logger.info(self.vapi.ppcli("show interface"))
561 self.logger.info(self.vapi.ppcli("show hardware"))
562 self.logger.info(self.statistics.set_errors_str())
563 self.logger.info(self.vapi.ppcli("show run"))
564 self.logger.info(self.vapi.ppcli("show log"))
565 self.registry.remove_vpp_config(self.logger)
566 # Save/Dump VPP api trace log
567 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
568 tmp_api_trace = "/tmp/%s" % api_trace
569 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
570 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
571 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
573 os.rename(tmp_api_trace, vpp_api_trace_log)
574 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
577 self.registry.unregister_all(self.logger)
580 """ Clear trace before running each test"""
581 super(VppTestCase, self).setUp()
582 self.reporter.send_keep_alive(self)
583 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
584 (self.__class__.__name__, self._testMethodName,
585 self._testMethodDoc))
587 raise Exception("VPP is dead when setting up the test")
588 self.sleep(.1, "during setUp")
589 self.vpp_stdout_deque.append(
590 "--- test setUp() for %s.%s(%s) starts here ---\n" %
591 (self.__class__.__name__, self._testMethodName,
592 self._testMethodDoc))
593 self.vpp_stderr_deque.append(
594 "--- test setUp() for %s.%s(%s) starts here ---\n" %
595 (self.__class__.__name__, self._testMethodName,
596 self._testMethodDoc))
597 self.vapi.cli("clear trace")
598 # store the test instance inside the test class - so that objects
599 # holding the class can access instance methods (like assertEqual)
600 type(self).test_instance = self
603 def pg_enable_capture(cls, interfaces=None):
605 Enable capture on packet-generator interfaces
607 :param interfaces: iterable interface indexes (if None,
608 use self.pg_interfaces)
611 if interfaces is None:
612 interfaces = cls.pg_interfaces
617 def register_capture(cls, cap_name):
618 """ Register a capture in the testclass """
619 # add to the list of captures with current timestamp
620 cls._captures.append((time.time(), cap_name))
621 # filter out from zombies
622 cls._zombie_captures = [(stamp, name)
623 for (stamp, name) in cls._zombie_captures
628 """ Remove any zombie captures and enable the packet generator """
629 # how long before capture is allowed to be deleted - otherwise vpp
630 # crashes - 100ms seems enough (this shouldn't be needed at all)
633 for stamp, cap_name in cls._zombie_captures:
634 wait = stamp + capture_ttl - now
636 cls.sleep(wait, "before deleting capture %s" % cap_name)
638 cls.logger.debug("Removing zombie capture %s" % cap_name)
639 cls.vapi.cli('packet-generator delete %s' % cap_name)
641 cls.vapi.cli("trace add pg-input 1000")
642 cls.vapi.cli('packet-generator enable')
643 cls._zombie_captures = cls._captures
647 def create_pg_interfaces(cls, interfaces):
649 Create packet-generator interfaces.
651 :param interfaces: iterable indexes of the interfaces.
652 :returns: List of created interfaces.
657 intf = VppPGInterface(cls, i)
658 setattr(cls, intf.name, intf)
660 cls.pg_interfaces = result
664 def create_loopback_interfaces(cls, count):
666 Create loopback interfaces.
668 :param count: number of interfaces created.
669 :returns: List of created interfaces.
671 result = [VppLoInterface(cls) for i in range(count)]
673 setattr(cls, intf.name, intf)
674 cls.lo_interfaces = result
678 def extend_packet(packet, size, padding=' '):
680 Extend packet to given size by padding with spaces or custom padding
681 NOTE: Currently works only when Raw layer is present.
683 :param packet: packet
684 :param size: target size
685 :param padding: padding used to extend the payload
688 packet_len = len(packet) + 4
689 extend = size - packet_len
691 num = (extend / len(padding)) + 1
692 packet[Raw].load += (padding * num)[:extend]
695 def reset_packet_infos(cls):
696 """ Reset the list of packet info objects and packet counts to zero """
697 cls._packet_infos = {}
698 cls._packet_count_for_dst_if_idx = {}
701 def create_packet_info(cls, src_if, dst_if):
703 Create packet info object containing the source and destination indexes
704 and add it to the testcase's packet info list
706 :param VppInterface src_if: source interface
707 :param VppInterface dst_if: destination interface
709 :returns: _PacketInfo object
713 info.index = len(cls._packet_infos)
714 info.src = src_if.sw_if_index
715 info.dst = dst_if.sw_if_index
716 if isinstance(dst_if, VppSubInterface):
717 dst_idx = dst_if.parent.sw_if_index
719 dst_idx = dst_if.sw_if_index
720 if dst_idx in cls._packet_count_for_dst_if_idx:
721 cls._packet_count_for_dst_if_idx[dst_idx] += 1
723 cls._packet_count_for_dst_if_idx[dst_idx] = 1
724 cls._packet_infos[info.index] = info
728 def info_to_payload(info):
730 Convert _PacketInfo object to packet payload
732 :param info: _PacketInfo object
734 :returns: string containing serialized data from packet info
736 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
740 def payload_to_info(payload):
742 Convert packet payload to _PacketInfo object
744 :param payload: packet payload
746 :returns: _PacketInfo object containing de-serialized data from payload
749 numbers = payload.split()
751 info.index = int(numbers[0])
752 info.src = int(numbers[1])
753 info.dst = int(numbers[2])
754 info.ip = int(numbers[3])
755 info.proto = int(numbers[4])
758 def get_next_packet_info(self, info):
760 Iterate over the packet info list stored in the testcase
761 Start iteration with first element if info is None
762 Continue based on index in info if info is specified
764 :param info: info or None
765 :returns: next info in list or None if no more infos
770 next_index = info.index + 1
771 if next_index == len(self._packet_infos):
774 return self._packet_infos[next_index]
776 def get_next_packet_info_for_interface(self, src_index, info):
778 Search the packet info list for the next packet info with same source
781 :param src_index: source interface index to search for
782 :param info: packet info - where to start the search
783 :returns: packet info or None
787 info = self.get_next_packet_info(info)
790 if info.src == src_index:
793 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
795 Search the packet info list for the next packet info with same source
796 and destination interface indexes
798 :param src_index: source interface index to search for
799 :param dst_index: destination interface index to search for
800 :param info: packet info - where to start the search
801 :returns: packet info or None
805 info = self.get_next_packet_info_for_interface(src_index, info)
808 if info.dst == dst_index:
811 def assert_equal(self, real_value, expected_value, name_or_class=None):
812 if name_or_class is None:
813 self.assertEqual(real_value, expected_value)
816 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
817 msg = msg % (getdoc(name_or_class).strip(),
818 real_value, str(name_or_class(real_value)),
819 expected_value, str(name_or_class(expected_value)))
821 msg = "Invalid %s: %s does not match expected value %s" % (
822 name_or_class, real_value, expected_value)
824 self.assertEqual(real_value, expected_value, msg)
826 def assert_in_range(self,
834 msg = "Invalid %s: %s out of range <%s,%s>" % (
835 name, real_value, expected_min, expected_max)
836 self.assertTrue(expected_min <= real_value <= expected_max, msg)
838 def assert_packet_checksums_valid(self, packet,
839 ignore_zero_udp_checksums=True):
840 received = packet.__class__(str(packet))
842 ppp("Verifying packet checksums for packet:", received))
843 udp_layers = ['UDP', 'UDPerror']
844 checksum_fields = ['cksum', 'chksum']
847 temp = received.__class__(str(received))
849 layer = temp.getlayer(counter)
851 for cf in checksum_fields:
852 if hasattr(layer, cf):
853 if ignore_zero_udp_checksums and \
854 0 == getattr(layer, cf) and \
855 layer.name in udp_layers:
858 checksums.append((counter, cf))
861 counter = counter + 1
862 if 0 == len(checksums):
864 temp = temp.__class__(str(temp))
865 for layer, cf in checksums:
866 calc_sum = getattr(temp[layer], cf)
868 getattr(received[layer], cf), calc_sum,
869 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
871 "Checksum field `%s` on `%s` layer has correct value `%s`" %
872 (cf, temp[layer].name, calc_sum))
874 def assert_checksum_valid(self, received_packet, layer,
876 ignore_zero_checksum=False):
877 """ Check checksum of received packet on given layer """
878 received_packet_checksum = getattr(received_packet[layer], field_name)
879 if ignore_zero_checksum and 0 == received_packet_checksum:
881 recalculated = received_packet.__class__(str(received_packet))
882 delattr(recalculated[layer], field_name)
883 recalculated = recalculated.__class__(str(recalculated))
884 self.assert_equal(received_packet_checksum,
885 getattr(recalculated[layer], field_name),
886 "packet checksum on layer: %s" % layer)
888 def assert_ip_checksum_valid(self, received_packet,
889 ignore_zero_checksum=False):
890 self.assert_checksum_valid(received_packet, 'IP',
891 ignore_zero_checksum=ignore_zero_checksum)
893 def assert_tcp_checksum_valid(self, received_packet,
894 ignore_zero_checksum=False):
895 self.assert_checksum_valid(received_packet, 'TCP',
896 ignore_zero_checksum=ignore_zero_checksum)
898 def assert_udp_checksum_valid(self, received_packet,
899 ignore_zero_checksum=True):
900 self.assert_checksum_valid(received_packet, 'UDP',
901 ignore_zero_checksum=ignore_zero_checksum)
903 def assert_embedded_icmp_checksum_valid(self, received_packet):
904 if received_packet.haslayer(IPerror):
905 self.assert_checksum_valid(received_packet, 'IPerror')
906 if received_packet.haslayer(TCPerror):
907 self.assert_checksum_valid(received_packet, 'TCPerror')
908 if received_packet.haslayer(UDPerror):
909 self.assert_checksum_valid(received_packet, 'UDPerror',
910 ignore_zero_checksum=True)
911 if received_packet.haslayer(ICMPerror):
912 self.assert_checksum_valid(received_packet, 'ICMPerror')
914 def assert_icmp_checksum_valid(self, received_packet):
915 self.assert_checksum_valid(received_packet, 'ICMP')
916 self.assert_embedded_icmp_checksum_valid(received_packet)
918 def assert_icmpv6_checksum_valid(self, pkt):
919 if pkt.haslayer(ICMPv6DestUnreach):
920 self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
921 self.assert_embedded_icmp_checksum_valid(pkt)
922 if pkt.haslayer(ICMPv6EchoRequest):
923 self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
924 if pkt.haslayer(ICMPv6EchoReply):
925 self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
927 def assert_packet_counter_equal(self, counter, expected_value):
928 if counter.startswith("/"):
929 counter_value = self.statistics.get_counter(counter)
930 self.assert_equal(counter_value, expected_value,
931 "packet counter `%s'" % counter)
933 counters = self.vapi.cli("sh errors").split('\n')
935 for i in range(1, len(counters) - 1):
936 results = counters[i].split()
937 if results[1] == counter:
938 counter_value = int(results[0])
942 def sleep(cls, timeout, remark=None):
943 if hasattr(cls, 'logger'):
944 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
948 if hasattr(cls, 'logger') and after - before > 2 * timeout:
949 cls.logger.error("unexpected time.sleep() result - "
950 "slept for %es instead of ~%es!",
951 after - before, timeout)
952 if hasattr(cls, 'logger'):
954 "Finished sleep (%s) - slept %es (wanted %es)",
955 remark, after - before, timeout)
957 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
958 self.vapi.cli("clear trace")
959 intf.add_stream(pkts)
960 self.pg_enable_capture(self.pg_interfaces)
964 for i in self.pg_interfaces:
965 i.get_capture(0, timeout=timeout)
966 i.assert_nothing_captured(remark=remark)
969 def send_and_expect(self, input, pkts, output):
970 self.vapi.cli("clear trace")
971 input.add_stream(pkts)
972 self.pg_enable_capture(self.pg_interfaces)
974 rx = output.get_capture(len(pkts))
977 def send_and_expect_only(self, input, pkts, output, timeout=None):
978 self.vapi.cli("clear trace")
979 input.add_stream(pkts)
980 self.pg_enable_capture(self.pg_interfaces)
982 rx = output.get_capture(len(pkts))
986 for i in self.pg_interfaces:
988 i.get_capture(0, timeout=timeout)
989 i.assert_nothing_captured()
995 """ unittest calls runTest when TestCase is instantiated without a
996 test case. Use case: Writing unittests against VppTestCase"""
1000 def get_testcase_doc_name(test):
1001 return getdoc(test.__class__).splitlines()[0]
1004 def get_test_description(descriptions, test):
1005 short_description = test.shortDescription()
1006 if descriptions and short_description:
1007 return short_description
1012 class TestCaseInfo(object):
1013 def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
1014 self.logger = logger
1015 self.tempdir = tempdir
1016 self.vpp_pid = vpp_pid
1017 self.vpp_bin_path = vpp_bin_path
1018 self.core_crash_test = None
1021 class VppTestResult(unittest.TestResult):
1023 @property result_string
1024 String variable to store the test case result string.
1026 List variable containing 2-tuples of TestCase instances and strings
1027 holding formatted tracebacks. Each tuple represents a test which
1028 raised an unexpected exception.
1030 List variable containing 2-tuples of TestCase instances and strings
1031 holding formatted tracebacks. Each tuple represents a test where
1032 a failure was explicitly signalled using the TestCase.assert*()
1036 failed_test_cases_info = set()
1037 core_crash_test_cases_info = set()
1038 current_test_case_info = None
1040 def __init__(self, stream=None, descriptions=None, verbosity=None,
1043 :param stream File descriptor to store where to report test results.
1044 Set to the standard error stream by default.
1045 :param descriptions Boolean variable to store information if to use
1046 test case descriptions.
1047 :param verbosity Integer variable to store required verbosity level.
1049 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1050 self.stream = stream
1051 self.descriptions = descriptions
1052 self.verbosity = verbosity
1053 self.result_string = None
1054 self.runner = runner
1056 def addSuccess(self, test):
1058 Record a test succeeded result
1063 if self.current_test_case_info:
1064 self.current_test_case_info.logger.debug(
1065 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1066 test._testMethodName,
1067 test._testMethodDoc))
1068 unittest.TestResult.addSuccess(self, test)
1069 self.result_string = colorize("OK", GREEN)
1071 self.send_result_through_pipe(test, PASS)
1073 def addSkip(self, test, reason):
1075 Record a test skipped.
1081 if self.current_test_case_info:
1082 self.current_test_case_info.logger.debug(
1083 "--- addSkip() %s.%s(%s) called, reason is %s" %
1084 (test.__class__.__name__, test._testMethodName,
1085 test._testMethodDoc, reason))
1086 unittest.TestResult.addSkip(self, test, reason)
1087 self.result_string = colorize("SKIP", YELLOW)
1089 self.send_result_through_pipe(test, SKIP)
1091 def symlink_failed(self):
1092 if self.current_test_case_info:
1094 failed_dir = os.getenv('FAILED_DIR')
1095 link_path = os.path.join(
1098 os.path.basename(self.current_test_case_info.tempdir))
1099 if self.current_test_case_info.logger:
1100 self.current_test_case_info.logger.debug(
1101 "creating a link to the failed test")
1102 self.current_test_case_info.logger.debug(
1103 "os.symlink(%s, %s)" %
1104 (self.current_test_case_info.tempdir, link_path))
1105 if os.path.exists(link_path):
1106 if self.current_test_case_info.logger:
1107 self.current_test_case_info.logger.debug(
1108 'symlink already exists')
1110 os.symlink(self.current_test_case_info.tempdir, link_path)
1112 except Exception as e:
1113 if self.current_test_case_info.logger:
1114 self.current_test_case_info.logger.error(e)
1116 def send_result_through_pipe(self, test, result):
1117 if hasattr(self, 'test_framework_result_pipe'):
1118 pipe = self.test_framework_result_pipe
1120 pipe.send((test.id(), result))
1122 def log_error(self, test, err, fn_name):
1123 if self.current_test_case_info:
1124 if isinstance(test, unittest.suite._ErrorHolder):
1125 test_name = test.description
1127 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1128 test._testMethodName,
1129 test._testMethodDoc)
1130 self.current_test_case_info.logger.debug(
1131 "--- %s() %s called, err is %s" %
1132 (fn_name, test_name, err))
1133 self.current_test_case_info.logger.debug(
1134 "formatted exception is:\n%s" %
1135 "".join(format_exception(*err)))
1137 def add_error(self, test, err, unittest_fn, error_type):
1138 if error_type == FAIL:
1139 self.log_error(test, err, 'addFailure')
1140 error_type_str = colorize("FAIL", RED)
1141 elif error_type == ERROR:
1142 self.log_error(test, err, 'addError')
1143 error_type_str = colorize("ERROR", RED)
1145 raise Exception('Error type %s cannot be used to record an '
1146 'error or a failure' % error_type)
1148 unittest_fn(self, test, err)
1149 if self.current_test_case_info:
1150 self.result_string = "%s [ temp dir used by test case: %s ]" % \
1152 self.current_test_case_info.tempdir)
1153 self.symlink_failed()
1154 self.failed_test_cases_info.add(self.current_test_case_info)
1155 if is_core_present(self.current_test_case_info.tempdir):
1156 if not self.current_test_case_info.core_crash_test:
1157 if isinstance(test, unittest.suite._ErrorHolder):
1158 test_name = str(test)
1160 test_name = "'{}' ({})".format(
1161 get_testcase_doc_name(test), test.id())
1162 self.current_test_case_info.core_crash_test = test_name
1163 self.core_crash_test_cases_info.add(
1164 self.current_test_case_info)
1166 self.result_string = '%s [no temp dir]' % error_type_str
1168 self.send_result_through_pipe(test, error_type)
1170 def addFailure(self, test, err):
1172 Record a test failed result
1175 :param err: error message
1178 self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1180 def addError(self, test, err):
1182 Record a test error result
1185 :param err: error message
1188 self.add_error(test, err, unittest.TestResult.addError, ERROR)
1190 def getDescription(self, test):
1192 Get test description
1195 :returns: test description
1198 return get_test_description(self.descriptions, test)
1200 def startTest(self, test):
1208 def print_header(test):
1209 if not hasattr(test.__class__, '_header_printed'):
1210 print(double_line_delim)
1211 print(colorize(getdoc(test).splitlines()[0], GREEN))
1212 print(double_line_delim)
1213 test.__class__._header_printed = True
1217 unittest.TestResult.startTest(self, test)
1218 if self.verbosity > 0:
1219 self.stream.writeln(
1220 "Starting " + self.getDescription(test) + " ...")
1221 self.stream.writeln(single_line_delim)
1223 def stopTest(self, test):
1225 Called when the given test has been run
1230 unittest.TestResult.stopTest(self, test)
1231 if self.verbosity > 0:
1232 self.stream.writeln(single_line_delim)
1233 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1234 self.result_string))
1235 self.stream.writeln(single_line_delim)
1237 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1238 self.result_string))
1240 self.send_result_through_pipe(test, TEST_RUN)
1242 def printErrors(self):
1244 Print errors from running the test case
1246 if len(self.errors) > 0 or len(self.failures) > 0:
1247 self.stream.writeln()
1248 self.printErrorList('ERROR', self.errors)
1249 self.printErrorList('FAIL', self.failures)
1251 # ^^ that is the last output from unittest before summary
1252 if not self.runner.print_summary:
1253 devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
1254 self.stream = devnull
1255 self.runner.stream = devnull
1257 def printErrorList(self, flavour, errors):
1259 Print error list to the output stream together with error type
1260 and test case description.
1262 :param flavour: error type
1263 :param errors: iterable errors
1266 for test, err in errors:
1267 self.stream.writeln(double_line_delim)
1268 self.stream.writeln("%s: %s" %
1269 (flavour, self.getDescription(test)))
1270 self.stream.writeln(single_line_delim)
1271 self.stream.writeln("%s" % err)
1274 class VppTestRunner(unittest.TextTestRunner):
1276 A basic test runner implementation which prints results to standard error.
1280 def resultclass(self):
1281 """Class maintaining the results of the tests"""
1282 return VppTestResult
1284 def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1285 result_pipe=None, failfast=False, buffer=False,
1286 resultclass=None, print_summary=True, **kwargs):
1287 # ignore stream setting here, use hard-coded stdout to be in sync
1288 # with prints from VppTestCase methods ...
1289 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1290 verbosity, failfast, buffer,
1291 resultclass, **kwargs)
1292 KeepAliveReporter.pipe = keep_alive_pipe
1294 self.orig_stream = self.stream
1295 self.resultclass.test_framework_result_pipe = result_pipe
1297 self.print_summary = print_summary
1299 def _makeResult(self):
1300 return self.resultclass(self.stream,
1305 def run(self, test):
1312 faulthandler.enable() # emit stack trace to stderr if killed by signal
1314 result = super(VppTestRunner, self).run(test)
1315 if not self.print_summary:
1316 self.stream = self.orig_stream
1317 result.stream = self.orig_stream
1321 class Worker(Thread):
1322 def __init__(self, args, logger, env={}):
1323 self.logger = logger
1326 self.env = copy.deepcopy(env)
1327 super(Worker, self).__init__()
1330 executable = self.args[0]
1331 self.logger.debug("Running executable w/args `%s'" % self.args)
1332 env = os.environ.copy()
1333 env.update(self.env)
1334 env["CK_LOG_FILE_NAME"] = "-"
1335 self.process = subprocess.Popen(
1336 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1337 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1338 out, err = self.process.communicate()
1339 self.logger.debug("Finished running `%s'" % executable)
1340 self.logger.info("Return code is `%s'" % self.process.returncode)
1341 self.logger.info(single_line_delim)
1342 self.logger.info("Executable `%s' wrote to stdout:" % executable)
1343 self.logger.info(single_line_delim)
1344 self.logger.info(out)
1345 self.logger.info(single_line_delim)
1346 self.logger.info("Executable `%s' wrote to stderr:" % executable)
1347 self.logger.info(single_line_delim)
1348 self.logger.info(err)
1349 self.logger.info(single_line_delim)
1350 self.result = self.process.returncode
1352 if __name__ == '__main__':