3 from __future__ import print_function
16 from collections import deque
17 from threading import Thread, Event
18 from inspect import getdoc, isclass
19 from traceback import format_exception
20 from logging import FileHandler, DEBUG, Formatter
21 from scapy.packet import Raw
22 from hook import StepHook, PollHook, VppDiedError
23 from vpp_pg_interface import VppPGInterface
24 from vpp_sub_interface import VppSubInterface
25 from vpp_lo_interface import VppLoInterface
26 from vpp_papi_provider import VppPapiProvider
27 from vpp_papi.vpp_stats import VPPStats
28 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
30 from vpp_object import VppObjectRegistry
31 from util import ppp, is_core_present
32 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
33 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
34 from scapy.layers.inet6 import ICMPv6EchoReply
36 if os.name == 'posix' and sys.version_info[0] < 3:
37 # using subprocess32 is recommended by python official documentation
38 # @ https://docs.python.org/2/library/subprocess.html
39 import subprocess32 as subprocess
43 # Python2/3 compatible
55 debug_framework = False
56 if os.getenv('TEST_DEBUG', "0") == "1":
57 debug_framework = True
61 Test framework module.
63 The module provides a set of tools for constructing and running tests and
64 representing the results.
68 class _PacketInfo(object):
69 """Private class to create packet info object.
71 Help process information about the next packet.
72 Set variables to default values.
74 #: Store the index of the packet.
76 #: Store the index of the source packet generator interface of the packet.
78 #: Store the index of the destination packet generator interface
81 #: Store expected ip version
83 #: Store expected upper protocol
85 #: Store the copy of the former packet.
88 def __eq__(self, other):
89 index = self.index == other.index
90 src = self.src == other.src
91 dst = self.dst == other.dst
92 data = self.data == other.data
93 return index and src and dst and data
96 def pump_output(testclass):
97 """ pump output from vpp stdout/stderr to proper queues """
100 while not testclass.pump_thread_stop_flag.is_set():
101 readable = select.select([testclass.vpp.stdout.fileno(),
102 testclass.vpp.stderr.fileno(),
103 testclass.pump_thread_wakeup_pipe[0]],
105 if testclass.vpp.stdout.fileno() in readable:
106 read = os.read(testclass.vpp.stdout.fileno(), 102400)
108 split = read.splitlines(True)
109 if len(stdout_fragment) > 0:
110 split[0] = "%s%s" % (stdout_fragment, split[0])
111 if len(split) > 0 and split[-1].endswith("\n"):
115 stdout_fragment = split[-1]
116 testclass.vpp_stdout_deque.extend(split[:limit])
117 if not testclass.cache_vpp_output:
118 for line in split[:limit]:
119 testclass.logger.debug(
120 "VPP STDOUT: %s" % line.rstrip("\n"))
121 if testclass.vpp.stderr.fileno() in readable:
122 read = os.read(testclass.vpp.stderr.fileno(), 102400)
124 split = read.splitlines(True)
125 if len(stderr_fragment) > 0:
126 split[0] = "%s%s" % (stderr_fragment, split[0])
127 if len(split) > 0 and split[-1].endswith(b"\n"):
131 stderr_fragment = split[-1]
132 testclass.vpp_stderr_deque.extend(split[:limit])
133 if not testclass.cache_vpp_output:
134 for line in split[:limit]:
135 testclass.logger.debug(
136 "VPP STDERR: %s" % line.rstrip("\n"))
137 # ignoring the dummy pipe here intentionally - the
138 # flag will take care of properly terminating the loop
141 def _is_skip_aarch64_set():
142 return os.getenv('SKIP_AARCH64', 'n').lower() in ('yes', 'y', '1')
144 is_skip_aarch64_set = _is_skip_aarch64_set()
147 def _is_platform_aarch64():
148 return platform.machine() == 'aarch64'
150 is_platform_aarch64 = _is_platform_aarch64()
153 def _running_extended_tests():
154 s = os.getenv("EXTENDED_TESTS", "n")
155 return True if s.lower() in ("y", "yes", "1") else False
157 running_extended_tests = _running_extended_tests()
160 def _running_on_centos():
161 os_id = os.getenv("OS_ID", "")
162 return True if "centos" in os_id.lower() else False
164 running_on_centos = _running_on_centos
167 class KeepAliveReporter(object):
169 Singleton object which reports test start to parent process
174 self.__dict__ = self._shared_state
182 def pipe(self, pipe):
183 if self._pipe is not None:
184 raise Exception("Internal error - pipe should only be set once.")
187 def send_keep_alive(self, test, desc=None):
189 Write current test tmpdir & desc to keep-alive pipe to signal liveness
191 if self.pipe is None:
192 # if not running forked..
196 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
200 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
203 class VppTestCase(unittest.TestCase):
204 """This subclass is a base class for VPP test cases that are implemented as
205 classes. It provides methods to create and run test case.
208 extra_vpp_punt_config = []
209 extra_vpp_plugin_config = []
212 def packet_infos(self):
213 """List of packet infos"""
214 return self._packet_infos
217 def get_packet_count_for_if_idx(cls, dst_if_index):
218 """Get the number of packet info for specified destination if index"""
219 if dst_if_index in cls._packet_count_for_dst_if_idx:
220 return cls._packet_count_for_dst_if_idx[dst_if_index]
226 """Return the instance of this testcase"""
227 return cls.test_instance
230 def set_debug_flags(cls, d):
231 cls.debug_core = False
232 cls.debug_gdb = False
233 cls.debug_gdbserver = False
238 cls.debug_core = True
241 elif dl == "gdbserver":
242 cls.debug_gdbserver = True
244 raise Exception("Unrecognized DEBUG option: '%s'" % d)
247 def get_least_used_cpu():
248 cpu_usage_list = [set(range(psutil.cpu_count()))]
249 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
250 if 'vpp_main' == p.info['name']]
251 for vpp_process in vpp_processes:
252 for cpu_usage_set in cpu_usage_list:
254 cpu_num = vpp_process.cpu_num()
255 if cpu_num in cpu_usage_set:
256 cpu_usage_set_index = cpu_usage_list.index(
258 if cpu_usage_set_index == len(cpu_usage_list) - 1:
259 cpu_usage_list.append({cpu_num})
261 cpu_usage_list[cpu_usage_set_index + 1].add(
263 cpu_usage_set.remove(cpu_num)
265 except psutil.NoSuchProcess:
268 for cpu_usage_set in cpu_usage_list:
269 if len(cpu_usage_set) > 0:
270 min_usage_set = cpu_usage_set
273 return random.choice(tuple(min_usage_set))
276 def print_header(cls):
277 if not hasattr(cls, '_header_printed'):
278 print(double_line_delim)
279 print(colorize(getdoc(cls).splitlines()[0], GREEN))
280 print(double_line_delim)
281 cls._header_printed = True
284 def setUpConstants(cls):
285 """ Set-up the test case class based on environment variables """
286 s = os.getenv("STEP", "n")
287 cls.step = True if s.lower() in ("y", "yes", "1") else False
288 d = os.getenv("DEBUG", None)
289 c = os.getenv("CACHE_OUTPUT", "1")
290 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
291 cls.set_debug_flags(d)
292 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
293 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
294 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
296 if cls.plugin_path is not None:
297 if cls.extern_plugin_path is not None:
298 plugin_path = "%s:%s" % (
299 cls.plugin_path, cls.extern_plugin_path)
301 plugin_path = cls.plugin_path
302 elif cls.extern_plugin_path is not None:
303 plugin_path = cls.extern_plugin_path
305 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
306 debug_cli = "cli-listen localhost:5002"
308 size = os.getenv("COREDUMP_SIZE")
310 coredump_size = "coredump-size %s" % size
311 if coredump_size is None:
312 coredump_size = "coredump-size unlimited"
314 cpu_core_number = cls.get_least_used_cpu()
316 cls.vpp_cmdline = [cls.vpp_bin, "unix",
317 "{", "nodaemon", debug_cli, "full-coredump",
318 coredump_size, "runtime-dir", cls.tempdir, "}",
319 "api-trace", "{", "on", "}", "api-segment", "{",
320 "prefix", cls.shm_prefix, "}", "cpu", "{",
321 "main-core", str(cpu_core_number), "}", "statseg",
322 "{", "socket-name", cls.stats_sock, "}", "plugins",
323 "{", "plugin", "dpdk_plugin.so", "{", "disable",
324 "}", "plugin", "unittest_plugin.so", "{", "enable",
325 "}"] + cls.extra_vpp_plugin_config + ["}", ]
326 if cls.extra_vpp_punt_config is not None:
327 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
328 if plugin_path is not None:
329 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
330 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
331 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
334 def wait_for_enter(cls):
335 if cls.debug_gdbserver:
336 print(double_line_delim)
337 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
339 print(double_line_delim)
340 print("Spawned VPP with PID: %d" % cls.vpp.pid)
342 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
344 print(single_line_delim)
345 print("You can debug the VPP using e.g.:")
346 if cls.debug_gdbserver:
347 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
348 print("Now is the time to attach a gdb by running the above "
349 "command, set up breakpoints etc. and then resume VPP from "
350 "within gdb by issuing the 'continue' command")
352 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
353 print("Now is the time to attach a gdb by running the above "
354 "command and set up breakpoints etc.")
355 print(single_line_delim)
356 input("Press ENTER to continue running the testcase...")
360 cmdline = cls.vpp_cmdline
362 if cls.debug_gdbserver:
363 gdbserver = '/usr/bin/gdbserver'
364 if not os.path.isfile(gdbserver) or \
365 not os.access(gdbserver, os.X_OK):
366 raise Exception("gdbserver binary '%s' does not exist or is "
367 "not executable" % gdbserver)
369 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
370 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
373 cls.vpp = subprocess.Popen(cmdline,
374 stdout=subprocess.PIPE,
375 stderr=subprocess.PIPE,
377 except subprocess.CalledProcessError as e:
378 cls.logger.critical("Couldn't start vpp: %s" % e)
384 def wait_for_stats_socket(cls):
385 deadline = time.time() + 3
387 while time.time() < deadline or \
388 cls.debug_gdb or cls.debug_gdbserver:
389 if os.path.exists(cls.stats_sock):
394 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
399 Perform class setup before running the testcase
400 Remove shared memory files, start vpp and connect the vpp-api
402 gc.collect() # run garbage collection first
405 cls.logger = get_logger(cls.__name__)
406 if hasattr(cls, 'parallel_handler'):
407 cls.logger.addHandler(cls.parallel_handler)
408 cls.logger.propagate = False
409 cls.tempdir = tempfile.mkdtemp(
410 prefix='vpp-unittest-%s-' % cls.__name__)
411 cls.stats_sock = "%s/stats.sock" % cls.tempdir
412 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
413 cls.file_handler.setFormatter(
414 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
416 cls.file_handler.setLevel(DEBUG)
417 cls.logger.addHandler(cls.file_handler)
418 cls.shm_prefix = os.path.basename(cls.tempdir)
419 os.chdir(cls.tempdir)
420 cls.logger.info("Temporary dir is %s, shm prefix is %s",
421 cls.tempdir, cls.shm_prefix)
423 cls.reset_packet_infos()
425 cls._zombie_captures = []
428 cls.registry = VppObjectRegistry()
429 cls.vpp_startup_failed = False
430 cls.reporter = KeepAliveReporter()
431 # need to catch exceptions here because if we raise, then the cleanup
432 # doesn't get called and we might end with a zombie vpp
435 cls.reporter.send_keep_alive(cls, 'setUpClass')
436 VppTestResult.current_test_case_info = TestCaseInfo(
437 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
438 cls.vpp_stdout_deque = deque()
439 cls.vpp_stderr_deque = deque()
440 cls.pump_thread_stop_flag = Event()
441 cls.pump_thread_wakeup_pipe = os.pipe()
442 cls.pump_thread = Thread(target=pump_output, args=(cls,))
443 cls.pump_thread.daemon = True
444 cls.pump_thread.start()
445 if cls.debug_gdb or cls.debug_gdbserver:
449 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
455 cls.vapi.register_hook(hook)
456 cls.wait_for_stats_socket()
457 cls.statistics = VPPStats(socketname=cls.stats_sock)
461 cls.vpp_startup_failed = True
463 "VPP died shortly after startup, check the"
464 " output to standard error for possible cause")
470 cls.vapi.disconnect()
473 if cls.debug_gdbserver:
474 print(colorize("You're running VPP inside gdbserver but "
475 "VPP-API connection failed, did you forget "
476 "to 'continue' VPP from within gdb?", RED))
488 Disconnect vpp-api, kill vpp and cleanup shared memory files
490 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
492 if cls.vpp.returncode is None:
493 print(double_line_delim)
494 print("VPP or GDB server is still running")
495 print(single_line_delim)
496 input("When done debugging, press ENTER to kill the "
497 "process and finish running the testcase...")
499 # first signal that we want to stop the pump thread, then wake it up
500 if hasattr(cls, 'pump_thread_stop_flag'):
501 cls.pump_thread_stop_flag.set()
502 if hasattr(cls, 'pump_thread_wakeup_pipe'):
503 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
504 if hasattr(cls, 'pump_thread'):
505 cls.logger.debug("Waiting for pump thread to stop")
506 cls.pump_thread.join()
507 if hasattr(cls, 'vpp_stderr_reader_thread'):
508 cls.logger.debug("Waiting for stdderr pump to stop")
509 cls.vpp_stderr_reader_thread.join()
511 if hasattr(cls, 'vpp'):
512 if hasattr(cls, 'vapi'):
513 cls.vapi.disconnect()
516 if cls.vpp.returncode is None:
517 cls.logger.debug("Sending TERM to vpp")
519 cls.logger.debug("Waiting for vpp to die")
520 cls.vpp.communicate()
523 if cls.vpp_startup_failed:
524 stdout_log = cls.logger.info
525 stderr_log = cls.logger.critical
527 stdout_log = cls.logger.info
528 stderr_log = cls.logger.info
530 if hasattr(cls, 'vpp_stdout_deque'):
531 stdout_log(single_line_delim)
532 stdout_log('VPP output to stdout while running %s:', cls.__name__)
533 stdout_log(single_line_delim)
534 vpp_output = "".join(cls.vpp_stdout_deque)
535 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
537 stdout_log('\n%s', vpp_output)
538 stdout_log(single_line_delim)
540 if hasattr(cls, 'vpp_stderr_deque'):
541 stderr_log(single_line_delim)
542 stderr_log('VPP output to stderr while running %s:', cls.__name__)
543 stderr_log(single_line_delim)
544 vpp_output = "".join(str(cls.vpp_stderr_deque))
545 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
547 stderr_log('\n%s', vpp_output)
548 stderr_log(single_line_delim)
551 def tearDownClass(cls):
552 """ Perform final cleanup after running all tests in this test-case """
553 cls.reporter.send_keep_alive(cls, 'tearDownClass')
555 cls.file_handler.close()
556 cls.reset_packet_infos()
558 debug_internal.on_tear_down_class(cls)
561 """ Show various debug prints after each test """
562 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
563 (self.__class__.__name__, self._testMethodName,
564 self._testMethodDoc))
565 if not self.vpp_dead:
566 self.logger.debug(self.vapi.cli("show trace"))
567 self.logger.info(self.vapi.ppcli("show interface"))
568 self.logger.info(self.vapi.ppcli("show hardware"))
569 self.logger.info(self.statistics.set_errors_str())
570 self.logger.info(self.vapi.ppcli("show run"))
571 self.logger.info(self.vapi.ppcli("show log"))
572 self.registry.remove_vpp_config(self.logger)
573 # Save/Dump VPP api trace log
574 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
575 tmp_api_trace = "/tmp/%s" % api_trace
576 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
577 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
578 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
580 os.rename(tmp_api_trace, vpp_api_trace_log)
581 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
584 self.registry.unregister_all(self.logger)
587 """ Clear trace before running each test"""
588 self.reporter.send_keep_alive(self)
589 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
590 (self.__class__.__name__, self._testMethodName,
591 self._testMethodDoc))
593 raise Exception("VPP is dead when setting up the test")
594 self.sleep(.1, "during setUp")
595 self.vpp_stdout_deque.append(
596 "--- test setUp() for %s.%s(%s) starts here ---\n" %
597 (self.__class__.__name__, self._testMethodName,
598 self._testMethodDoc))
599 self.vpp_stderr_deque.append(
600 "--- test setUp() for %s.%s(%s) starts here ---\n" %
601 (self.__class__.__name__, self._testMethodName,
602 self._testMethodDoc))
603 self.vapi.cli("clear trace")
604 # store the test instance inside the test class - so that objects
605 # holding the class can access instance methods (like assertEqual)
606 type(self).test_instance = self
609 def pg_enable_capture(cls, interfaces=None):
611 Enable capture on packet-generator interfaces
613 :param interfaces: iterable interface indexes (if None,
614 use self.pg_interfaces)
617 if interfaces is None:
618 interfaces = cls.pg_interfaces
623 def register_capture(cls, cap_name):
624 """ Register a capture in the testclass """
625 # add to the list of captures with current timestamp
626 cls._captures.append((time.time(), cap_name))
627 # filter out from zombies
628 cls._zombie_captures = [(stamp, name)
629 for (stamp, name) in cls._zombie_captures
634 """ Remove any zombie captures and enable the packet generator """
635 # how long before capture is allowed to be deleted - otherwise vpp
636 # crashes - 100ms seems enough (this shouldn't be needed at all)
639 for stamp, cap_name in cls._zombie_captures:
640 wait = stamp + capture_ttl - now
642 cls.sleep(wait, "before deleting capture %s" % cap_name)
644 cls.logger.debug("Removing zombie capture %s" % cap_name)
645 cls.vapi.cli('packet-generator delete %s' % cap_name)
647 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
648 cls.vapi.cli('packet-generator enable')
649 cls._zombie_captures = cls._captures
653 def create_pg_interfaces(cls, interfaces):
655 Create packet-generator interfaces.
657 :param interfaces: iterable indexes of the interfaces.
658 :returns: List of created interfaces.
663 intf = VppPGInterface(cls, i)
664 setattr(cls, intf.name, intf)
666 cls.pg_interfaces = result
670 def create_loopback_interfaces(cls, count):
672 Create loopback interfaces.
674 :param count: number of interfaces created.
675 :returns: List of created interfaces.
677 result = [VppLoInterface(cls) for i in range(count)]
679 setattr(cls, intf.name, intf)
680 cls.lo_interfaces = result
684 def extend_packet(packet, size, padding=' '):
686 Extend packet to given size by padding with spaces or custom padding
687 NOTE: Currently works only when Raw layer is present.
689 :param packet: packet
690 :param size: target size
691 :param padding: padding used to extend the payload
694 packet_len = len(packet) + 4
695 extend = size - packet_len
697 num = (extend / len(padding)) + 1
698 packet[Raw].load += (padding * num)[:extend]
701 def reset_packet_infos(cls):
702 """ Reset the list of packet info objects and packet counts to zero """
703 cls._packet_infos = {}
704 cls._packet_count_for_dst_if_idx = {}
707 def create_packet_info(cls, src_if, dst_if):
709 Create packet info object containing the source and destination indexes
710 and add it to the testcase's packet info list
712 :param VppInterface src_if: source interface
713 :param VppInterface dst_if: destination interface
715 :returns: _PacketInfo object
719 info.index = len(cls._packet_infos)
720 info.src = src_if.sw_if_index
721 info.dst = dst_if.sw_if_index
722 if isinstance(dst_if, VppSubInterface):
723 dst_idx = dst_if.parent.sw_if_index
725 dst_idx = dst_if.sw_if_index
726 if dst_idx in cls._packet_count_for_dst_if_idx:
727 cls._packet_count_for_dst_if_idx[dst_idx] += 1
729 cls._packet_count_for_dst_if_idx[dst_idx] = 1
730 cls._packet_infos[info.index] = info
734 def info_to_payload(info):
736 Convert _PacketInfo object to packet payload
738 :param info: _PacketInfo object
740 :returns: string containing serialized data from packet info
742 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
746 def payload_to_info(payload):
748 Convert packet payload to _PacketInfo object
750 :param payload: packet payload
752 :returns: _PacketInfo object containing de-serialized data from payload
755 numbers = payload.split()
757 info.index = int(numbers[0])
758 info.src = int(numbers[1])
759 info.dst = int(numbers[2])
760 info.ip = int(numbers[3])
761 info.proto = int(numbers[4])
764 def get_next_packet_info(self, info):
766 Iterate over the packet info list stored in the testcase
767 Start iteration with first element if info is None
768 Continue based on index in info if info is specified
770 :param info: info or None
771 :returns: next info in list or None if no more infos
776 next_index = info.index + 1
777 if next_index == len(self._packet_infos):
780 return self._packet_infos[next_index]
782 def get_next_packet_info_for_interface(self, src_index, info):
784 Search the packet info list for the next packet info with same source
787 :param src_index: source interface index to search for
788 :param info: packet info - where to start the search
789 :returns: packet info or None
793 info = self.get_next_packet_info(info)
796 if info.src == src_index:
799 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
801 Search the packet info list for the next packet info with same source
802 and destination interface indexes
804 :param src_index: source interface index to search for
805 :param dst_index: destination interface index to search for
806 :param info: packet info - where to start the search
807 :returns: packet info or None
811 info = self.get_next_packet_info_for_interface(src_index, info)
814 if info.dst == dst_index:
817 def assert_equal(self, real_value, expected_value, name_or_class=None):
818 if name_or_class is None:
819 self.assertEqual(real_value, expected_value)
822 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
823 msg = msg % (getdoc(name_or_class).strip(),
824 real_value, str(name_or_class(real_value)),
825 expected_value, str(name_or_class(expected_value)))
827 msg = "Invalid %s: %s does not match expected value %s" % (
828 name_or_class, real_value, expected_value)
830 self.assertEqual(real_value, expected_value, msg)
832 def assert_in_range(self,
840 msg = "Invalid %s: %s out of range <%s,%s>" % (
841 name, real_value, expected_min, expected_max)
842 self.assertTrue(expected_min <= real_value <= expected_max, msg)
844 def assert_packet_checksums_valid(self, packet,
845 ignore_zero_udp_checksums=True):
846 received = packet.__class__(str(packet))
848 ppp("Verifying packet checksums for packet:", received))
849 udp_layers = ['UDP', 'UDPerror']
850 checksum_fields = ['cksum', 'chksum']
853 temp = received.__class__(str(received))
855 layer = temp.getlayer(counter)
857 for cf in checksum_fields:
858 if hasattr(layer, cf):
859 if ignore_zero_udp_checksums and \
860 0 == getattr(layer, cf) and \
861 layer.name in udp_layers:
864 checksums.append((counter, cf))
867 counter = counter + 1
868 if 0 == len(checksums):
870 temp = temp.__class__(str(temp))
871 for layer, cf in checksums:
872 calc_sum = getattr(temp[layer], cf)
874 getattr(received[layer], cf), calc_sum,
875 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
877 "Checksum field `%s` on `%s` layer has correct value `%s`" %
878 (cf, temp[layer].name, calc_sum))
880 def assert_checksum_valid(self, received_packet, layer,
882 ignore_zero_checksum=False):
883 """ Check checksum of received packet on given layer """
884 received_packet_checksum = getattr(received_packet[layer], field_name)
885 if ignore_zero_checksum and 0 == received_packet_checksum:
887 recalculated = received_packet.__class__(str(received_packet))
888 delattr(recalculated[layer], field_name)
889 recalculated = recalculated.__class__(str(recalculated))
890 self.assert_equal(received_packet_checksum,
891 getattr(recalculated[layer], field_name),
892 "packet checksum on layer: %s" % layer)
894 def assert_ip_checksum_valid(self, received_packet,
895 ignore_zero_checksum=False):
896 self.assert_checksum_valid(received_packet, 'IP',
897 ignore_zero_checksum=ignore_zero_checksum)
899 def assert_tcp_checksum_valid(self, received_packet,
900 ignore_zero_checksum=False):
901 self.assert_checksum_valid(received_packet, 'TCP',
902 ignore_zero_checksum=ignore_zero_checksum)
904 def assert_udp_checksum_valid(self, received_packet,
905 ignore_zero_checksum=True):
906 self.assert_checksum_valid(received_packet, 'UDP',
907 ignore_zero_checksum=ignore_zero_checksum)
909 def assert_embedded_icmp_checksum_valid(self, received_packet):
910 if received_packet.haslayer(IPerror):
911 self.assert_checksum_valid(received_packet, 'IPerror')
912 if received_packet.haslayer(TCPerror):
913 self.assert_checksum_valid(received_packet, 'TCPerror')
914 if received_packet.haslayer(UDPerror):
915 self.assert_checksum_valid(received_packet, 'UDPerror',
916 ignore_zero_checksum=True)
917 if received_packet.haslayer(ICMPerror):
918 self.assert_checksum_valid(received_packet, 'ICMPerror')
920 def assert_icmp_checksum_valid(self, received_packet):
921 self.assert_checksum_valid(received_packet, 'ICMP')
922 self.assert_embedded_icmp_checksum_valid(received_packet)
924 def assert_icmpv6_checksum_valid(self, pkt):
925 if pkt.haslayer(ICMPv6DestUnreach):
926 self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
927 self.assert_embedded_icmp_checksum_valid(pkt)
928 if pkt.haslayer(ICMPv6EchoRequest):
929 self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
930 if pkt.haslayer(ICMPv6EchoReply):
931 self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
933 def assert_packet_counter_equal(self, counter, expected_value):
934 if counter.startswith("/"):
935 counter_value = self.statistics.get_counter(counter)
936 self.assert_equal(counter_value, expected_value,
937 "packet counter `%s'" % counter)
939 counters = self.vapi.cli("sh errors").split('\n')
941 for i in range(1, len(counters) - 1):
942 results = counters[i].split()
943 if results[1] == counter:
944 counter_value = int(results[0])
948 def sleep(cls, timeout, remark=None):
949 if hasattr(cls, 'logger'):
950 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
954 if hasattr(cls, 'logger') and after - before > 2 * timeout:
955 cls.logger.error("unexpected time.sleep() result - "
956 "slept for %es instead of ~%es!",
957 after - before, timeout)
958 if hasattr(cls, 'logger'):
960 "Finished sleep (%s) - slept %es (wanted %es)",
961 remark, after - before, timeout)
963 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
964 self.vapi.cli("clear trace")
965 intf.add_stream(pkts)
966 self.pg_enable_capture(self.pg_interfaces)
970 for i in self.pg_interfaces:
971 i.get_capture(0, timeout=timeout)
972 i.assert_nothing_captured(remark=remark)
975 def send_and_expect(self, input, pkts, output):
976 self.vapi.cli("clear trace")
977 input.add_stream(pkts)
978 self.pg_enable_capture(self.pg_interfaces)
980 if isinstance(object, (list,)):
983 rx.append(output.get_capture(len(pkts)))
985 rx = output.get_capture(len(pkts))
988 def send_and_expect_only(self, input, pkts, output, timeout=None):
989 self.vapi.cli("clear trace")
990 input.add_stream(pkts)
991 self.pg_enable_capture(self.pg_interfaces)
993 if isinstance(object, (list,)):
997 rx.append(output.get_capture(len(pkts)))
999 rx = output.get_capture(len(pkts))
1003 for i in self.pg_interfaces:
1004 if i not in outputs:
1005 i.get_capture(0, timeout=timeout)
1006 i.assert_nothing_captured()
1012 """ unittest calls runTest when TestCase is instantiated without a
1013 test case. Use case: Writing unittests against VppTestCase"""
1017 def get_testcase_doc_name(test):
1018 return getdoc(test.__class__).splitlines()[0]
1021 def get_test_description(descriptions, test):
1022 short_description = test.shortDescription()
1023 if descriptions and short_description:
1024 return short_description
1029 class TestCaseInfo(object):
1030 def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
1031 self.logger = logger
1032 self.tempdir = tempdir
1033 self.vpp_pid = vpp_pid
1034 self.vpp_bin_path = vpp_bin_path
1035 self.core_crash_test = None
1038 class VppTestResult(unittest.TestResult):
1040 @property result_string
1041 String variable to store the test case result string.
1043 List variable containing 2-tuples of TestCase instances and strings
1044 holding formatted tracebacks. Each tuple represents a test which
1045 raised an unexpected exception.
1047 List variable containing 2-tuples of TestCase instances and strings
1048 holding formatted tracebacks. Each tuple represents a test where
1049 a failure was explicitly signalled using the TestCase.assert*()
1053 failed_test_cases_info = set()
1054 core_crash_test_cases_info = set()
1055 current_test_case_info = None
1057 def __init__(self, stream, descriptions, verbosity, runner):
1059 :param stream File descriptor to store where to report test results.
1060 Set to the standard error stream by default.
1061 :param descriptions Boolean variable to store information if to use
1062 test case descriptions.
1063 :param verbosity Integer variable to store required verbosity level.
1065 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
1066 self.stream = stream
1067 self.descriptions = descriptions
1068 self.verbosity = verbosity
1069 self.result_string = None
1070 self.runner = runner
1072 def addSuccess(self, test):
1074 Record a test succeeded result
1079 if self.current_test_case_info:
1080 self.current_test_case_info.logger.debug(
1081 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1082 test._testMethodName,
1083 test._testMethodDoc))
1084 unittest.TestResult.addSuccess(self, test)
1085 self.result_string = colorize("OK", GREEN)
1087 self.send_result_through_pipe(test, PASS)
1089 def addSkip(self, test, reason):
1091 Record a test skipped.
1097 if self.current_test_case_info:
1098 self.current_test_case_info.logger.debug(
1099 "--- addSkip() %s.%s(%s) called, reason is %s" %
1100 (test.__class__.__name__, test._testMethodName,
1101 test._testMethodDoc, reason))
1102 unittest.TestResult.addSkip(self, test, reason)
1103 self.result_string = colorize("SKIP", YELLOW)
1105 self.send_result_through_pipe(test, SKIP)
1107 def symlink_failed(self):
1108 if self.current_test_case_info:
1110 failed_dir = os.getenv('FAILED_DIR')
1111 link_path = os.path.join(
1114 os.path.basename(self.current_test_case_info.tempdir))
1115 if self.current_test_case_info.logger:
1116 self.current_test_case_info.logger.debug(
1117 "creating a link to the failed test")
1118 self.current_test_case_info.logger.debug(
1119 "os.symlink(%s, %s)" %
1120 (self.current_test_case_info.tempdir, link_path))
1121 if os.path.exists(link_path):
1122 if self.current_test_case_info.logger:
1123 self.current_test_case_info.logger.debug(
1124 'symlink already exists')
1126 os.symlink(self.current_test_case_info.tempdir, link_path)
1128 except Exception as e:
1129 if self.current_test_case_info.logger:
1130 self.current_test_case_info.logger.error(e)
1132 def send_result_through_pipe(self, test, result):
1133 if hasattr(self, 'test_framework_result_pipe'):
1134 pipe = self.test_framework_result_pipe
1136 pipe.send((test.id(), result))
1138 def log_error(self, test, err, fn_name):
1139 if self.current_test_case_info:
1140 if isinstance(test, unittest.suite._ErrorHolder):
1141 test_name = test.description
1143 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1144 test._testMethodName,
1145 test._testMethodDoc)
1146 self.current_test_case_info.logger.debug(
1147 "--- %s() %s called, err is %s" %
1148 (fn_name, test_name, err))
1149 self.current_test_case_info.logger.debug(
1150 "formatted exception is:\n%s" %
1151 "".join(format_exception(*err)))
1153 def add_error(self, test, err, unittest_fn, error_type):
1154 if error_type == FAIL:
1155 self.log_error(test, err, 'addFailure')
1156 error_type_str = colorize("FAIL", RED)
1157 elif error_type == ERROR:
1158 self.log_error(test, err, 'addError')
1159 error_type_str = colorize("ERROR", RED)
1161 raise Exception('Error type %s cannot be used to record an '
1162 'error or a failure' % error_type)
1164 unittest_fn(self, test, err)
1165 if self.current_test_case_info:
1166 self.result_string = "%s [ temp dir used by test case: %s ]" % \
1168 self.current_test_case_info.tempdir)
1169 self.symlink_failed()
1170 self.failed_test_cases_info.add(self.current_test_case_info)
1171 if is_core_present(self.current_test_case_info.tempdir):
1172 if not self.current_test_case_info.core_crash_test:
1173 if isinstance(test, unittest.suite._ErrorHolder):
1174 test_name = str(test)
1176 test_name = "'{}' ({})".format(
1177 get_testcase_doc_name(test), test.id())
1178 self.current_test_case_info.core_crash_test = test_name
1179 self.core_crash_test_cases_info.add(
1180 self.current_test_case_info)
1182 self.result_string = '%s [no temp dir]' % error_type_str
1184 self.send_result_through_pipe(test, error_type)
1186 def addFailure(self, test, err):
1188 Record a test failed result
1191 :param err: error message
1194 self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1196 def addError(self, test, err):
1198 Record a test error result
1201 :param err: error message
1204 self.add_error(test, err, unittest.TestResult.addError, ERROR)
1206 def getDescription(self, test):
1208 Get test description
1211 :returns: test description
1214 return get_test_description(self.descriptions, test)
1216 def startTest(self, test):
1225 unittest.TestResult.startTest(self, test)
1226 if self.verbosity > 0:
1227 self.stream.writeln(
1228 "Starting " + self.getDescription(test) + " ...")
1229 self.stream.writeln(single_line_delim)
1231 def stopTest(self, test):
1233 Called when the given test has been run
1238 unittest.TestResult.stopTest(self, test)
1239 if self.verbosity > 0:
1240 self.stream.writeln(single_line_delim)
1241 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1242 self.result_string))
1243 self.stream.writeln(single_line_delim)
1245 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1246 self.result_string))
1248 self.send_result_through_pipe(test, TEST_RUN)
1250 def printErrors(self):
1252 Print errors from running the test case
1254 if len(self.errors) > 0 or len(self.failures) > 0:
1255 self.stream.writeln()
1256 self.printErrorList('ERROR', self.errors)
1257 self.printErrorList('FAIL', self.failures)
1259 # ^^ that is the last output from unittest before summary
1260 if not self.runner.print_summary:
1261 devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
1262 self.stream = devnull
1263 self.runner.stream = devnull
1265 def printErrorList(self, flavour, errors):
1267 Print error list to the output stream together with error type
1268 and test case description.
1270 :param flavour: error type
1271 :param errors: iterable errors
1274 for test, err in errors:
1275 self.stream.writeln(double_line_delim)
1276 self.stream.writeln("%s: %s" %
1277 (flavour, self.getDescription(test)))
1278 self.stream.writeln(single_line_delim)
1279 self.stream.writeln("%s" % err)
1282 class VppTestRunner(unittest.TextTestRunner):
1284 A basic test runner implementation which prints results to standard error.
1288 def resultclass(self):
1289 """Class maintaining the results of the tests"""
1290 return VppTestResult
1292 def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1293 result_pipe=None, failfast=False, buffer=False,
1294 resultclass=None, print_summary=True):
1295 # ignore stream setting here, use hard-coded stdout to be in sync
1296 # with prints from VppTestCase methods ...
1297 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1298 verbosity, failfast, buffer,
1300 KeepAliveReporter.pipe = keep_alive_pipe
1302 self.orig_stream = self.stream
1303 self.resultclass.test_framework_result_pipe = result_pipe
1305 self.print_summary = print_summary
1307 def _makeResult(self):
1308 return self.resultclass(self.stream,
1313 def run(self, test):
1320 faulthandler.enable() # emit stack trace to stderr if killed by signal
1322 result = super(VppTestRunner, self).run(test)
1323 if not self.print_summary:
1324 self.stream = self.orig_stream
1325 result.stream = self.orig_stream
1329 class Worker(Thread):
1330 def __init__(self, args, logger, env={}):
1331 self.logger = logger
1334 self.env = copy.deepcopy(env)
1335 super(Worker, self).__init__()
1338 executable = self.args[0]
1339 self.logger.debug("Running executable w/args `%s'" % self.args)
1340 env = os.environ.copy()
1341 env.update(self.env)
1342 env["CK_LOG_FILE_NAME"] = "-"
1343 self.process = subprocess.Popen(
1344 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1345 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1346 out, err = self.process.communicate()
1347 self.logger.debug("Finished running `%s'" % executable)
1348 self.logger.info("Return code is `%s'" % self.process.returncode)
1349 self.logger.info(single_line_delim)
1350 self.logger.info("Executable `%s' wrote to stdout:" % executable)
1351 self.logger.info(single_line_delim)
1352 self.logger.info(out)
1353 self.logger.info(single_line_delim)
1354 self.logger.info("Executable `%s' wrote to stderr:" % executable)
1355 self.logger.info(single_line_delim)
1356 self.logger.info(err)
1357 self.logger.info(single_line_delim)
1358 self.result = self.process.returncode