3 from __future__ import print_function
16 from collections import deque
17 from threading import Thread, Event
18 from inspect import getdoc, isclass
19 from traceback import format_exception
20 from logging import FileHandler, DEBUG, Formatter
21 from scapy.packet import Raw
22 from hook import StepHook, PollHook, VppDiedError
23 from vpp_pg_interface import VppPGInterface
24 from vpp_sub_interface import VppSubInterface
25 from vpp_lo_interface import VppLoInterface
26 from vpp_papi_provider import VppPapiProvider
27 from vpp_papi.vpp_stats import VPPStats
28 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
30 from vpp_object import VppObjectRegistry
31 from util import ppp, is_core_present
32 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
33 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
34 from scapy.layers.inet6 import ICMPv6EchoReply
36 if os.name == 'posix' and sys.version_info[0] < 3:
37 # using subprocess32 is recommended by python official documentation
38 # @ https://docs.python.org/2/library/subprocess.html
39 import subprocess32 as subprocess
# Framework self-debugging is enabled only when TEST_DEBUG is exactly "1";
# an unset variable counts as "0".
debug_framework = os.getenv('TEST_DEBUG', "0") == "1"
55 Test framework module.
57 The module provides a set of tools for constructing and running tests and
58 representing the results.
62 class _PacketInfo(object):
63 """Private class to create packet info object.
65 Help process information about the next packet.
66 Set variables to default values.
68 #: Store the index of the packet.
70 #: Store the index of the source packet generator interface of the packet.
72 #: Store the index of the destination packet generator interface
75 #: Store expected ip version
77 #: Store expected upper protocol
79 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare two packet infos field by field.

    The index, src, dst and data attributes are all compared up front
    (nothing is short-circuited, so a missing attribute on ``other``
    always raises).  The result is the first falsy comparison result, or
    the result of the data comparison when the first three are truthy.

    :param other: object to compare with
    :returns: outcome of the combined comparison
    """
    verdicts = (
        self.index == other.index,
        self.src == other.src,
        self.dst == other.dst,
        self.data == other.data,
    )
    # Mirror ``a and b and c and d``: hand back the first falsy operand,
    # otherwise the last operand untouched.
    for verdict in verdicts[:-1]:
        if not verdict:
            return verdict
    return verdicts[-1]
