3 from __future__ import print_function
16 from collections import deque
17 from threading import Thread, Event
18 from inspect import getdoc, isclass
19 from traceback import format_exception
20 from logging import FileHandler, DEBUG, Formatter
21 from scapy.packet import Raw
22 from hook import StepHook, PollHook, VppDiedError
23 from vpp_config import VppTestCaseVppConfig
24 from vpp_interface import VppInterface
25 from vpp_sub_interface import VppSubInterface
26 from vpp_pg_interface import VppPGInterface
27 from vpp_lo_interface import VppLoInterface
28 from vpp_papi_provider import VppPapiProvider
29 from vpp_papi.vpp_stats import VPPStats
30 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
32 from vpp_object import VppObjectRegistry
33 from util import ppp, is_core_present
34 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
35 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
36 from scapy.layers.inet6 import ICMPv6EchoReply
38 if os.name == 'posix' and sys.version_info[0] < 3:
39 # using subprocess32 is recommended by python official documentation
40 # @ https://docs.python.org/2/library/subprocess.html
41 import subprocess32 as subprocess
45 # Python2/3 compatible
57 debug_framework = False
58 if os.getenv('TEST_DEBUG', "0") == "1":
59 debug_framework = True
63 Test framework module.
65 The module provides a set of tools for constructing and running tests and
66 representing the results.
70 class _PacketInfo(object):
71 """Private class to create packet info object.
73 Help process information about the next packet.
74 Set variables to default values.
76 #: Store the index of the packet.
78 #: Store the index of the source packet generator interface of the packet.
80 #: Store the index of the destination packet generator interface
83 #: Store expected ip version
85 #: Store expected upper protocol
87 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare two packet infos by index, src, dst and data.

    Every field comparison is evaluated before the results are combined,
    so ``other`` must expose all four attributes.
    """
    outcomes = (
        self.index == other.index,
        self.src == other.src,
        self.dst == other.dst,
        self.data == other.data,
    )
    # Same result as ``a and b and c and d``: the first falsy comparison
    # wins, otherwise the last comparison result is handed back as-is.
    for outcome in outcomes[:-1]:
        if not outcome:
            return outcome
    return outcomes[-1]