90 def pump_output(testclass):
91 """ pump output from vpp stdout/stderr to proper queues """
94 while not testclass.pump_thread_stop_flag.is_set():
95 readable = select.select([testclass.vpp.stdout.fileno(),
96 testclass.vpp.stderr.fileno(),
97 testclass.pump_thread_wakeup_pipe[0]],
99 if testclass.vpp.stdout.fileno() in readable:
100 read = os.read(testclass.vpp.stdout.fileno(), 102400)
102 split = read.splitlines(True)
103 if len(stdout_fragment) > 0:
104 split[0] = "%s%s" % (stdout_fragment, split[0])
105 if len(split) > 0 and split[-1].endswith("\n"):
109 stdout_fragment = split[-1]
110 testclass.vpp_stdout_deque.extend(split[:limit])
111 if not testclass.cache_vpp_output:
112 for line in split[:limit]:
113 testclass.logger.debug(
114 "VPP STDOUT: %s" % line.rstrip("\n"))
115 if testclass.vpp.stderr.fileno() in readable:
116 read = os.read(testclass.vpp.stderr.fileno(), 102400)
118 split = read.splitlines(True)
119 if len(stderr_fragment) > 0:
120 split[0] = "%s%s" % (stderr_fragment, split[0])
121 if len(split) > 0 and split[-1].endswith(b"\n"):
125 stderr_fragment = split[-1]
126 testclass.vpp_stderr_deque.extend(split[:limit])
127 if not testclass.cache_vpp_output:
128 for line in split[:limit]:
129 testclass.logger.debug(
130 "VPP STDERR: %s" % line.rstrip("\n"))
131 # ignoring the dummy pipe here intentionally - the
132 # flag will take care of properly terminating the loop
def is_skip_aarch64_set():
    """Tell whether the SKIP_AARCH64 environment variable is switched on.

    :returns: True when SKIP_AARCH64 is 'yes', 'y' or '1' (compared
              case-insensitively), False otherwise, including when unset
    """
    setting = os.getenv('SKIP_AARCH64', 'n')
    return setting.lower() in ('yes', 'y', '1')
def is_platform_aarch64():
    """Tell whether platform.machine() reports 'aarch64'.

    :returns: True if the machine type string is exactly 'aarch64'
    """
    machine_type = platform.machine()
    return machine_type == 'aarch64'
def running_extended_tests():
    """Tell whether extended tests were requested via the environment.

    :returns: True when EXTENDED_TESTS is 'y', 'yes' or '1' (compared
              case-insensitively), False otherwise, including when unset
    """
    requested = os.getenv("EXTENDED_TESTS", "n").lower()
    return requested in ("y", "yes", "1")
def running_on_centos():
    """Tell whether the OS_ID environment variable mentions CentOS.

    :returns: True when 'centos' occurs anywhere in OS_ID (compared
              case-insensitively), False otherwise, including when unset
    """
    return "centos" in os.getenv("OS_ID", "").lower()
153 class KeepAliveReporter(object):
155 Singleton object which reports test start to parent process
160 self.__dict__ = self._shared_state
168 def pipe(self, pipe):
169 if self._pipe is not None:
170 raise Exception("Internal error - pipe should only be set once.")
173 def send_keep_alive(self, test, desc=None):
175 Write current test tmpdir & desc to keep-alive pipe to signal liveness
177 if self.pipe is None:
178 # if not running forked..
182 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
186 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
189 class VppTestCase(unittest.TestCase):
190 """This subclass is a base class for VPP test cases that are implemented as
191 classes. It provides methods to create and run test case.
194 extra_vpp_punt_config = []
195 extra_vpp_plugin_config = []
def packet_infos(self):
    """Packet infos stored for this test case.

    :returns: the ``_packet_infos`` container itself (not a copy); it is
              a dict keyed by packet info index
    """
    stored_infos = self._packet_infos
    return stored_infos
203 def get_packet_count_for_if_idx(cls, dst_if_index):
204 """Get the number of packet info for specified destination if index"""
205 if dst_if_index in cls._packet_count_for_dst_if_idx:
206 return cls._packet_count_for_dst_if_idx[dst_if_index]
212 """Return the instance of this testcase"""
213 return cls.test_instance
216 def set_debug_flags(cls, d):
217 cls.debug_core = False
218 cls.debug_gdb = False
219 cls.debug_gdbserver = False
224 cls.debug_core = True
227 elif dl == "gdbserver":
228 cls.debug_gdbserver = True
230 raise Exception("Unrecognized DEBUG option: '%s'" % d)
233 def get_least_used_cpu():
234 cpu_usage_list = [set(range(psutil.cpu_count()))]
235 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
236 if 'vpp_main' == p.info['name']]
237 for vpp_process in vpp_processes:
238 for cpu_usage_set in cpu_usage_list:
240 cpu_num = vpp_process.cpu_num()
241 if cpu_num in cpu_usage_set:
242 cpu_usage_set_index = cpu_usage_list.index(
244 if cpu_usage_set_index == len(cpu_usage_list) - 1:
245 cpu_usage_list.append({cpu_num})
247 cpu_usage_list[cpu_usage_set_index + 1].add(
249 cpu_usage_set.remove(cpu_num)
251 except psutil.NoSuchProcess:
254 for cpu_usage_set in cpu_usage_list:
255 if len(cpu_usage_set) > 0:
256 min_usage_set = cpu_usage_set
259 return random.choice(tuple(min_usage_set))
def print_header(cls):
    """Print the banner of the test class, at most once per class.

    The banner is the first line of the class docstring, passed through
    colorize() with GREEN and framed by double_line_delim.  Nothing is
    printed when the class already has a ``_header_printed`` attribute;
    otherwise that attribute is set to True after printing.
    """
    if hasattr(cls, '_header_printed'):
        return
    title = getdoc(cls).splitlines()[0]
    print(double_line_delim)
    print(colorize(title, GREEN))
    print(double_line_delim)
    cls._header_printed = True
270 def setUpConstants(cls):
271 """ Set-up the test case class based on environment variables """
272 s = os.getenv("STEP", "n")
273 cls.step = True if s.lower() in ("y", "yes", "1") else False
274 d = os.getenv("DEBUG", None)
275 c = os.getenv("CACHE_OUTPUT", "1")
276 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
277 cls.set_debug_flags(d)
278 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
279 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
280 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
282 if cls.plugin_path is not None:
283 if cls.extern_plugin_path is not None:
284 plugin_path = "%s:%s" % (
285 cls.plugin_path, cls.extern_plugin_path)
287 plugin_path = cls.plugin_path
288 elif cls.extern_plugin_path is not None:
289 plugin_path = cls.extern_plugin_path
291 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
292 debug_cli = "cli-listen localhost:5002"
294 size = os.getenv("COREDUMP_SIZE")
296 coredump_size = "coredump-size %s" % size
297 if coredump_size is None:
298 coredump_size = "coredump-size unlimited"
300 cpu_core_number = cls.get_least_used_cpu()
302 cls.vpp_cmdline = [cls.vpp_bin, "unix",
303 "{", "nodaemon", debug_cli, "full-coredump",
304 coredump_size, "runtime-dir", cls.tempdir, "}",
305 "api-trace", "{", "on", "}", "api-segment", "{",
306 "prefix", cls.shm_prefix, "}", "cpu", "{",
307 "main-core", str(cpu_core_number), "}", "statseg",
308 "{", "socket-name", cls.stats_sock, "}", "plugins",
309 "{", "plugin", "dpdk_plugin.so", "{", "disable",
310 "}", "plugin", "unittest_plugin.so", "{", "enable",
311 "}"] + cls.extra_vpp_plugin_config + ["}", ]
312 if cls.extra_vpp_punt_config is not None:
313 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
314 if plugin_path is not None:
315 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
316 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
317 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
320 def wait_for_enter(cls):
321 if cls.debug_gdbserver:
322 print(double_line_delim)
323 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
325 print(double_line_delim)
326 print("Spawned VPP with PID: %d" % cls.vpp.pid)
328 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
330 print(single_line_delim)
331 print("You can debug the VPP using e.g.:")
332 if cls.debug_gdbserver:
333 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
334 print("Now is the time to attach a gdb by running the above "
335 "command, set up breakpoints etc. and then resume VPP from "
336 "within gdb by issuing the 'continue' command")
338 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
339 print("Now is the time to attach a gdb by running the above "
340 "command and set up breakpoints etc.")
341 print(single_line_delim)
342 raw_input("Press ENTER to continue running the testcase...")
346 cmdline = cls.vpp_cmdline
348 if cls.debug_gdbserver:
349 gdbserver = '/usr/bin/gdbserver'
350 if not os.path.isfile(gdbserver) or \
351 not os.access(gdbserver, os.X_OK):
352 raise Exception("gdbserver binary '%s' does not exist or is "
353 "not executable" % gdbserver)
355 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
356 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
359 cls.vpp = subprocess.Popen(cmdline,
360 stdout=subprocess.PIPE,
361 stderr=subprocess.PIPE,
363 except subprocess.CalledProcessError as e:
364 cls.logger.critical("Couldn't start vpp: %s" % e)
370 def wait_for_stats_socket(cls):
371 deadline = time.time() + 3
373 while time.time() < deadline or \
374 cls.debug_gdb or cls.debug_gdbserver:
375 if os.path.exists(cls.stats_sock):
380 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
385 Perform class setup before running the testcase
386 Remove shared memory files, start vpp and connect the vpp-api
388 gc.collect() # run garbage collection first
391 cls.logger = get_logger(cls.__name__)
392 if hasattr(cls, 'parallel_handler'):
393 cls.logger.addHandler(cls.parallel_handler)
394 cls.logger.propagate = False
395 cls.tempdir = tempfile.mkdtemp(
396 prefix='vpp-unittest-%s-' % cls.__name__)
397 cls.stats_sock = "%s/stats.sock" % cls.tempdir
398 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
399 cls.file_handler.setFormatter(
400 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
402 cls.file_handler.setLevel(DEBUG)
403 cls.logger.addHandler(cls.file_handler)
404 cls.shm_prefix = os.path.basename(cls.tempdir)
405 os.chdir(cls.tempdir)
406 cls.logger.info("Temporary dir is %s, shm prefix is %s",
407 cls.tempdir, cls.shm_prefix)
409 cls.reset_packet_infos()
411 cls._zombie_captures = []
414 cls.registry = VppObjectRegistry()
415 cls.vpp_startup_failed = False
416 cls.reporter = KeepAliveReporter()
417 # need to catch exceptions here because if we raise, then the cleanup
418 # doesn't get called and we might end with a zombie vpp
421 cls.reporter.send_keep_alive(cls, 'setUpClass')
422 VppTestResult.current_test_case_info = TestCaseInfo(
423 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
424 cls.vpp_stdout_deque = deque()
425 cls.vpp_stderr_deque = deque()
426 cls.pump_thread_stop_flag = Event()
427 cls.pump_thread_wakeup_pipe = os.pipe()
428 cls.pump_thread = Thread(target=pump_output, args=(cls,))
429 cls.pump_thread.daemon = True
430 cls.pump_thread.start()
431 if cls.debug_gdb or cls.debug_gdbserver:
435 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
441 cls.vapi.register_hook(hook)
442 cls.wait_for_stats_socket()
443 cls.statistics = VPPStats(socketname=cls.stats_sock)
447 cls.vpp_startup_failed = True
449 "VPP died shortly after startup, check the"
450 " output to standard error for possible cause")
456 cls.vapi.disconnect()
459 if cls.debug_gdbserver:
460 print(colorize("You're running VPP inside gdbserver but "
461 "VPP-API connection failed, did you forget "
462 "to 'continue' VPP from within gdb?", RED))
474 Disconnect vpp-api, kill vpp and cleanup shared memory files
476 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
478 if cls.vpp.returncode is None:
479 print(double_line_delim)
480 print("VPP or GDB server is still running")
481 print(single_line_delim)
482 raw_input("When done debugging, press ENTER to kill the "
483 "process and finish running the testcase...")
485 # first signal that we want to stop the pump thread, then wake it up
486 if hasattr(cls, 'pump_thread_stop_flag'):
487 cls.pump_thread_stop_flag.set()
488 if hasattr(cls, 'pump_thread_wakeup_pipe'):
489 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
490 if hasattr(cls, 'pump_thread'):
491 cls.logger.debug("Waiting for pump thread to stop")
492 cls.pump_thread.join()
493 if hasattr(cls, 'vpp_stderr_reader_thread'):
494 cls.logger.debug("Waiting for stdderr pump to stop")
495 cls.vpp_stderr_reader_thread.join()
497 if hasattr(cls, 'vpp'):
498 if hasattr(cls, 'vapi'):
499 cls.vapi.disconnect()
502 if cls.vpp.returncode is None:
503 cls.logger.debug("Sending TERM to vpp")
505 cls.logger.debug("Waiting for vpp to die")
506 cls.vpp.communicate()
509 if cls.vpp_startup_failed:
510 stdout_log = cls.logger.info
511 stderr_log = cls.logger.critical
513 stdout_log = cls.logger.info
514 stderr_log = cls.logger.info
516 if hasattr(cls, 'vpp_stdout_deque'):
517 stdout_log(single_line_delim)
518 stdout_log('VPP output to stdout while running %s:', cls.__name__)
519 stdout_log(single_line_delim)
520 vpp_output = "".join(cls.vpp_stdout_deque)
521 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
523 stdout_log('\n%s', vpp_output)
524 stdout_log(single_line_delim)
526 if hasattr(cls, 'vpp_stderr_deque'):
527 stderr_log(single_line_delim)
528 stderr_log('VPP output to stderr while running %s:', cls.__name__)
529 stderr_log(single_line_delim)
530 vpp_output = "".join(str(cls.vpp_stderr_deque))
531 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
533 stderr_log('\n%s', vpp_output)
534 stderr_log(single_line_delim)
537 def tearDownClass(cls):
538 """ Perform final cleanup after running all tests in this test-case """
539 cls.reporter.send_keep_alive(cls, 'tearDownClass')
541 cls.file_handler.close()
542 cls.reset_packet_infos()
544 debug_internal.on_tear_down_class(cls)
547 """ Show various debug prints after each test """
548 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
549 (self.__class__.__name__, self._testMethodName,
550 self._testMethodDoc))
551 if not self.vpp_dead:
552 self.logger.debug(self.vapi.cli("show trace"))
553 self.logger.info(self.vapi.ppcli("show interface"))
554 self.logger.info(self.vapi.ppcli("show hardware"))
555 self.logger.info(self.statistics.set_errors_str())
556 self.logger.info(self.vapi.ppcli("show run"))
557 self.logger.info(self.vapi.ppcli("show log"))
558 self.registry.remove_vpp_config(self.logger)
559 # Save/Dump VPP api trace log
560 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
561 tmp_api_trace = "/tmp/%s" % api_trace
562 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
563 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
564 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
566 os.rename(tmp_api_trace, vpp_api_trace_log)
567 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
570 self.registry.unregister_all(self.logger)
573 """ Clear trace before running each test"""
574 self.reporter.send_keep_alive(self)
575 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
576 (self.__class__.__name__, self._testMethodName,
577 self._testMethodDoc))
579 raise Exception("VPP is dead when setting up the test")
580 self.sleep(.1, "during setUp")
581 self.vpp_stdout_deque.append(
582 "--- test setUp() for %s.%s(%s) starts here ---\n" %
583 (self.__class__.__name__, self._testMethodName,
584 self._testMethodDoc))
585 self.vpp_stderr_deque.append(
586 "--- test setUp() for %s.%s(%s) starts here ---\n" %
587 (self.__class__.__name__, self._testMethodName,
588 self._testMethodDoc))
589 self.vapi.cli("clear trace")
590 # store the test instance inside the test class - so that objects
591 # holding the class can access instance methods (like assertEqual)
592 type(self).test_instance = self
595 def pg_enable_capture(cls, interfaces=None):
597 Enable capture on packet-generator interfaces
599 :param interfaces: iterable interface indexes (if None,
600 use self.pg_interfaces)
603 if interfaces is None:
604 interfaces = cls.pg_interfaces
609 def register_capture(cls, cap_name):
610 """ Register a capture in the testclass """
611 # add to the list of captures with current timestamp
612 cls._captures.append((time.time(), cap_name))
613 # filter out from zombies
614 cls._zombie_captures = [(stamp, name)
615 for (stamp, name) in cls._zombie_captures
620 """ Remove any zombie captures and enable the packet generator """
621 # how long before capture is allowed to be deleted - otherwise vpp
622 # crashes - 100ms seems enough (this shouldn't be needed at all)
625 for stamp, cap_name in cls._zombie_captures:
626 wait = stamp + capture_ttl - now
628 cls.sleep(wait, "before deleting capture %s" % cap_name)
630 cls.logger.debug("Removing zombie capture %s" % cap_name)
631 cls.vapi.cli('packet-generator delete %s' % cap_name)
633 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
634 cls.vapi.cli('packet-generator enable')
635 cls._zombie_captures = cls._captures
639 def create_pg_interfaces(cls, interfaces):
641 Create packet-generator interfaces.
643 :param interfaces: iterable indexes of the interfaces.
644 :returns: List of created interfaces.
649 intf = VppPGInterface(cls, i)
650 setattr(cls, intf.name, intf)
652 cls.pg_interfaces = result
656 def create_loopback_interfaces(cls, count):
658 Create loopback interfaces.
660 :param count: number of interfaces created.
661 :returns: List of created interfaces.
663 result = [VppLoInterface(cls) for i in range(count)]
665 setattr(cls, intf.name, intf)
666 cls.lo_interfaces = result
670 def extend_packet(packet, size, padding=' '):
672 Extend packet to given size by padding with spaces or custom padding
673 NOTE: Currently works only when Raw layer is present.
675 :param packet: packet
676 :param size: target size
677 :param padding: padding used to extend the payload
680 packet_len = len(packet) + 4
681 extend = size - packet_len
683 num = (extend / len(padding)) + 1
684 packet[Raw].load += (padding * num)[:extend]
def reset_packet_infos(cls):
    """Forget all packet infos and per-destination packet counts.

    Rebinds both ``_packet_infos`` and ``_packet_count_for_dst_if_idx``
    on the class to fresh, independent empty dicts.
    """
    for attr_name in ('_packet_infos', '_packet_count_for_dst_if_idx'):
        setattr(cls, attr_name, {})
693 def create_packet_info(cls, src_if, dst_if):
695 Create packet info object containing the source and destination indexes
696 and add it to the testcase's packet info list
698 :param VppInterface src_if: source interface
699 :param VppInterface dst_if: destination interface
701 :returns: _PacketInfo object
705 info.index = len(cls._packet_infos)
706 info.src = src_if.sw_if_index
707 info.dst = dst_if.sw_if_index
708 if isinstance(dst_if, VppSubInterface):
709 dst_idx = dst_if.parent.sw_if_index
711 dst_idx = dst_if.sw_if_index
712 if dst_idx in cls._packet_count_for_dst_if_idx:
713 cls._packet_count_for_dst_if_idx[dst_idx] += 1
715 cls._packet_count_for_dst_if_idx[dst_idx] = 1
716 cls._packet_infos[info.index] = info
720 def info_to_payload(info):
722 Convert _PacketInfo object to packet payload
724 :param info: _PacketInfo object
726 :returns: string containing serialized data from packet info
728 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
732 def payload_to_info(payload):
734 Convert packet payload to _PacketInfo object
736 :param payload: packet payload
738 :returns: _PacketInfo object containing de-serialized data from payload
741 numbers = payload.split()
743 info.index = int(numbers[0])
744 info.src = int(numbers[1])
745 info.dst = int(numbers[2])
746 info.ip = int(numbers[3])
747 info.proto = int(numbers[4])
750 def get_next_packet_info(self, info):
752 Iterate over the packet info list stored in the testcase
753 Start iteration with first element if info is None
754 Continue based on index in info if info is specified
756 :param info: info or None
757 :returns: next info in list or None if no more infos
762 next_index = info.index + 1
763 if next_index == len(self._packet_infos):
766 return self._packet_infos[next_index]
768 def get_next_packet_info_for_interface(self, src_index, info):
770 Search the packet info list for the next packet info with same source
773 :param src_index: source interface index to search for
774 :param info: packet info - where to start the search
775 :returns: packet info or None
779 info = self.get_next_packet_info(info)
782 if info.src == src_index:
785 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
787 Search the packet info list for the next packet info with same source
788 and destination interface indexes
790 :param src_index: source interface index to search for
791 :param dst_index: destination interface index to search for
792 :param info: packet info - where to start the search
793 :returns: packet info or None
797 info = self.get_next_packet_info_for_interface(src_index, info)
800 if info.dst == dst_index:
803 def assert_equal(self, real_value, expected_value, name_or_class=None):
804 if name_or_class is None:
805 self.assertEqual(real_value, expected_value)
808 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
809 msg = msg % (getdoc(name_or_class).strip(),
810 real_value, str(name_or_class(real_value)),
811 expected_value, str(name_or_class(expected_value)))
813 msg = "Invalid %s: %s does not match expected value %s" % (
814 name_or_class, real_value, expected_value)
816 self.assertEqual(real_value, expected_value, msg)
818 def assert_in_range(self,
826 msg = "Invalid %s: %s out of range <%s,%s>" % (
827 name, real_value, expected_min, expected_max)
828 self.assertTrue(expected_min <= real_value <= expected_max, msg)
830 def assert_packet_checksums_valid(self, packet,
831 ignore_zero_udp_checksums=True):
832 received = packet.__class__(str(packet))
834 ppp("Verifying packet checksums for packet:", received))
835 udp_layers = ['UDP', 'UDPerror']
836 checksum_fields = ['cksum', 'chksum']
839 temp = received.__class__(str(received))
841 layer = temp.getlayer(counter)
843 for cf in checksum_fields:
844 if hasattr(layer, cf):
845 if ignore_zero_udp_checksums and \
846 0 == getattr(layer, cf) and \
847 layer.name in udp_layers:
850 checksums.append((counter, cf))
853 counter = counter + 1
854 if 0 == len(checksums):
856 temp = temp.__class__(str(temp))
857 for layer, cf in checksums:
858 calc_sum = getattr(temp[layer], cf)
860 getattr(received[layer], cf), calc_sum,
861 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
863 "Checksum field `%s` on `%s` layer has correct value `%s`" %
864 (cf, temp[layer].name, calc_sum))
866 def assert_checksum_valid(self, received_packet, layer,
868 ignore_zero_checksum=False):
869 """ Check checksum of received packet on given layer """
870 received_packet_checksum = getattr(received_packet[layer], field_name)
871 if ignore_zero_checksum and 0 == received_packet_checksum:
873 recalculated = received_packet.__class__(str(received_packet))
874 delattr(recalculated[layer], field_name)
875 recalculated = recalculated.__class__(str(recalculated))
876 self.assert_equal(received_packet_checksum,
877 getattr(recalculated[layer], field_name),
878 "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """Check the checksum of the 'IP' layer of a received packet.

    Thin wrapper around assert_checksum_valid().

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded unchanged to
                                 assert_checksum_valid()
    """
    layer_name = 'IP'
    self.assert_checksum_valid(
        received_packet, layer_name,
        ignore_zero_checksum=ignore_zero_checksum)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """Check the checksum of the 'TCP' layer of a received packet.

    Thin wrapper around assert_checksum_valid().

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded unchanged to
                                 assert_checksum_valid()
    """
    layer_name = 'TCP'
    self.assert_checksum_valid(
        received_packet, layer_name,
        ignore_zero_checksum=ignore_zero_checksum)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """Check the checksum of the 'UDP' layer of a received packet.

    Thin wrapper around assert_checksum_valid().  Unlike the IP and TCP
    variants, ignore_zero_checksum defaults to True here.

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded unchanged to
                                 assert_checksum_valid()
    """
    layer_name = 'UDP'
    self.assert_checksum_valid(
        received_packet, layer_name,
        ignore_zero_checksum=ignore_zero_checksum)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """Check checksums of the *error layers embedded in a packet.

    For each of IPerror, TCPerror, UDPerror and ICMPerror (in that order)
    that the packet contains, assert_checksum_valid() is called for the
    layer of the same name.  Only the UDPerror check is made with
    ignore_zero_checksum=True.

    :param received_packet: packet to verify
    """
    embedded_checks = (
        (IPerror, 'IPerror', {}),
        (TCPerror, 'TCPerror', {}),
        (UDPerror, 'UDPerror', {'ignore_zero_checksum': True}),
        (ICMPerror, 'ICMPerror', {}),
    )
    for layer_class, layer_name, options in embedded_checks:
        if received_packet.haslayer(layer_class):
            self.assert_checksum_valid(received_packet, layer_name,
                                       **options)
def assert_icmp_checksum_valid(self, received_packet):
    """Check the 'ICMP' layer checksum and any embedded error layers.

    Calls assert_checksum_valid() for the 'ICMP' layer first, then
    assert_embedded_icmp_checksum_valid() on the same packet.

    :param received_packet: packet to verify
    """
    icmp_layer = 'ICMP'
    self.assert_checksum_valid(received_packet, icmp_layer)
    self.assert_embedded_icmp_checksum_valid(received_packet)
def assert_icmpv6_checksum_valid(self, pkt):
    """Check the 'cksum' field of the ICMPv6 layers present in a packet.

    Handles ICMPv6DestUnreach (followed by a check of the embedded error
    layers), ICMPv6EchoRequest and ICMPv6EchoReply, in that order; layers
    that the packet does not contain are not checked.

    :param pkt: packet to verify
    """
    checksum_field = 'cksum'
    if pkt.haslayer(ICMPv6DestUnreach):
        self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', checksum_field)
        self.assert_embedded_icmp_checksum_valid(pkt)
    echo_layers = (
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest'),
        (ICMPv6EchoReply, 'ICMPv6EchoReply'),
    )
    for layer_class, layer_name in echo_layers:
        if pkt.haslayer(layer_class):
            self.assert_checksum_valid(pkt, layer_name, checksum_field)
919 def assert_packet_counter_equal(self, counter, expected_value):
920 if counter.startswith("/"):
921 counter_value = self.statistics.get_counter(counter)
922 self.assert_equal(counter_value, expected_value,
923 "packet counter `%s'" % counter)
925 counters = self.vapi.cli("sh errors").split('\n')
927 for i in range(1, len(counters) - 1):
928 results = counters[i].split()
929 if results[1] == counter:
930 counter_value = int(results[0])
934 def sleep(cls, timeout, remark=None):
935 if hasattr(cls, 'logger'):
936 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
940 if hasattr(cls, 'logger') and after - before > 2 * timeout:
941 cls.logger.error("unexpected time.sleep() result - "
942 "slept for %es instead of ~%es!",
943 after - before, timeout)
944 if hasattr(cls, 'logger'):
946 "Finished sleep (%s) - slept %es (wanted %es)",
947 remark, after - before, timeout)
949 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
950 self.vapi.cli("clear trace")
951 intf.add_stream(pkts)
952 self.pg_enable_capture(self.pg_interfaces)
956 for i in self.pg_interfaces:
957 i.get_capture(0, timeout=timeout)
958 i.assert_nothing_captured(remark=remark)
961 def send_and_expect(self, input, pkts, output):
962 self.vapi.cli("clear trace")
963 input.add_stream(pkts)
964 self.pg_enable_capture(self.pg_interfaces)
966 if isinstance(object, (list,)):
969 rx.append(output.get_capture(len(pkts)))
971 rx = output.get_capture(len(pkts))
974 def send_and_expect_only(self, input, pkts, output, timeout=None):
975 self.vapi.cli("clear trace")
976 input.add_stream(pkts)
977 self.pg_enable_capture(self.pg_interfaces)
979 if isinstance(object, (list,)):
983 rx.append(output.get_capture(len(pkts)))
985 rx = output.get_capture(len(pkts))
989 for i in self.pg_interfaces:
991 i.get_capture(0, timeout=timeout)
992 i.assert_nothing_captured()
def get_testcase_doc_name(test):
    """Return the first line of the docstring of the test's class.

    Falls back to the class name when the class has no docstring or an
    empty one, so that result reporting cannot crash on an undocumented
    test class (getdoc() returns None in that case).

    :param test: test case instance
    :returns: first docstring line of ``test.__class__``, or the class
              name if there is no such line
    """
    test_class = test.__class__
    doc_lines = (getdoc(test_class) or "").splitlines()
    return doc_lines[0] if doc_lines else test_class.__name__
1002 def get_test_description(descriptions, test):
1003 short_description = test.shortDescription()
1004 if descriptions and short_description:
1005 return short_description
class TestCaseInfo(object):
    """Plain record of the facts kept about one test case run."""

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        """Store the given values on the instance.

        :param logger: stored as ``logger``
        :param tempdir: stored as ``tempdir``
        :param vpp_pid: stored as ``vpp_pid``
        :param vpp_bin_path: stored as ``vpp_bin_path``
        """
        self.logger, self.tempdir = logger, tempdir
        self.vpp_pid, self.vpp_bin_path = vpp_pid, vpp_bin_path
        # Always starts as None; nothing in this class changes it.
        self.core_crash_test = None
1019 class VppTestResult(unittest.TestResult):
1021 @property result_string
1022 String variable to store the test case result string.
1024 List variable containing 2-tuples of TestCase instances and strings
1025 holding formatted tracebacks. Each tuple represents a test which
1026 raised an unexpected exception.
1028 List variable containing 2-tuples of TestCase instances and strings
1029 holding formatted tracebacks. Each tuple represents a test where
1030 a failure was explicitly signalled using the TestCase.assert*()
1034 failed_test_cases_info = set()
1035 core_crash_test_cases_info = set()
1036 current_test_case_info = None
1038 def __init__(self, stream, descriptions, verbosity, runner):
1040 :param stream File descriptor to store where to report test results.
1041 Set to the standard error stream by default.
1042 :param descriptions Boolean variable to store information if to use
1043 test case descriptions.
1044 :param verbosity Integer variable to store required verbosity level.
1046 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
1047 self.stream = stream
1048 self.descriptions = descriptions
1049 self.verbosity = verbosity
1050 self.result_string = None
1051 self.runner = runner
1053 def addSuccess(self, test):
1055 Record a test succeeded result
1060 if self.current_test_case_info:
1061 self.current_test_case_info.logger.debug(
1062 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1063 test._testMethodName,
1064 test._testMethodDoc))
1065 unittest.TestResult.addSuccess(self, test)
1066 self.result_string = colorize("OK", GREEN)
1068 self.send_result_through_pipe(test, PASS)
1070 def addSkip(self, test, reason):
1072 Record a test skipped.
1078 if self.current_test_case_info:
1079 self.current_test_case_info.logger.debug(
1080 "--- addSkip() %s.%s(%s) called, reason is %s" %
1081 (test.__class__.__name__, test._testMethodName,
1082 test._testMethodDoc, reason))
1083 unittest.TestResult.addSkip(self, test, reason)
1084 self.result_string = colorize("SKIP", YELLOW)
1086 self.send_result_through_pipe(test, SKIP)
1088 def symlink_failed(self):
1089 if self.current_test_case_info:
1091 failed_dir = os.getenv('FAILED_DIR')
1092 link_path = os.path.join(
1095 os.path.basename(self.current_test_case_info.tempdir))
1096 if self.current_test_case_info.logger:
1097 self.current_test_case_info.logger.debug(
1098 "creating a link to the failed test")
1099 self.current_test_case_info.logger.debug(
1100 "os.symlink(%s, %s)" %
1101 (self.current_test_case_info.tempdir, link_path))
1102 if os.path.exists(link_path):
1103 if self.current_test_case_info.logger:
1104 self.current_test_case_info.logger.debug(
1105 'symlink already exists')
1107 os.symlink(self.current_test_case_info.tempdir, link_path)
1109 except Exception as e:
1110 if self.current_test_case_info.logger:
1111 self.current_test_case_info.logger.error(e)
1113 def send_result_through_pipe(self, test, result):
1114 if hasattr(self, 'test_framework_result_pipe'):
1115 pipe = self.test_framework_result_pipe
1117 pipe.send((test.id(), result))
1119 def log_error(self, test, err, fn_name):
1120 if self.current_test_case_info:
1121 if isinstance(test, unittest.suite._ErrorHolder):
1122 test_name = test.description
1124 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1125 test._testMethodName,
1126 test._testMethodDoc)
1127 self.current_test_case_info.logger.debug(
1128 "--- %s() %s called, err is %s" %
1129 (fn_name, test_name, err))
1130 self.current_test_case_info.logger.debug(
1131 "formatted exception is:\n%s" %
1132 "".join(format_exception(*err)))
def add_error(self, test, err, unittest_fn, error_type):
    """
    Record a failure or an error and note where its temp dir is

    :param test: test case, or unittest's ``_ErrorHolder`` object
    :param err: exception info tuple
    :param unittest_fn: function called as ``unittest_fn(self, test, err)``
        to do the standard unittest recording
    :param error_type: FAIL or ERROR
    :raises Exception: if error_type is neither FAIL nor ERROR

    """
    if error_type == FAIL:
        fn_name, label = 'addFailure', "FAIL"
    elif error_type == ERROR:
        fn_name, label = 'addError', "ERROR"
    else:
        raise Exception('Error type %s cannot be used to record an '
                        'error or a failure' % error_type)
    self.log_error(test, err, fn_name)
    error_type_str = colorize(label, RED)

    unittest_fn(self, test, err)
    info = self.current_test_case_info
    if info:
        self.result_string = "%s [ temp dir used by test case: %s ]" % \
                             (error_type_str, info.tempdir)
        self.symlink_failed()
        self.failed_test_cases_info.add(info)
        if is_core_present(info.tempdir):
            if not info.core_crash_test:
                # only the first test recorded with a core present is
                # remembered as the crashing one
                if isinstance(test, unittest.suite._ErrorHolder):
                    test_name = str(test)
                else:
                    test_name = "'{}' ({})".format(
                        get_testcase_doc_name(test), test.id())
                info.core_crash_test = test_name
            self.core_crash_test_cases_info.add(info)
    else:
        self.result_string = '%s [no temp dir]' % error_type_str

    self.send_result_through_pipe(test, error_type)
def addFailure(self, test, err):
    """
    Record a test failed result

    :param test: test which failed
    :param err: error message

    """
    # the stock unittest recorder is handed over so that add_error can
    # run it as part of its own processing
    stock_recorder = unittest.TestResult.addFailure
    self.add_error(test, err, stock_recorder, FAIL)
def addError(self, test, err):
    """
    Record a test error result

    :param test: test which ended with an error
    :param err: error message

    """
    # the stock unittest recorder is handed over so that add_error can
    # run it as part of its own processing
    stock_recorder = unittest.TestResult.addError
    self.add_error(test, err, stock_recorder, ERROR)
def getDescription(self, test):
    """
    Get test description

    :param test: test to describe
    :returns: test description

    """
    descriptions = self.descriptions
    return get_test_description(descriptions, test)
def startTest(self, test):
    """
    Start a test

    :param test: test which is about to be run

    """
    unittest.TestResult.startTest(self, test)
    if not self.verbosity > 0:
        return
    # announce the test, then close the announcement with a delimiter
    announcement = "Starting " + self.getDescription(test) + " ..."
    self.stream.writeln(announcement)
    self.stream.writeln(single_line_delim)
def stopTest(self, test):
    """
    Called when the given test has been run

    Writes one summary line with the test description and the result
    string; with verbosity above zero the line is framed by delimiters.
    The test is then reported through the result pipe as TEST_RUN.

    :param test: test which has just been run

    """
    unittest.TestResult.stopTest(self, test)
    framed = self.verbosity > 0
    if framed:
        self.stream.writeln(single_line_delim)
    self.stream.writeln("%-73s%s" % (self.getDescription(test),
                                     self.result_string))
    if framed:
        self.stream.writeln(single_line_delim)

    self.send_result_through_pipe(test, TEST_RUN)
def printErrors(self):
    """
    Print errors from running the test case
    """
    nothing_recorded = len(self.errors) == 0 and len(self.failures) == 0
    if not nothing_recorded:
        self.stream.writeln()
        self.printErrorList('ERROR', self.errors)
        self.printErrorList('FAIL', self.failures)

    # ^^ that is the last output from unittest before summary
    if self.runner.print_summary:
        return
    # the summary is not wanted, so both the result and the runner are
    # pointed at a stream which writes to os.devnull
    sink = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
    self.stream = sink
    self.runner.stream = sink
def printErrorList(self, flavour, errors):
    """
    Print error list to the output stream together with error type
    and test case description.

    :param flavour: error type
    :param errors: iterable errors

    """
    for failed_test, details in errors:
        out = self.stream
        out.writeln(double_line_delim)
        heading = "%s: %s" % (flavour, self.getDescription(failed_test))
        out.writeln(heading)
        out.writeln(single_line_delim)
        out.writeln("%s" % details)
class VppTestRunner(unittest.TextTestRunner):
    """
    A basic test runner implementation which prints results to standard
    output.
    """

    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
                 result_pipe=None, failfast=False, buffer=False,
                 resultclass=None, print_summary=True):
        """
        :param keep_alive_pipe: pipe stored on KeepAliveReporter
        :param descriptions: passed on to unittest.TextTestRunner
        :param verbosity: passed on to unittest.TextTestRunner
        :param result_pipe: pipe stored on the result class and used to
            send test results
        :param failfast: passed on to unittest.TextTestRunner
        :param buffer: passed on to unittest.TextTestRunner
        :param resultclass: passed on to unittest.TextTestRunner
        :param print_summary: stored on the runner; when false, run()
            puts the original stream back once the tests have finished

        """
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        # NOTE(review): resultclass is a read-only property on this class,
        # so a value other than None cannot be assigned by the base
        # initializer - confirm that callers always leave it as None
        super(VppTestRunner, self).__init__(sys.stdout, descriptions,
                                            verbosity, failfast, buffer,
                                            resultclass)
        KeepAliveReporter.pipe = keep_alive_pipe

        # remembered so that run() can put the stream back
        self.orig_stream = self.stream
        self.resultclass.test_framework_result_pipe = result_pipe

        self.print_summary = print_summary

    def _makeResult(self):
        """Create the result object, handing it this runner as well"""
        return self.resultclass(self.stream,
                                self.descriptions,
                                self.verbosity,
                                self)

    def run(self, test):
        """
        Run the tests

        :param test: test or test suite to run
        :returns: result object produced by the run

        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        result = super(VppTestRunner, self).run(test)
        if not self.print_summary:
            # undo any stream replacement made while the tests were running
            self.stream = self.orig_stream
            result.stream = self.orig_stream
        return result
class Worker(Thread):
    """
    Thread which runs an external executable and keeps its return code

    ``result`` is None until run() has finished and holds the return
    code of the executable afterwards.
    """

    def __init__(self, args, logger, env=None):
        """
        :param args: command to run, executable first
        :param logger: logger used for progress and captured output
        :param env: extra environment variables for the executable;
            the mapping is copied, so later changes made by the caller
            do not affect the worker

        """
        self.logger = logger
        self.args = args
        self.result = None
        self.env = {} if env is None else copy.deepcopy(env)
        super(Worker, self).__init__()

    def run(self):
        """
        Run the executable, log what it wrote and store its return code
        """
        executable = self.args[0]
        # args are wrapped in a one-element tuple so that a tuple of
        # arguments is formatted as a whole instead of being unpacked
        self.logger.debug("Running executable w/args `%s'" % (self.args,))
        env = os.environ.copy()
        env.update(self.env)
        # NOTE(review): presumably tells the check library to log to
        # stdout - confirm
        env["CK_LOG_FILE_NAME"] = "-"
        self.process = subprocess.Popen(
            self.args, shell=False, env=env, preexec_fn=os.setpgrp,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = self.process.communicate()
        self.logger.debug("Finished running `%s'" % executable)
        self.logger.info("Return code is `%s'" % self.process.returncode)
        for stream_name, captured in (("stdout", out), ("stderr", err)):
            self.logger.info(single_line_delim)
            self.logger.info(
                "Executable `%s' wrote to %s:" % (executable, stream_name))
            self.logger.info(single_line_delim)
            self.logger.info(captured)
        self.logger.info(single_line_delim)
        self.result = self.process.returncode