98 def pump_output(testclass):
99 """ pump output from vpp stdout/stderr to proper queues """
102 while not testclass.pump_thread_stop_flag.is_set():
103 readable = select.select([testclass.vpp.stdout.fileno(),
104 testclass.vpp.stderr.fileno(),
105 testclass.pump_thread_wakeup_pipe[0]],
107 if testclass.vpp.stdout.fileno() in readable:
108 read = os.read(testclass.vpp.stdout.fileno(), 102400)
110 split = read.splitlines(True)
111 if len(stdout_fragment) > 0:
112 split[0] = "%s%s" % (stdout_fragment, split[0])
113 if len(split) > 0 and split[-1].endswith("\n"):
117 stdout_fragment = split[-1]
118 testclass.vpp_stdout_deque.extend(split[:limit])
119 if not testclass.cache_vpp_output:
120 for line in split[:limit]:
121 testclass.logger.debug(
122 "VPP STDOUT: %s" % line.rstrip("\n"))
123 if testclass.vpp.stderr.fileno() in readable:
124 read = os.read(testclass.vpp.stderr.fileno(), 102400)
126 split = read.splitlines(True)
127 if len(stderr_fragment) > 0:
128 split[0] = "%s%s" % (stderr_fragment, split[0])
129 if len(split) > 0 and split[-1].endswith(b"\n"):
133 stderr_fragment = split[-1]
134 testclass.vpp_stderr_deque.extend(split[:limit])
135 if not testclass.cache_vpp_output:
136 for line in split[:limit]:
137 testclass.logger.debug(
138 "VPP STDERR: %s" % line.rstrip("\n"))
139 # ignoring the dummy pipe here intentionally - the
140 # flag will take care of properly terminating the loop
def _is_skip_aarch64_set():
    """Report whether the SKIP_AARCH64 environment variable is switched on.

    The value (default ``n``) is compared case-insensitively against
    ``yes``, ``y`` and ``1``.
    """
    flag = os.getenv('SKIP_AARCH64', 'n')
    return flag.lower() in ('yes', 'y', '1')


# Evaluated once, when the module is imported.
is_skip_aarch64_set = _is_skip_aarch64_set()
def _is_platform_aarch64():
    """Tell whether ``platform.machine()`` reports ``aarch64``."""
    machine = platform.machine()
    return machine == 'aarch64'


# Evaluated once, when the module is imported.
is_platform_aarch64 = _is_platform_aarch64()
def _running_extended_tests():
    """Report whether the EXTENDED_TESTS environment variable is switched on.

    The value (default ``n``) is compared case-insensitively against
    ``y``, ``yes`` and ``1``.
    """
    flag = os.getenv("EXTENDED_TESTS", "n").lower()
    return flag in ("y", "yes", "1")


# Evaluated once, when the module is imported.
running_extended_tests = _running_extended_tests()
def _running_on_centos():
    """Report whether the OS_ID environment variable mentions centos.

    :returns: True when ``centos`` occurs (case-insensitively) anywhere in
        the value of OS_ID, False otherwise or when OS_ID is unset.
    """
    os_id = os.getenv("OS_ID", "")
    return "centos" in os_id.lower()


# Call the helper so the flag holds its boolean result, matching the other
# module-level flags; binding the function object itself is always truthy.
running_on_centos = _running_on_centos()
169 class KeepAliveReporter(object):
171 Singleton object which reports test start to parent process
176 self.__dict__ = self._shared_state
184 def pipe(self, pipe):
185 if self._pipe is not None:
186 raise Exception("Internal error - pipe should only be set once.")
189 def send_keep_alive(self, test, desc=None):
191 Write current test tmpdir & desc to keep-alive pipe to signal liveness
193 if self.pipe is None:
194 # if not running forked..
198 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
202 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
205 class VppTestCase(unittest.TestCase):
206 """This subclass is a base class for VPP test cases that are implemented as
207 classes. It provides methods to create and run test case.
210 CLI_LISTEN_DEFAULT = 'localhost:5002'
211 config = VppTestCaseVppConfig()
def packet_infos(self):
    """Packet infos currently stored on the test case (``_packet_infos``)."""
    infos = self._packet_infos
    return infos
219 def get_packet_count_for_if_idx(cls, dst_if_index):
220 """Get the number of packet info for specified destination if index"""
221 if dst_if_index in cls._packet_count_for_dst_if_idx:
222 return cls._packet_count_for_dst_if_idx[dst_if_index]
228 """Return the instance of this testcase"""
229 return cls.test_instance
232 def set_debug_flags(cls, d):
233 cls.debug_core = False
234 cls.debug_gdb = False
235 cls.debug_gdbserver = False
240 cls.debug_core = True
243 elif dl == "gdbserver":
244 cls.debug_gdbserver = True
246 raise Exception("Unrecognized DEBUG option: '%s'" % d)
249 def get_least_used_cpu():
250 cpu_usage_list = [set(range(psutil.cpu_count()))]
251 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
252 if 'vpp_main' == p.info['name']]
253 for vpp_process in vpp_processes:
254 for cpu_usage_set in cpu_usage_list:
256 cpu_num = vpp_process.cpu_num()
257 if cpu_num in cpu_usage_set:
258 cpu_usage_set_index = cpu_usage_list.index(
260 if cpu_usage_set_index == len(cpu_usage_list) - 1:
261 cpu_usage_list.append({cpu_num})
263 cpu_usage_list[cpu_usage_set_index + 1].add(
265 cpu_usage_set.remove(cpu_num)
267 except psutil.NoSuchProcess:
270 for cpu_usage_set in cpu_usage_list:
271 if len(cpu_usage_set) > 0:
272 min_usage_set = cpu_usage_set
275 return random.choice(tuple(min_usage_set))
def print_header(cls):
    """Print the test case banner the first time it is requested.

    The banner is the first line of the class docstring, colorized GREEN
    and framed by two ``double_line_delim`` lines.  When the class has no
    (or an empty) docstring the class name is printed instead of failing.
    ``_header_printed`` is set on the class so later calls print nothing.
    """
    if hasattr(cls, '_header_printed'):
        return
    doc = getdoc(cls)
    doc_lines = doc.splitlines() if doc else []
    # getdoc() yields None without a docstring and '' for a blank one
    title = doc_lines[0] if doc_lines else cls.__name__
    print(double_line_delim)
    print(colorize(title, GREEN))
    print(double_line_delim)
    cls._header_printed = True
286 def setUpConstants(cls):
287 """ Set-up the test case class based on environment variables """
288 s = os.getenv("STEP", "n")
289 cls.step = True if s.lower() in ("y", "yes", "1") else False
290 d = os.getenv("DEBUG", None)
291 c = os.getenv("CACHE_OUTPUT", "1")
292 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
293 cls.set_debug_flags(d)
294 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
295 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
296 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
298 if cls.plugin_path is not None:
299 if cls.extern_plugin_path is not None:
300 plugin_path = "%s:%s" % (
301 cls.plugin_path, cls.extern_plugin_path)
303 plugin_path = cls.plugin_path
304 elif cls.extern_plugin_path is not None:
305 plugin_path = cls.extern_plugin_path
307 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
308 cls.config.add('unix', 'cli-listen', cls.CLI_LISTEN_DEFAULT)
311 size = os.getenv("COREDUMP_SIZE")
312 cls.config.add('unix', 'coredump-size',
313 size if size is not None else 'unlimited')
315 cls.config.add('unix', 'runtime-dir', cls.tempdir)
316 cls.config.add('api-segment', 'prefix', cls.shm_prefix)
317 cls.config.add('cpu', 'main-core', str(cls.get_least_used_cpu()))
318 cls.config.add('statseg', 'socket-name', cls.stats_sock)
320 if plugin_path is not None:
321 cls.config.add('plugins', 'path', plugin_path)
322 cls.config.add_plugin('dpdk_plugin.so', 'disable')
323 cls.config.add_plugin('unittest_plugin.so', 'enable')
325 cls.vpp_cmdline = [cls.vpp_bin] + cls.config.shlex()
326 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
327 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
330 def wait_for_enter(cls):
331 if cls.debug_gdbserver:
332 print(double_line_delim)
333 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
335 print(double_line_delim)
336 print("Spawned VPP with PID: %d" % cls.vpp.pid)
338 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
340 print(single_line_delim)
341 print("You can debug the VPP using e.g.:")
342 if cls.debug_gdbserver:
343 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
344 print("Now is the time to attach a gdb by running the above "
345 "command, set up breakpoints etc. and then resume VPP from "
346 "within gdb by issuing the 'continue' command")
348 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
349 print("Now is the time to attach a gdb by running the above "
350 "command and set up breakpoints etc.")
351 print(single_line_delim)
352 input("Press ENTER to continue running the testcase...")
356 cmdline = cls.vpp_cmdline
358 if cls.debug_gdbserver:
359 gdbserver = '/usr/bin/gdbserver'
360 if not os.path.isfile(gdbserver) or \
361 not os.access(gdbserver, os.X_OK):
362 raise Exception("gdbserver binary '%s' does not exist or is "
363 "not executable" % gdbserver)
365 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
366 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
369 cls.vpp = subprocess.Popen(cmdline,
370 stdout=subprocess.PIPE,
371 stderr=subprocess.PIPE,
373 except subprocess.CalledProcessError as e:
374 cls.logger.critical("Couldn't start vpp: %s" % e)
380 def wait_for_stats_socket(cls):
381 deadline = time.time() + 3
383 while time.time() < deadline or \
384 cls.debug_gdb or cls.debug_gdbserver:
385 if os.path.exists(cls.stats_sock):
390 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
395 Perform class setup before running the testcase
396 Remove shared memory files, start vpp and connect the vpp-api
398 gc.collect() # run garbage collection first
401 cls.logger = get_logger(cls.__name__)
402 if hasattr(cls, 'parallel_handler'):
403 cls.logger.addHandler(cls.parallel_handler)
404 cls.logger.propagate = False
405 cls.tempdir = tempfile.mkdtemp(
406 prefix='vpp-unittest-%s-' % cls.__name__)
407 cls.stats_sock = "%s/stats.sock" % cls.tempdir
408 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
409 cls.file_handler.setFormatter(
410 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
412 cls.file_handler.setLevel(DEBUG)
413 cls.logger.addHandler(cls.file_handler)
414 cls.shm_prefix = os.path.basename(cls.tempdir)
415 os.chdir(cls.tempdir)
416 cls.logger.info("Temporary dir is %s, shm prefix is %s",
417 cls.tempdir, cls.shm_prefix)
419 cls.reset_packet_infos()
421 cls._zombie_captures = []
424 cls.registry = VppObjectRegistry()
425 cls.vpp_startup_failed = False
426 cls.reporter = KeepAliveReporter()
427 # need to catch exceptions here because if we raise, then the cleanup
428 # doesn't get called and we might end with a zombie vpp
431 cls.reporter.send_keep_alive(cls, 'setUpClass')
432 VppTestResult.current_test_case_info = TestCaseInfo(
433 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
434 cls.vpp_stdout_deque = deque()
435 cls.vpp_stderr_deque = deque()
436 cls.pump_thread_stop_flag = Event()
437 cls.pump_thread_wakeup_pipe = os.pipe()
438 cls.pump_thread = Thread(target=pump_output, args=(cls,))
439 cls.pump_thread.daemon = True
440 cls.pump_thread.start()
441 if cls.debug_gdb or cls.debug_gdbserver:
445 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
451 cls.vapi.register_hook(hook)
452 cls.wait_for_stats_socket()
453 cls.statistics = VPPStats(socketname=cls.stats_sock)
457 cls.vpp_startup_failed = True
459 "VPP died shortly after startup, check the"
460 " output to standard error for possible cause")
466 cls.vapi.disconnect()
469 if cls.debug_gdbserver:
470 print(colorize("You're running VPP inside gdbserver but "
471 "VPP-API connection failed, did you forget "
472 "to 'continue' VPP from within gdb?", RED))
484 Disconnect vpp-api, kill vpp and cleanup shared memory files
486 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
488 if cls.vpp.returncode is None:
489 print(double_line_delim)
490 print("VPP or GDB server is still running")
491 print(single_line_delim)
492 input("When done debugging, press ENTER to kill the "
493 "process and finish running the testcase...")
495 # first signal that we want to stop the pump thread, then wake it up
496 if hasattr(cls, 'pump_thread_stop_flag'):
497 cls.pump_thread_stop_flag.set()
498 if hasattr(cls, 'pump_thread_wakeup_pipe'):
499 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
500 if hasattr(cls, 'pump_thread'):
501 cls.logger.debug("Waiting for pump thread to stop")
502 cls.pump_thread.join()
503 if hasattr(cls, 'vpp_stderr_reader_thread'):
504 cls.logger.debug("Waiting for stdderr pump to stop")
505 cls.vpp_stderr_reader_thread.join()
507 if hasattr(cls, 'vpp'):
508 if hasattr(cls, 'vapi'):
509 cls.vapi.disconnect()
512 if cls.vpp.returncode is None:
513 cls.logger.debug("Sending TERM to vpp")
515 cls.logger.debug("Waiting for vpp to die")
516 cls.vpp.communicate()
519 if cls.vpp_startup_failed:
520 stdout_log = cls.logger.info
521 stderr_log = cls.logger.critical
523 stdout_log = cls.logger.info
524 stderr_log = cls.logger.info
526 if hasattr(cls, 'vpp_stdout_deque'):
527 stdout_log(single_line_delim)
528 stdout_log('VPP output to stdout while running %s:', cls.__name__)
529 stdout_log(single_line_delim)
530 vpp_output = "".join(cls.vpp_stdout_deque)
531 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
533 stdout_log('\n%s', vpp_output)
534 stdout_log(single_line_delim)
536 if hasattr(cls, 'vpp_stderr_deque'):
537 stderr_log(single_line_delim)
538 stderr_log('VPP output to stderr while running %s:', cls.__name__)
539 stderr_log(single_line_delim)
540 vpp_output = "".join(str(cls.vpp_stderr_deque))
541 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
543 stderr_log('\n%s', vpp_output)
544 stderr_log(single_line_delim)
547 def tearDownClass(cls):
548 """ Perform final cleanup after running all tests in this test-case """
549 cls.reporter.send_keep_alive(cls, 'tearDownClass')
551 cls.file_handler.close()
552 cls.reset_packet_infos()
554 debug_internal.on_tear_down_class(cls)
557 """ Show various debug prints after each test """
558 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
559 (self.__class__.__name__, self._testMethodName,
560 self._testMethodDoc))
561 if not self.vpp_dead:
562 self.logger.debug(self.vapi.cli("show trace"))
563 self.logger.info(self.vapi.ppcli("show interface"))
564 self.logger.info(self.vapi.ppcli("show hardware"))
565 self.logger.info(self.statistics.set_errors_str())
566 self.logger.info(self.vapi.ppcli("show run"))
567 self.logger.info(self.vapi.ppcli("show log"))
568 self.registry.remove_vpp_config(self.logger)
569 # Save/Dump VPP api trace log
570 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
571 tmp_api_trace = "/tmp/%s" % api_trace
572 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
573 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
574 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
576 os.rename(tmp_api_trace, vpp_api_trace_log)
577 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
580 self.registry.unregister_all(self.logger)
583 """ Clear trace before running each test"""
584 self.reporter.send_keep_alive(self)
585 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
586 (self.__class__.__name__, self._testMethodName,
587 self._testMethodDoc))
589 raise Exception("VPP is dead when setting up the test")
590 self.sleep(.1, "during setUp")
591 self.vpp_stdout_deque.append(
592 "--- test setUp() for %s.%s(%s) starts here ---\n" %
593 (self.__class__.__name__, self._testMethodName,
594 self._testMethodDoc))
595 self.vpp_stderr_deque.append(
596 "--- test setUp() for %s.%s(%s) starts here ---\n" %
597 (self.__class__.__name__, self._testMethodName,
598 self._testMethodDoc))
599 self.vapi.cli("clear trace")
600 # store the test instance inside the test class - so that objects
601 # holding the class can access instance methods (like assertEqual)
602 type(self).test_instance = self
605 def pg_enable_capture(cls, interfaces=None):
607 Enable capture on packet-generator interfaces
609 :param interfaces: iterable interface indexes (if None,
610 use self.pg_interfaces)
613 if interfaces is None:
614 interfaces = cls.pg_interfaces
619 def register_capture(cls, cap_name):
620 """ Register a capture in the testclass """
621 # add to the list of captures with current timestamp
622 cls._captures.append((time.time(), cap_name))
623 # filter out from zombies
624 cls._zombie_captures = [(stamp, name)
625 for (stamp, name) in cls._zombie_captures
630 """ Remove any zombie captures and enable the packet generator """
631 # how long before capture is allowed to be deleted - otherwise vpp
632 # crashes - 100ms seems enough (this shouldn't be needed at all)
635 for stamp, cap_name in cls._zombie_captures:
636 wait = stamp + capture_ttl - now
638 cls.sleep(wait, "before deleting capture %s" % cap_name)
640 cls.logger.debug("Removing zombie capture %s" % cap_name)
641 cls.vapi.cli('packet-generator delete %s' % cap_name)
643 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
644 cls.vapi.cli('packet-generator enable')
645 cls._zombie_captures = cls._captures
649 def create_pg_interfaces(cls, interfaces):
651 Create packet-generator interfaces.
653 :param interfaces: iterable indexes of the interfaces.
654 :returns: List of created interfaces.
659 intf = VppPGInterface(cls, i)
660 setattr(cls, intf.name, intf)
662 cls.pg_interfaces = result
666 def create_loopback_interfaces(cls, count):
668 Create loopback interfaces.
670 :param count: number of interfaces created.
671 :returns: List of created interfaces.
673 result = [VppLoInterface(cls) for i in range(count)]
675 setattr(cls, intf.name, intf)
676 cls.lo_interfaces = result
def extend_packet(packet, size, padding=' '):
    """
    Extend packet to given size by padding with spaces or custom padding
    NOTE: Currently works only when Raw layer is present.

    :param packet: packet
    :param size: target size
    :param padding: padding used to extend the payload

    """
    # 4 bytes are counted on top of len(packet)
    # NOTE(review): presumably the Ethernet FCS - confirm
    packet_len = len(packet) + 4
    extend = size - packet_len
    if extend > 0:
        # floor division keeps the repeat count an int on Python 3, where
        # '/' yields a float and sequence repetition would raise TypeError
        num = (extend // len(padding)) + 1
        packet[Raw].load += (padding * num)[:extend]
def reset_packet_infos(cls):
    """ Start over with empty packet info and per-destination count maps """
    cls._packet_count_for_dst_if_idx = dict()
    cls._packet_infos = dict()
703 def create_packet_info(cls, src_if, dst_if):
705 Create packet info object containing the source and destination indexes
706 and add it to the testcase's packet info list
708 :param VppInterface src_if: source interface
709 :param VppInterface dst_if: destination interface
711 :returns: _PacketInfo object
715 info.index = len(cls._packet_infos)
716 info.src = src_if.sw_if_index
717 info.dst = dst_if.sw_if_index
718 if isinstance(dst_if, VppSubInterface):
719 dst_idx = dst_if.parent.sw_if_index
721 dst_idx = dst_if.sw_if_index
722 if dst_idx in cls._packet_count_for_dst_if_idx:
723 cls._packet_count_for_dst_if_idx[dst_idx] += 1
725 cls._packet_count_for_dst_if_idx[dst_idx] = 1
726 cls._packet_infos[info.index] = info
730 def info_to_payload(info):
732 Convert _PacketInfo object to packet payload
734 :param info: _PacketInfo object
736 :returns: string containing serialized data from packet info
738 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
742 def payload_to_info(payload):
744 Convert packet payload to _PacketInfo object
746 :param payload: packet payload
748 :returns: _PacketInfo object containing de-serialized data from payload
751 numbers = payload.split()
753 info.index = int(numbers[0])
754 info.src = int(numbers[1])
755 info.dst = int(numbers[2])
756 info.ip = int(numbers[3])
757 info.proto = int(numbers[4])
760 def get_next_packet_info(self, info):
762 Iterate over the packet info list stored in the testcase
763 Start iteration with first element if info is None
764 Continue based on index in info if info is specified
766 :param info: info or None
767 :returns: next info in list or None if no more infos
772 next_index = info.index + 1
773 if next_index == len(self._packet_infos):
776 return self._packet_infos[next_index]
778 def get_next_packet_info_for_interface(self, src_index, info):
780 Search the packet info list for the next packet info with same source
783 :param src_index: source interface index to search for
784 :param info: packet info - where to start the search
785 :returns: packet info or None
789 info = self.get_next_packet_info(info)
792 if info.src == src_index:
795 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
797 Search the packet info list for the next packet info with same source
798 and destination interface indexes
800 :param src_index: source interface index to search for
801 :param dst_index: destination interface index to search for
802 :param info: packet info - where to start the search
803 :returns: packet info or None
807 info = self.get_next_packet_info_for_interface(src_index, info)
810 if info.dst == dst_index:
813 def assert_equal(self, real_value, expected_value, name_or_class=None):
814 if name_or_class is None:
815 self.assertEqual(real_value, expected_value)
818 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
819 msg = msg % (getdoc(name_or_class).strip(),
820 real_value, str(name_or_class(real_value)),
821 expected_value, str(name_or_class(expected_value)))
823 msg = "Invalid %s: %s does not match expected value %s" % (
824 name_or_class, real_value, expected_value)
826 self.assertEqual(real_value, expected_value, msg)
828 def assert_in_range(self,
836 msg = "Invalid %s: %s out of range <%s,%s>" % (
837 name, real_value, expected_min, expected_max)
838 self.assertTrue(expected_min <= real_value <= expected_max, msg)
840 def assert_packet_checksums_valid(self, packet,
841 ignore_zero_udp_checksums=True):
842 received = packet.__class__(str(packet))
844 ppp("Verifying packet checksums for packet:", received))
845 udp_layers = ['UDP', 'UDPerror']
846 checksum_fields = ['cksum', 'chksum']
849 temp = received.__class__(str(received))
851 layer = temp.getlayer(counter)
853 for cf in checksum_fields:
854 if hasattr(layer, cf):
855 if ignore_zero_udp_checksums and \
856 0 == getattr(layer, cf) and \
857 layer.name in udp_layers:
860 checksums.append((counter, cf))
863 counter = counter + 1
864 if 0 == len(checksums):
866 temp = temp.__class__(str(temp))
867 for layer, cf in checksums:
868 calc_sum = getattr(temp[layer], cf)
870 getattr(received[layer], cf), calc_sum,
871 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
873 "Checksum field `%s` on `%s` layer has correct value `%s`" %
874 (cf, temp[layer].name, calc_sum))
876 def assert_checksum_valid(self, received_packet, layer,
878 ignore_zero_checksum=False):
879 """ Check checksum of received packet on given layer """
880 received_packet_checksum = getattr(received_packet[layer], field_name)
881 if ignore_zero_checksum and 0 == received_packet_checksum:
883 recalculated = received_packet.__class__(str(received_packet))
884 delattr(recalculated[layer], field_name)
885 recalculated = recalculated.__class__(str(recalculated))
886 self.assert_equal(received_packet_checksum,
887 getattr(recalculated[layer], field_name),
888 "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """ Check the checksum on the 'IP' layer of the received packet

    Delegates to assert_checksum_valid, forwarding ignore_zero_checksum.
    """
    options = {'ignore_zero_checksum': ignore_zero_checksum}
    self.assert_checksum_valid(received_packet, 'IP', **options)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """ Check the checksum on the 'TCP' layer of the received packet

    Delegates to assert_checksum_valid, forwarding ignore_zero_checksum.
    """
    options = {'ignore_zero_checksum': ignore_zero_checksum}
    self.assert_checksum_valid(received_packet, 'TCP', **options)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """ Check the checksum on the 'UDP' layer of the received packet

    Delegates to assert_checksum_valid, forwarding ignore_zero_checksum,
    which defaults to True here.
    """
    options = {'ignore_zero_checksum': ignore_zero_checksum}
    self.assert_checksum_valid(received_packet, 'UDP', **options)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """ Check checksums of the *error layers present in the packet

    Each of IPerror, TCPerror, UDPerror and ICMPerror is checked only when
    the packet has that layer; the UDPerror check ignores a zero checksum.
    """
    # (layer class probed with haslayer, layer name checked, extra options)
    embedded_layers = (
        (IPerror, 'IPerror', {}),
        (TCPerror, 'TCPerror', {}),
        (UDPerror, 'UDPerror', {'ignore_zero_checksum': True}),
        (ICMPerror, 'ICMPerror', {}),
    )
    for layer_cls, layer_name, extra in embedded_layers:
        if received_packet.haslayer(layer_cls):
            self.assert_checksum_valid(received_packet, layer_name, **extra)
def assert_icmp_checksum_valid(self, received_packet):
    """ Check the 'ICMP' layer checksum, then any embedded error layers """
    pkt = received_packet
    self.assert_checksum_valid(pkt, 'ICMP')
    self.assert_embedded_icmp_checksum_valid(pkt)
def assert_icmpv6_checksum_valid(self, pkt):
    """ Check the 'cksum' field of the ICMPv6 layers present in the packet

    A destination-unreachable layer additionally triggers the embedded
    error-layer checks; echo request and echo reply are checked on their
    own.
    """
    if pkt.haslayer(ICMPv6DestUnreach):
        self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
        self.assert_embedded_icmp_checksum_valid(pkt)
    echo_layers = (
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest'),
        (ICMPv6EchoReply, 'ICMPv6EchoReply'),
    )
    for layer_cls, layer_name in echo_layers:
        if pkt.haslayer(layer_cls):
            self.assert_checksum_valid(pkt, layer_name, 'cksum')
929 def assert_packet_counter_equal(self, counter, expected_value):
930 if counter.startswith("/"):
931 counter_value = self.statistics.get_counter(counter)
932 self.assert_equal(counter_value, expected_value,
933 "packet counter `%s'" % counter)
935 counters = self.vapi.cli("sh errors").split('\n')
937 for i in range(1, len(counters) - 1):
938 results = counters[i].split()
939 if results[1] == counter:
940 counter_value = int(results[0])
944 def sleep(cls, timeout, remark=None):
945 if hasattr(cls, 'logger'):
946 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
950 if hasattr(cls, 'logger') and after - before > 2 * timeout:
951 cls.logger.error("unexpected time.sleep() result - "
952 "slept for %es instead of ~%es!",
953 after - before, timeout)
954 if hasattr(cls, 'logger'):
956 "Finished sleep (%s) - slept %es (wanted %es)",
957 remark, after - before, timeout)
959 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
960 self.vapi.cli("clear trace")
961 intf.add_stream(pkts)
962 self.pg_enable_capture(self.pg_interfaces)
966 for i in self.pg_interfaces:
967 i.get_capture(0, timeout=timeout)
968 i.assert_nothing_captured(remark=remark)
971 def send_and_expect(self, input, pkts, output):
972 self.vapi.cli("clear trace")
973 input.add_stream(pkts)
974 self.pg_enable_capture(self.pg_interfaces)
976 if isinstance(object, (list,)):
979 rx.append(output.get_capture(len(pkts)))
981 rx = output.get_capture(len(pkts))
984 def send_and_expect_only(self, input, pkts, output, timeout=None):
985 self.vapi.cli("clear trace")
986 input.add_stream(pkts)
987 self.pg_enable_capture(self.pg_interfaces)
989 if isinstance(object, (list,)):
993 rx.append(output.get_capture(len(pkts)))
995 rx = output.get_capture(len(pkts))
999 for i in self.pg_interfaces:
1000 if i not in outputs:
1001 i.get_capture(0, timeout=timeout)
1002 i.assert_nothing_captured()
def get_testcase_doc_name(test):
    """Return the display name of the test case class of ``test``.

    :param test: test case instance
    :returns: first line of the class docstring; the class name when the
        class has no docstring or an empty one (instead of raising)
    """
    test_class = test.__class__
    doc = getdoc(test_class)
    # getdoc() yields None without a docstring and '' for a blank one
    doc_lines = doc.splitlines() if doc else []
    return doc_lines[0] if doc_lines else test_class.__name__
1012 def get_test_description(descriptions, test):
1013 short_description = test.shortDescription()
1014 if descriptions and short_description:
1015 return short_description
class TestCaseInfo(object):
    """Plain record of what is known about one test case run.

    Holds the logger, temporary directory, VPP pid and VPP binary path
    handed to the constructor; ``core_crash_test`` starts out as None.
    """

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        self.logger, self.tempdir = logger, tempdir
        self.vpp_pid, self.vpp_bin_path = vpp_pid, vpp_bin_path
        self.core_crash_test = None
1029 class VppTestResult(unittest.TestResult):
1031 @property result_string
1032 String variable to store the test case result string.
1034 List variable containing 2-tuples of TestCase instances and strings
1035 holding formatted tracebacks. Each tuple represents a test which
1036 raised an unexpected exception.
1038 List variable containing 2-tuples of TestCase instances and strings
1039 holding formatted tracebacks. Each tuple represents a test where
1040 a failure was explicitly signalled using the TestCase.assert*()
1044 failed_test_cases_info = set()
1045 core_crash_test_cases_info = set()
1046 current_test_case_info = None
def __init__(self, stream, descriptions, verbosity, runner):
    """
    :param stream File descriptor to store where to report test results.
    :param descriptions Boolean variable to store information if to use
        test case descriptions.
    :param verbosity Integer variable to store required verbosity level.
    :param runner Stored unchanged as ``self.runner``.
    """
    unittest.TestResult.__init__(self, stream, descriptions, verbosity)
    self.stream, self.descriptions = stream, descriptions
    self.verbosity = verbosity
    # no result recorded yet
    self.result_string = None
    self.runner = runner
def addSuccess(self, test):
    """
    Record a test succeeded result

    Logs the call on the current test case logger (when one is set),
    records the success with unittest, sets the result string to a GREEN
    "OK" and sends PASS through the result pipe.

    :param test: test that succeeded

    """
    case_info = self.current_test_case_info
    if case_info:
        details = (test.__class__.__name__,
                   test._testMethodName,
                   test._testMethodDoc)
        case_info.logger.debug(
            "--- addSuccess() %s.%s(%s) called" % details)
    unittest.TestResult.addSuccess(self, test)
    self.result_string = colorize("OK", GREEN)

    self.send_result_through_pipe(test, PASS)
def addSkip(self, test, reason):
    """
    Record a test skipped.

    Logs the call and the reason on the current test case logger (when
    one is set), records the skip with unittest, sets the result string
    to a YELLOW "SKIP" and sends SKIP through the result pipe.

    :param test: test that was skipped
    :param reason: why the test was skipped

    """
    case_info = self.current_test_case_info
    if case_info:
        details = (test.__class__.__name__, test._testMethodName,
                   test._testMethodDoc, reason)
        case_info.logger.debug(
            "--- addSkip() %s.%s(%s) called, reason is %s" % details)
    unittest.TestResult.addSkip(self, test, reason)
    self.result_string = colorize("SKIP", YELLOW)

    self.send_result_through_pipe(test, SKIP)
1098 def symlink_failed(self):
1099 if self.current_test_case_info:
1101 failed_dir = os.getenv('FAILED_DIR')
1102 link_path = os.path.join(
1105 os.path.basename(self.current_test_case_info.tempdir))
1106 if self.current_test_case_info.logger:
1107 self.current_test_case_info.logger.debug(
1108 "creating a link to the failed test")
1109 self.current_test_case_info.logger.debug(
1110 "os.symlink(%s, %s)" %
1111 (self.current_test_case_info.tempdir, link_path))
1112 if os.path.exists(link_path):
1113 if self.current_test_case_info.logger:
1114 self.current_test_case_info.logger.debug(
1115 'symlink already exists')
1117 os.symlink(self.current_test_case_info.tempdir, link_path)
1119 except Exception as e:
1120 if self.current_test_case_info.logger:
1121 self.current_test_case_info.logger.error(e)
1123 def send_result_through_pipe(self, test, result):
1124 if hasattr(self, 'test_framework_result_pipe'):
1125 pipe = self.test_framework_result_pipe
1127 pipe.send((test.id(), result))
1129 def log_error(self, test, err, fn_name):
1130 if self.current_test_case_info:
1131 if isinstance(test, unittest.suite._ErrorHolder):
1132 test_name = test.description
1134 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1135 test._testMethodName,
1136 test._testMethodDoc)
1137 self.current_test_case_info.logger.debug(
1138 "--- %s() %s called, err is %s" %
1139 (fn_name, test_name, err))
1140 self.current_test_case_info.logger.debug(
1141 "formatted exception is:\n%s" %
1142 "".join(format_exception(*err)))
def add_error(self, test, err, unittest_fn, error_type):
    """
    Record an error or a failure of a test.

    Logs the problem, lets unittest record it, builds the result string,
    links the temp dir of the failed test case, remembers test cases
    which failed or left a core file, and finally sends the result
    through the result pipe.

    :param test: test case, or a unittest _ErrorHolder
    :param err: error information handed over to unittest_fn and the log
    :param unittest_fn: unittest.TestResult function doing the actual
        recording, called as unittest_fn(self, test, err)
    :param error_type: FAIL or ERROR
    :raises Exception: if error_type is neither FAIL nor ERROR
    """
    if error_type == FAIL:
        callback_name, label = 'addFailure', "FAIL"
    elif error_type == ERROR:
        callback_name, label = 'addError', "ERROR"
    else:
        raise Exception('Error type %s cannot be used to record an '
                        'error or a failure' % error_type)
    self.log_error(test, err, callback_name)
    error_type_str = colorize(label, RED)

    unittest_fn(self, test, err)

    info = self.current_test_case_info
    if not info:
        self.result_string = '%s [no temp dir]' % error_type_str
    else:
        self.result_string = "%s [ temp dir used by test case: %s ]" % (
            error_type_str, info.tempdir)
        self.symlink_failed()
        self.failed_test_cases_info.add(info)
        if is_core_present(info.tempdir):
            # only the first test found with a core file is remembered
            if not info.core_crash_test:
                if isinstance(test, unittest.suite._ErrorHolder):
                    crashed_test = str(test)
                else:
                    crashed_test = "'{}' ({})".format(
                        get_testcase_doc_name(test), test.id())
                info.core_crash_test = crashed_test
            self.core_crash_test_cases_info.add(info)

    self.send_result_through_pipe(test, error_type)
def addFailure(self, test, err):
    """
    Record a test failed result

    :param test: test which failed
    :param err: error message

    """
    record_failure = unittest.TestResult.addFailure
    self.add_error(test, err, record_failure, FAIL)
def addError(self, test, err):
    """
    Record a test error result

    :param test: test which ended with an error
    :param err: error message

    """
    record_error = unittest.TestResult.addError
    self.add_error(test, err, record_error, ERROR)
def getDescription(self, test):
    """
    Get test description

    :param test: test to describe
    :returns: test description

    """
    descriptions = self.descriptions
    return get_test_description(descriptions, test)
def startTest(self, test):
    """
    Start a test

    :param test: test which is about to be run

    """
    # NOTE(review): the lines between the signature and the call to
    # unittest were lost in this view; print_header() is assumed to be
    # provided by the test case class - confirm against the full file
    test.print_header()

    unittest.TestResult.startTest(self, test)
    if self.verbosity <= 0:
        return
    banner = "Starting " + self.getDescription(test) + " ..."
    self.stream.writeln(banner)
    self.stream.writeln(single_line_delim)
def stopTest(self, test):
    """
    Called when the given test has been run

    Writes the test description together with the result string to the
    stream (framed by delimiter lines when verbosity is above zero) and
    sends TEST_RUN through the result pipe.

    :param test: test which has just been run

    """
    unittest.TestResult.stopTest(self, test)
    framed = self.verbosity > 0
    if framed:
        self.stream.writeln(single_line_delim)
    # the result line itself is the same for every verbosity level
    self.stream.writeln(
        "%-73s%s" % (self.getDescription(test), self.result_string))
    if framed:
        self.stream.writeln(single_line_delim)

    self.send_result_through_pipe(test, TEST_RUN)
def printErrors(self):
    """
    Print errors from running the test case

    When the runner has print_summary switched off, the stream of this
    result and the stream of the runner are afterwards replaced by
    a writer to os.devnull.
    """
    if self.errors or self.failures:
        self.stream.writeln()
        for flavour, records in (('ERROR', self.errors),
                                 ('FAIL', self.failures)):
            self.printErrorList(flavour, records)

    # ^^ that is the last output from unittest before summary
    if self.runner.print_summary:
        return
    sink = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
    self.stream = sink
    self.runner.stream = sink
def printErrorList(self, flavour, errors):
    """
    Print error list to the output stream together with error type
    and test case description.

    :param flavour: error type
    :param errors: iterable of (test, error) pairs

    """
    out = self.stream
    for failed_test, details in errors:
        heading = "%s: %s" % (flavour, self.getDescription(failed_test))
        out.writeln(double_line_delim)
        out.writeln(heading)
        out.writeln(single_line_delim)
        out.writeln("%s" % details)
class VppTestRunner(unittest.TextTestRunner):
    """
    A basic test runner implementation which prints results to standard
    output (the stream is hard-coded to sys.stdout in the constructor).
    """

    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
                 result_pipe=None, failfast=False, buffer=False,
                 resultclass=None, print_summary=True):
        """
        Set up the runner and publish the pipes used for reporting.

        :param keep_alive_pipe: stored as KeepAliveReporter.pipe
        :param descriptions: passed on to unittest.TextTestRunner
        :param verbosity: passed on to unittest.TextTestRunner
        :param result_pipe: stored on the result class as
            test_framework_result_pipe
        :param failfast: passed on to unittest.TextTestRunner
        :param buffer: passed on to unittest.TextTestRunner
        :param resultclass: passed on to unittest.TextTestRunner
        :param print_summary: when false, the original stream is put
            back on the runner and the result after the run
        """
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        super(VppTestRunner, self).__init__(
            stream=sys.stdout, descriptions=descriptions,
            verbosity=verbosity, failfast=failfast, buffer=buffer,
            resultclass=resultclass)
        KeepAliveReporter.pipe = keep_alive_pipe

        # remembered so that run() can put it back
        self.orig_stream = self.stream
        self.resultclass.test_framework_result_pipe = result_pipe

        self.print_summary = print_summary

    def _makeResult(self):
        """Create the result object, handing over this runner as well."""
        result_factory = self.resultclass
        return result_factory(self.stream,
                              self.descriptions,
                              self.verbosity,
                              self)

    def run(self, test):
        """
        Run the tests

        :param test: test or test suite to run
        :returns: result object of the run

        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        result = super(VppTestRunner, self).run(test)
        if self.print_summary:
            return result
        self.stream = self.orig_stream
        result.stream = self.orig_stream
        return result
class Worker(Thread):
    """
    Thread which runs an executable, logs what it wrote to stdout and
    stderr and stores its return code in the ``result`` attribute.
    """

    def __init__(self, args, logger, env=None):
        """
        :param args: argument list of the process, args[0] is the
            executable
        :param logger: logger used for reporting
        :param env: optional mapping of extra environment variables for
            the process; a deep copy is stored
        """
        self.logger = logger
        self.args = args
        # stays None until run() has finished
        self.result = None
        # None instead of a shared mutable {} default; the copy keeps
        # later changes made by the caller out of the worker
        self.env = copy.deepcopy(env) if env is not None else {}
        super(Worker, self).__init__()

    def run(self):
        """Run the executable, wait for it and log its output."""
        executable = self.args[0]
        self.logger.debug("Running executable w/args `%s'" % self.args)
        env = os.environ.copy()
        env.update(self.env)
        env["CK_LOG_FILE_NAME"] = "-"
        self.process = subprocess.Popen(
            self.args, shell=False, env=env, preexec_fn=os.setpgrp,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = self.process.communicate()
        self.logger.debug("Finished running `%s'" % executable)
        self.logger.info("Return code is `%s'" % self.process.returncode)
        self.logger.info(single_line_delim)
        for channel, captured in (("stdout", out), ("stderr", err)):
            self.logger.info(
                "Executable `%s' wrote to %s:" % (executable, channel))
            self.logger.info(single_line_delim)
            self.logger.info(captured)
            self.logger.info(single_line_delim)
        self.result = self.process.returncode