3 from __future__ import print_function
16 from collections import deque
17 from threading import Thread, Event
18 from inspect import getdoc, isclass
19 from traceback import format_exception
20 from logging import FileHandler, DEBUG, Formatter
21 from scapy.packet import Raw
22 from hook import StepHook, PollHook, VppDiedError
23 from vpp_pg_interface import VppPGInterface
24 from vpp_sub_interface import VppSubInterface
25 from vpp_lo_interface import VppLoInterface
26 from vpp_papi_provider import VppPapiProvider
27 from vpp_papi.vpp_stats import VPPStats
28 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
30 from vpp_object import VppObjectRegistry
31 from util import ppp, is_core_present
32 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
33 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
34 from scapy.layers.inet6 import ICMPv6EchoReply
36 if os.name == 'posix' and sys.version_info[0] < 3:
37 # using subprocess32 is recommended by python official documentation
38 # @ https://docs.python.org/2/library/subprocess.html
39 import subprocess32 as subprocess
43 # Python2/3 compatible
55 debug_framework = False
56 if os.getenv('TEST_DEBUG', "0") == "1":
57 debug_framework = True
61 Test framework module.
63 The module provides a set of tools for constructing and running tests and
64 representing the results.
68 class _PacketInfo(object):
69 """Private class to create packet info object.
71 Help process information about the next packet.
72 Set variables to default values.
74 #: Store the index of the packet.
76 #: Store the index of the source packet generator interface of the packet.
78 #: Store the index of the destination packet generator interface
81 #: Store expected ip version
83 #: Store expected upper protocol
85 #: Store the copy of the former packet.
88 def __eq__(self, other):
89 index = self.index == other.index
90 src = self.src == other.src
91 dst = self.dst == other.dst
92 data = self.data == other.data
93 return index and src and dst and data
96 def pump_output(testclass):
97 """ pump output from vpp stdout/stderr to proper queues """
# Keep pumping until the owner sets pump_thread_stop_flag.
100 while not testclass.pump_thread_stop_flag.is_set():
# Wait on VPP's stdout, VPP's stderr and the read end of the wakeup pipe.
101 readable = select.select([testclass.vpp.stdout.fileno(),
102 testclass.vpp.stderr.fileno(),
103 testclass.pump_thread_wakeup_pipe[0]],
105 if testclass.vpp.stdout.fileno() in readable:
# Fetch up to 102400 bytes of pending stdout in a single read.
106 read = os.read(testclass.vpp.stdout.fileno(), 102400)
# splitlines(True) keeps the line terminators on every piece.
108 split = read.splitlines(True)
# Glue the fragment held in stdout_fragment onto the first new piece.
109 if len(stdout_fragment) > 0:
110 split[0] = "%s%s" % (stdout_fragment, split[0])
# NOTE(review): this branch tests for a str newline while the stderr branch
# below tests for b"\n". os.read() returns bytes on Python 3, where
# bytes.endswith(str) raises TypeError - confirm which literal is intended.
111 if len(split) > 0 and split[-1].endswith("\n"):
115 stdout_fragment = split[-1]
# Hand the pieces selected by split[:limit] over to the stdout deque.
116 testclass.vpp_stdout_deque.extend(split[:limit])
# When output is not cached, mirror every queued line into the debug log.
117 if not testclass.cache_vpp_output:
118 for line in split[:limit]:
119 testclass.logger.debug(
120 "VPP STDOUT: %s" % line.rstrip("\n"))
# Same handling for stderr, using stderr_fragment and vpp_stderr_deque.
121 if testclass.vpp.stderr.fileno() in readable:
122 read = os.read(testclass.vpp.stderr.fileno(), 102400)
124 split = read.splitlines(True)
125 if len(stderr_fragment) > 0:
126 split[0] = "%s%s" % (stderr_fragment, split[0])
127 if len(split) > 0 and split[-1].endswith(b"\n"):
131 stderr_fragment = split[-1]
132 testclass.vpp_stderr_deque.extend(split[:limit])
133 if not testclass.cache_vpp_output:
134 for line in split[:limit]:
135 testclass.logger.debug(
136 "VPP STDERR: %s" % line.rstrip("\n"))
137 # ignoring the dummy pipe here intentionally - the
138 # flag will take care of properly terminating the loop
def _is_skip_aarch64_set():
    """Report whether the SKIP_AARCH64 environment variable is switched on.

    :returns: True when SKIP_AARCH64 is 'yes', 'y' or '1' (compared
        case-insensitively); False otherwise, including when it is unset.
    """
    setting = os.getenv('SKIP_AARCH64', 'n')
    return setting.lower() in ('yes', 'y', '1')


# Evaluated once at import time.
is_skip_aarch64_set = _is_skip_aarch64_set()
def _is_platform_aarch64():
    """Report whether platform.machine() identifies an aarch64 machine.

    :returns: True when the machine string equals 'aarch64'
    """
    machine = platform.machine()
    return machine == 'aarch64'


# Evaluated once at import time.
is_platform_aarch64 = _is_platform_aarch64()
def _running_extended_tests():
    """Report whether the EXTENDED_TESTS environment variable is switched on.

    :returns: True when EXTENDED_TESTS is 'y', 'yes' or '1' (compared
        case-insensitively); False otherwise, including when it is unset.
    """
    setting = os.getenv("EXTENDED_TESTS", "n").lower()
    return setting in ("y", "yes", "1")


# Evaluated once at import time.
running_extended_tests = _running_extended_tests()
def _running_on_centos():
    """Tell whether the OS_ID environment variable names a CentOS system.

    :returns: True when OS_ID contains "centos" (case-insensitive);
        False otherwise, including when OS_ID is unset.
    """
    os_id = os.getenv("OS_ID", "")
    return "centos" in os_id.lower()


# Call the helper so the flag holds a bool, like the other module-level
# flags in this file. Binding the function object itself would make the
# flag evaluate as true regardless of OS_ID.
running_on_centos = _running_on_centos()
167 class KeepAliveReporter(object):
169 Singleton object which reports test start to parent process
174 self.__dict__ = self._shared_state
182 def pipe(self, pipe):
183 if self._pipe is not None:
184 raise Exception("Internal error - pipe should only be set once.")
187 def send_keep_alive(self, test, desc=None):
189 Write current test tmpdir & desc to keep-alive pipe to signal liveness
191 if self.pipe is None:
192 # if not running forked..
196 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
200 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
203 class VppTestCase(unittest.TestCase):
204 """This subclass is a base class for VPP test cases that are implemented as
205 classes. It provides methods to create and run test case.
208 extra_vpp_punt_config = []
209 extra_vpp_plugin_config = []
def packet_infos(self):
    """Packet infos currently held in ``_packet_infos``"""
    stored_infos = self._packet_infos
    return stored_infos
217 def get_packet_count_for_if_idx(cls, dst_if_index):
218 """Get the number of packet info for specified destination if index"""
219 if dst_if_index in cls._packet_count_for_dst_if_idx:
220 return cls._packet_count_for_dst_if_idx[dst_if_index]
226 """Return the instance of this testcase"""
227 return cls.test_instance
230 def set_debug_flags(cls, d):
231 cls.debug_core = False
232 cls.debug_gdb = False
233 cls.debug_gdbserver = False
238 cls.debug_core = True
241 elif dl == "gdbserver":
242 cls.debug_gdbserver = True
244 raise Exception("Unrecognized DEBUG option: '%s'" % d)
247 def get_least_used_cpu():
248 cpu_usage_list = [set(range(psutil.cpu_count()))]
249 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
250 if 'vpp_main' == p.info['name']]
251 for vpp_process in vpp_processes:
252 for cpu_usage_set in cpu_usage_list:
254 cpu_num = vpp_process.cpu_num()
255 if cpu_num in cpu_usage_set:
256 cpu_usage_set_index = cpu_usage_list.index(
258 if cpu_usage_set_index == len(cpu_usage_list) - 1:
259 cpu_usage_list.append({cpu_num})
261 cpu_usage_list[cpu_usage_set_index + 1].add(
263 cpu_usage_set.remove(cpu_num)
265 except psutil.NoSuchProcess:
268 for cpu_usage_set in cpu_usage_list:
269 if len(cpu_usage_set) > 0:
270 min_usage_set = cpu_usage_set
273 return random.choice(tuple(min_usage_set))
276 def setUpConstants(cls):
277 """ Set-up the test case class based on environment variables """
278 s = os.getenv("STEP", "n")
279 cls.step = True if s.lower() in ("y", "yes", "1") else False
280 d = os.getenv("DEBUG", None)
281 c = os.getenv("CACHE_OUTPUT", "1")
282 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
283 cls.set_debug_flags(d)
284 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
285 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
286 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
288 if cls.plugin_path is not None:
289 if cls.extern_plugin_path is not None:
290 plugin_path = "%s:%s" % (
291 cls.plugin_path, cls.extern_plugin_path)
293 plugin_path = cls.plugin_path
294 elif cls.extern_plugin_path is not None:
295 plugin_path = cls.extern_plugin_path
297 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
298 debug_cli = "cli-listen localhost:5002"
300 size = os.getenv("COREDUMP_SIZE")
302 coredump_size = "coredump-size %s" % size
303 if coredump_size is None:
304 coredump_size = "coredump-size unlimited"
306 cpu_core_number = cls.get_least_used_cpu()
308 cls.vpp_cmdline = [cls.vpp_bin, "unix",
309 "{", "nodaemon", debug_cli, "full-coredump",
310 coredump_size, "runtime-dir", cls.tempdir, "}",
311 "api-trace", "{", "on", "}", "api-segment", "{",
312 "prefix", cls.shm_prefix, "}", "cpu", "{",
313 "main-core", str(cpu_core_number), "}", "statseg",
314 "{", "socket-name", cls.stats_sock, "}", "plugins",
315 "{", "plugin", "dpdk_plugin.so", "{", "disable",
316 "}", "plugin", "unittest_plugin.so", "{", "enable",
317 "}"] + cls.extra_vpp_plugin_config + ["}", ]
318 if cls.extra_vpp_punt_config is not None:
319 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
320 if plugin_path is not None:
321 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
322 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
323 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
326 def wait_for_enter(cls):
327 if cls.debug_gdbserver:
328 print(double_line_delim)
329 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
331 print(double_line_delim)
332 print("Spawned VPP with PID: %d" % cls.vpp.pid)
334 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
336 print(single_line_delim)
337 print("You can debug the VPP using e.g.:")
338 if cls.debug_gdbserver:
339 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
340 print("Now is the time to attach a gdb by running the above "
341 "command, set up breakpoints etc. and then resume VPP from "
342 "within gdb by issuing the 'continue' command")
344 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
345 print("Now is the time to attach a gdb by running the above "
346 "command and set up breakpoints etc.")
347 print(single_line_delim)
348 input("Press ENTER to continue running the testcase...")
352 cmdline = cls.vpp_cmdline
354 if cls.debug_gdbserver:
355 gdbserver = '/usr/bin/gdbserver'
356 if not os.path.isfile(gdbserver) or \
357 not os.access(gdbserver, os.X_OK):
358 raise Exception("gdbserver binary '%s' does not exist or is "
359 "not executable" % gdbserver)
361 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
362 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
365 cls.vpp = subprocess.Popen(cmdline,
366 stdout=subprocess.PIPE,
367 stderr=subprocess.PIPE,
369 except subprocess.CalledProcessError as e:
370 cls.logger.critical("Couldn't start vpp: %s" % e)
376 def wait_for_stats_socket(cls):
377 deadline = time.time() + 3
379 while time.time() < deadline or \
380 cls.debug_gdb or cls.debug_gdbserver:
381 if os.path.exists(cls.stats_sock):
386 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
391 Perform class setup before running the testcase
392 Remove shared memory files, start vpp and connect the vpp-api
394 gc.collect() # run garbage collection first
396 cls.logger = get_logger(cls.__name__)
397 if hasattr(cls, 'parallel_handler'):
398 cls.logger.addHandler(cls.parallel_handler)
399 cls.logger.propagate = False
400 cls.tempdir = tempfile.mkdtemp(
401 prefix='vpp-unittest-%s-' % cls.__name__)
402 cls.stats_sock = "%s/stats.sock" % cls.tempdir
403 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
404 cls.file_handler.setFormatter(
405 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
407 cls.file_handler.setLevel(DEBUG)
408 cls.logger.addHandler(cls.file_handler)
409 cls.shm_prefix = os.path.basename(cls.tempdir)
410 os.chdir(cls.tempdir)
411 cls.logger.info("Temporary dir is %s, shm prefix is %s",
412 cls.tempdir, cls.shm_prefix)
414 cls.reset_packet_infos()
416 cls._zombie_captures = []
419 cls.registry = VppObjectRegistry()
420 cls.vpp_startup_failed = False
421 cls.reporter = KeepAliveReporter()
422 # need to catch exceptions here because if we raise, then the cleanup
423 # doesn't get called and we might end with a zombie vpp
426 cls.reporter.send_keep_alive(cls, 'setUpClass')
427 VppTestResult.current_test_case_info = TestCaseInfo(
428 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
429 cls.vpp_stdout_deque = deque()
430 cls.vpp_stderr_deque = deque()
431 cls.pump_thread_stop_flag = Event()
432 cls.pump_thread_wakeup_pipe = os.pipe()
433 cls.pump_thread = Thread(target=pump_output, args=(cls,))
434 cls.pump_thread.daemon = True
435 cls.pump_thread.start()
436 if cls.debug_gdb or cls.debug_gdbserver:
440 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
446 cls.vapi.register_hook(hook)
447 cls.wait_for_stats_socket()
448 cls.statistics = VPPStats(socketname=cls.stats_sock)
452 cls.vpp_startup_failed = True
454 "VPP died shortly after startup, check the"
455 " output to standard error for possible cause")
461 cls.vapi.disconnect()
464 if cls.debug_gdbserver:
465 print(colorize("You're running VPP inside gdbserver but "
466 "VPP-API connection failed, did you forget "
467 "to 'continue' VPP from within gdb?", RED))
479 Disconnect vpp-api, kill vpp and cleanup shared memory files
481 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
483 if cls.vpp.returncode is None:
484 print(double_line_delim)
485 print("VPP or GDB server is still running")
486 print(single_line_delim)
487 input("When done debugging, press ENTER to kill the "
488 "process and finish running the testcase...")
490 # first signal that we want to stop the pump thread, then wake it up
491 if hasattr(cls, 'pump_thread_stop_flag'):
492 cls.pump_thread_stop_flag.set()
493 if hasattr(cls, 'pump_thread_wakeup_pipe'):
494 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
495 if hasattr(cls, 'pump_thread'):
496 cls.logger.debug("Waiting for pump thread to stop")
497 cls.pump_thread.join()
498 if hasattr(cls, 'vpp_stderr_reader_thread'):
499 cls.logger.debug("Waiting for stdderr pump to stop")
500 cls.vpp_stderr_reader_thread.join()
502 if hasattr(cls, 'vpp'):
503 if hasattr(cls, 'vapi'):
504 cls.vapi.disconnect()
507 if cls.vpp.returncode is None:
508 cls.logger.debug("Sending TERM to vpp")
510 cls.logger.debug("Waiting for vpp to die")
511 cls.vpp.communicate()
514 if cls.vpp_startup_failed:
515 stdout_log = cls.logger.info
516 stderr_log = cls.logger.critical
518 stdout_log = cls.logger.info
519 stderr_log = cls.logger.info
521 if hasattr(cls, 'vpp_stdout_deque'):
522 stdout_log(single_line_delim)
523 stdout_log('VPP output to stdout while running %s:', cls.__name__)
524 stdout_log(single_line_delim)
525 vpp_output = "".join(cls.vpp_stdout_deque)
526 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
528 stdout_log('\n%s', vpp_output)
529 stdout_log(single_line_delim)
531 if hasattr(cls, 'vpp_stderr_deque'):
532 stderr_log(single_line_delim)
533 stderr_log('VPP output to stderr while running %s:', cls.__name__)
534 stderr_log(single_line_delim)
535 vpp_output = "".join(str(cls.vpp_stderr_deque))
536 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
538 stderr_log('\n%s', vpp_output)
539 stderr_log(single_line_delim)
542 def tearDownClass(cls):
543 """ Perform final cleanup after running all tests in this test-case """
544 cls.reporter.send_keep_alive(cls, 'tearDownClass')
546 cls.file_handler.close()
547 cls.reset_packet_infos()
549 debug_internal.on_tear_down_class(cls)
552 """ Show various debug prints after each test """
553 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
554 (self.__class__.__name__, self._testMethodName,
555 self._testMethodDoc))
556 if not self.vpp_dead:
557 self.logger.debug(self.vapi.cli("show trace max 1000"))
558 self.logger.info(self.vapi.ppcli("show interface"))
559 self.logger.info(self.vapi.ppcli("show hardware"))
560 self.logger.info(self.statistics.set_errors_str())
561 self.logger.info(self.vapi.ppcli("show run"))
562 self.logger.info(self.vapi.ppcli("show log"))
563 self.registry.remove_vpp_config(self.logger)
564 # Save/Dump VPP api trace log
565 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
566 tmp_api_trace = "/tmp/%s" % api_trace
567 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
568 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
569 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
571 os.rename(tmp_api_trace, vpp_api_trace_log)
572 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
575 self.registry.unregister_all(self.logger)
578 """ Clear trace before running each test"""
579 self.reporter.send_keep_alive(self)
580 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
581 (self.__class__.__name__, self._testMethodName,
582 self._testMethodDoc))
584 raise Exception("VPP is dead when setting up the test")
585 self.sleep(.1, "during setUp")
586 self.vpp_stdout_deque.append(
587 "--- test setUp() for %s.%s(%s) starts here ---\n" %
588 (self.__class__.__name__, self._testMethodName,
589 self._testMethodDoc))
590 self.vpp_stderr_deque.append(
591 "--- test setUp() for %s.%s(%s) starts here ---\n" %
592 (self.__class__.__name__, self._testMethodName,
593 self._testMethodDoc))
594 self.vapi.cli("clear trace")
595 # store the test instance inside the test class - so that objects
596 # holding the class can access instance methods (like assertEqual)
597 type(self).test_instance = self
600 def pg_enable_capture(cls, interfaces=None):
602 Enable capture on packet-generator interfaces
604 :param interfaces: iterable interface indexes (if None,
605 use self.pg_interfaces)
608 if interfaces is None:
609 interfaces = cls.pg_interfaces
614 def register_capture(cls, cap_name):
615 """ Register a capture in the testclass """
616 # add to the list of captures with current timestamp
617 cls._captures.append((time.time(), cap_name))
618 # filter out from zombies
619 cls._zombie_captures = [(stamp, name)
620 for (stamp, name) in cls._zombie_captures
625 """ Remove any zombie captures and enable the packet generator """
626 # how long before capture is allowed to be deleted - otherwise vpp
627 # crashes - 100ms seems enough (this shouldn't be needed at all)
630 for stamp, cap_name in cls._zombie_captures:
631 wait = stamp + capture_ttl - now
633 cls.sleep(wait, "before deleting capture %s" % cap_name)
635 cls.logger.debug("Removing zombie capture %s" % cap_name)
636 cls.vapi.cli('packet-generator delete %s' % cap_name)
638 cls.vapi.cli("trace add pg-input 1000")
639 cls.vapi.cli('packet-generator enable')
640 cls._zombie_captures = cls._captures
644 def create_pg_interfaces(cls, interfaces):
646 Create packet-generator interfaces.
648 :param interfaces: iterable indexes of the interfaces.
649 :returns: List of created interfaces.
654 intf = VppPGInterface(cls, i)
655 setattr(cls, intf.name, intf)
657 cls.pg_interfaces = result
661 def create_loopback_interfaces(cls, count):
663 Create loopback interfaces.
665 :param count: number of interfaces created.
666 :returns: List of created interfaces.
668 result = [VppLoInterface(cls) for i in range(count)]
670 setattr(cls, intf.name, intf)
671 cls.lo_interfaces = result
675 def extend_packet(packet, size, padding=' '):
677 Extend packet to given size by padding with spaces or custom padding
678 NOTE: Currently works only when Raw layer is present.
680 :param packet: packet
681 :param size: target size
682 :param padding: padding used to extend the payload
# The measured length is increased by 4 before comparing with the target
# size - presumably to account for a 4-byte trailer; TODO confirm.
685 packet_len = len(packet) + 4
686 extend = size - packet_len
# NOTE(review): on Python 3 `/` is true division, so `num` is a float and
# `padding * num` below would raise TypeError; `//` looks intended - confirm.
688 num = (extend / len(padding)) + 1
# Repeat the padding, cut it to exactly `extend` characters and append it
# to the load of the Raw layer.
689 packet[Raw].load += (padding * num)[:extend]
def reset_packet_infos(cls):
    """ Start over with empty packet info and per-interface count tables """
    for attribute in ('_packet_infos', '_packet_count_for_dst_if_idx'):
        # each attribute gets its own fresh dictionary
        setattr(cls, attribute, {})
698 def create_packet_info(cls, src_if, dst_if):
700 Create packet info object containing the source and destination indexes
701 and add it to the testcase's packet info list
703 :param VppInterface src_if: source interface
704 :param VppInterface dst_if: destination interface
706 :returns: _PacketInfo object
710 info.index = len(cls._packet_infos)
711 info.src = src_if.sw_if_index
712 info.dst = dst_if.sw_if_index
713 if isinstance(dst_if, VppSubInterface):
714 dst_idx = dst_if.parent.sw_if_index
716 dst_idx = dst_if.sw_if_index
717 if dst_idx in cls._packet_count_for_dst_if_idx:
718 cls._packet_count_for_dst_if_idx[dst_idx] += 1
720 cls._packet_count_for_dst_if_idx[dst_idx] = 1
721 cls._packet_infos[info.index] = info
725 def info_to_payload(info):
727 Convert _PacketInfo object to packet payload
729 :param info: _PacketInfo object
731 :returns: string containing serialized data from packet info
733 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
737 def payload_to_info(payload):
739 Convert packet payload to _PacketInfo object
741 :param payload: packet payload
743 :returns: _PacketInfo object containing de-serialized data from payload
746 numbers = payload.split()
748 info.index = int(numbers[0])
749 info.src = int(numbers[1])
750 info.dst = int(numbers[2])
751 info.ip = int(numbers[3])
752 info.proto = int(numbers[4])
755 def get_next_packet_info(self, info):
757 Iterate over the packet info list stored in the testcase
758 Start iteration with first element if info is None
759 Continue based on index in info if info is specified
761 :param info: info or None
762 :returns: next info in list or None if no more infos
767 next_index = info.index + 1
768 if next_index == len(self._packet_infos):
771 return self._packet_infos[next_index]
773 def get_next_packet_info_for_interface(self, src_index, info):
775 Search the packet info list for the next packet info with same source
778 :param src_index: source interface index to search for
779 :param info: packet info - where to start the search
780 :returns: packet info or None
784 info = self.get_next_packet_info(info)
787 if info.src == src_index:
790 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
792 Search the packet info list for the next packet info with same source
793 and destination interface indexes
795 :param src_index: source interface index to search for
796 :param dst_index: destination interface index to search for
797 :param info: packet info - where to start the search
798 :returns: packet info or None
802 info = self.get_next_packet_info_for_interface(src_index, info)
805 if info.dst == dst_index:
808 def assert_equal(self, real_value, expected_value, name_or_class=None):
809 if name_or_class is None:
810 self.assertEqual(real_value, expected_value)
813 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
814 msg = msg % (getdoc(name_or_class).strip(),
815 real_value, str(name_or_class(real_value)),
816 expected_value, str(name_or_class(expected_value)))
818 msg = "Invalid %s: %s does not match expected value %s" % (
819 name_or_class, real_value, expected_value)
821 self.assertEqual(real_value, expected_value, msg)
823 def assert_in_range(self,
831 msg = "Invalid %s: %s out of range <%s,%s>" % (
832 name, real_value, expected_min, expected_max)
833 self.assertTrue(expected_min <= real_value <= expected_max, msg)
835 def assert_packet_checksums_valid(self, packet,
836 ignore_zero_udp_checksums=True):
837 received = packet.__class__(str(packet))
839 ppp("Verifying packet checksums for packet:", received))
840 udp_layers = ['UDP', 'UDPerror']
841 checksum_fields = ['cksum', 'chksum']
844 temp = received.__class__(str(received))
846 layer = temp.getlayer(counter)
848 for cf in checksum_fields:
849 if hasattr(layer, cf):
850 if ignore_zero_udp_checksums and \
851 0 == getattr(layer, cf) and \
852 layer.name in udp_layers:
855 checksums.append((counter, cf))
858 counter = counter + 1
859 if 0 == len(checksums):
861 temp = temp.__class__(str(temp))
862 for layer, cf in checksums:
863 calc_sum = getattr(temp[layer], cf)
865 getattr(received[layer], cf), calc_sum,
866 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
868 "Checksum field `%s` on `%s` layer has correct value `%s`" %
869 (cf, temp[layer].name, calc_sum))
871 def assert_checksum_valid(self, received_packet, layer,
873 ignore_zero_checksum=False):
874 """ Check checksum of received packet on given layer """
875 received_packet_checksum = getattr(received_packet[layer], field_name)
876 if ignore_zero_checksum and 0 == received_packet_checksum:
878 recalculated = received_packet.__class__(str(received_packet))
879 delattr(recalculated[layer], field_name)
880 recalculated = recalculated.__class__(str(recalculated))
881 self.assert_equal(received_packet_checksum,
882 getattr(recalculated[layer], field_name),
883 "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """ Check the checksum of the 'IP' layer of the received packet

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded to assert_checksum_valid
    """
    skip_zero = ignore_zero_checksum
    self.assert_checksum_valid(
        received_packet, 'IP', ignore_zero_checksum=skip_zero)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """ Check the checksum of the 'TCP' layer of the received packet

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded to assert_checksum_valid
    """
    skip_zero = ignore_zero_checksum
    self.assert_checksum_valid(
        received_packet, 'TCP', ignore_zero_checksum=skip_zero)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """ Check the checksum of the 'UDP' layer of the received packet

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded to assert_checksum_valid;
        enabled by default for UDP
    """
    skip_zero = ignore_zero_checksum
    self.assert_checksum_valid(
        received_packet, 'UDP', ignore_zero_checksum=skip_zero)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """ Check checksums of the *error layers present in the received packet

    Each of IPerror, TCPerror, UDPerror and ICMPerror is verified only
    when the packet has that layer; for UDPerror a zero checksum is
    ignored.

    :param received_packet: packet to verify
    """
    layer_checks = (
        (IPerror, 'IPerror', {}),
        (TCPerror, 'TCPerror', {}),
        (UDPerror, 'UDPerror', {'ignore_zero_checksum': True}),
        (ICMPerror, 'ICMPerror', {}),
    )
    for layer_cls, layer_name, extra in layer_checks:
        if received_packet.haslayer(layer_cls):
            self.assert_checksum_valid(received_packet, layer_name, **extra)
def assert_icmp_checksum_valid(self, received_packet):
    """ Check the 'ICMP' layer checksum, then any embedded error layers

    :param received_packet: packet to verify
    """
    packet = received_packet
    self.assert_checksum_valid(packet, 'ICMP')
    self.assert_embedded_icmp_checksum_valid(packet)
# Verify the 'cksum' field of whichever of the ICMPv6DestUnreach,
# ICMPv6EchoRequest and ICMPv6EchoReply layers the packet carries.
915 def assert_icmpv6_checksum_valid(self, pkt):
916 if pkt.haslayer(ICMPv6DestUnreach):
917 self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
# NOTE(review): indentation is not visible in this view; the embedded-layer
# check presumably belongs to the DestUnreach branch - confirm.
918 self.assert_embedded_icmp_checksum_valid(pkt)
919 if pkt.haslayer(ICMPv6EchoRequest):
920 self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
921 if pkt.haslayer(ICMPv6EchoReply):
922 self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
924 def assert_packet_counter_equal(self, counter, expected_value):
925 if counter.startswith("/"):
926 counter_value = self.statistics.get_counter(counter)
927 self.assert_equal(counter_value, expected_value,
928 "packet counter `%s'" % counter)
930 counters = self.vapi.cli("sh errors").split('\n')
932 for i in range(1, len(counters) - 1):
933 results = counters[i].split()
934 if results[1] == counter:
935 counter_value = int(results[0])
939 def sleep(cls, timeout, remark=None):
940 if hasattr(cls, 'logger'):
941 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
945 if hasattr(cls, 'logger') and after - before > 2 * timeout:
946 cls.logger.error("unexpected time.sleep() result - "
947 "slept for %es instead of ~%es!",
948 after - before, timeout)
949 if hasattr(cls, 'logger'):
951 "Finished sleep (%s) - slept %es (wanted %es)",
952 remark, after - before, timeout)
954 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
955 self.vapi.cli("clear trace")
956 intf.add_stream(pkts)
957 self.pg_enable_capture(self.pg_interfaces)
961 for i in self.pg_interfaces:
962 i.get_capture(0, timeout=timeout)
963 i.assert_nothing_captured(remark=remark)
966 def send_and_expect(self, input, pkts, output):
967 self.vapi.cli("clear trace")
968 input.add_stream(pkts)
969 self.pg_enable_capture(self.pg_interfaces)
971 rx = output.get_capture(len(pkts))
974 def send_and_expect_only(self, input, pkts, output, timeout=None):
975 self.vapi.cli("clear trace")
976 input.add_stream(pkts)
977 self.pg_enable_capture(self.pg_interfaces)
979 rx = output.get_capture(len(pkts))
983 for i in self.pg_interfaces:
985 i.get_capture(0, timeout=timeout)
986 i.assert_nothing_captured()
992 """ unittest calls runTest when TestCase is instantiated without a
993 test case. Use case: Writing unittests against VppTestCase"""
997 def get_testcase_doc_name(test):
998 return getdoc(test.__class__).splitlines()[0]
1001 def get_test_description(descriptions, test):
1002 short_description = test.shortDescription()
1003 if descriptions and short_description:
1004 return short_description
class TestCaseInfo(object):
    """Record describing one test case run.

    Holds the logger, temporary directory, VPP process id and VPP binary
    path passed to the constructor, plus ``core_crash_test``, which
    starts out as None.
    """

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        """Store the given values.

        :param logger: logger of the test case
        :param tempdir: temporary directory of the test case
        :param vpp_pid: process id of the VPP instance
        :param vpp_bin_path: path of the VPP binary
        """
        self.logger, self.tempdir = logger, tempdir
        self.vpp_pid, self.vpp_bin_path = vpp_pid, vpp_bin_path
        # no core-crash test recorded yet
        self.core_crash_test = None
1018 class VppTestResult(unittest.TestResult):
1020 @property result_string
1021 String variable to store the test case result string.
1023 List variable containing 2-tuples of TestCase instances and strings
1024 holding formatted tracebacks. Each tuple represents a test which
1025 raised an unexpected exception.
1027 List variable containing 2-tuples of TestCase instances and strings
1028 holding formatted tracebacks. Each tuple represents a test where
1029 a failure was explicitly signalled using the TestCase.assert*()
1033 failed_test_cases_info = set()
1034 core_crash_test_cases_info = set()
1035 current_test_case_info = None
1037 def __init__(self, stream=None, descriptions=None, verbosity=None,
1040 :param stream File descriptor to store where to report test results.
1041 Set to the standard error stream by default.
1042 :param descriptions Boolean variable to store information if to use
1043 test case descriptions.
1044 :param verbosity Integer variable to store required verbosity level.
1046 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1047 self.stream = stream
1048 self.descriptions = descriptions
1049 self.verbosity = verbosity
1050 self.result_string = None
1051 self.runner = runner
1053 def addSuccess(self, test):
1055 Record a test succeeded result
1060 if self.current_test_case_info:
1061 self.current_test_case_info.logger.debug(
1062 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1063 test._testMethodName,
1064 test._testMethodDoc))
1065 unittest.TestResult.addSuccess(self, test)
1066 self.result_string = colorize("OK", GREEN)
1068 self.send_result_through_pipe(test, PASS)
1070 def addSkip(self, test, reason):
1072 Record a test skipped.
1078 if self.current_test_case_info:
1079 self.current_test_case_info.logger.debug(
1080 "--- addSkip() %s.%s(%s) called, reason is %s" %
1081 (test.__class__.__name__, test._testMethodName,
1082 test._testMethodDoc, reason))
1083 unittest.TestResult.addSkip(self, test, reason)
1084 self.result_string = colorize("SKIP", YELLOW)
1086 self.send_result_through_pipe(test, SKIP)
1088 def symlink_failed(self):
1089 if self.current_test_case_info:
1091 failed_dir = os.getenv('FAILED_DIR')
1092 link_path = os.path.join(
1095 os.path.basename(self.current_test_case_info.tempdir))
1096 if self.current_test_case_info.logger:
1097 self.current_test_case_info.logger.debug(
1098 "creating a link to the failed test")
1099 self.current_test_case_info.logger.debug(
1100 "os.symlink(%s, %s)" %
1101 (self.current_test_case_info.tempdir, link_path))
1102 if os.path.exists(link_path):
1103 if self.current_test_case_info.logger:
1104 self.current_test_case_info.logger.debug(
1105 'symlink already exists')
1107 os.symlink(self.current_test_case_info.tempdir, link_path)
1109 except Exception as e:
1110 if self.current_test_case_info.logger:
1111 self.current_test_case_info.logger.error(e)
1113 def send_result_through_pipe(self, test, result):
1114 if hasattr(self, 'test_framework_result_pipe'):
1115 pipe = self.test_framework_result_pipe
1117 pipe.send((test.id(), result))
1119 def log_error(self, test, err, fn_name):
1120 if self.current_test_case_info:
1121 if isinstance(test, unittest.suite._ErrorHolder):
1122 test_name = test.description
1124 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1125 test._testMethodName,
1126 test._testMethodDoc)
1127 self.current_test_case_info.logger.debug(
1128 "--- %s() %s called, err is %s" %
1129 (fn_name, test_name, err))
1130 self.current_test_case_info.logger.debug(
1131 "formatted exception is:\n%s" %
1132 "".join(format_exception(*err)))
def add_error(self, test, err, unittest_fn, error_type):
    """
    Record a failed or errored test and update the result bookkeeping.

    :param test: test (or unittest _ErrorHolder) the result belongs to
    :param err: error information, handed over to unittest_fn unchanged
    :param unittest_fn: unittest.TestResult method recording the result
    :param error_type: FAIL or ERROR
    :raises Exception: if error_type is neither FAIL nor ERROR

    """
    if error_type == FAIL:
        fn_name, label = 'addFailure', "FAIL"
    elif error_type == ERROR:
        fn_name, label = 'addError', "ERROR"
    else:
        raise Exception('Error type %s cannot be used to record an '
                        'error or a failure' % error_type)
    self.log_error(test, err, fn_name)
    error_type_str = colorize(label, RED)

    unittest_fn(self, test, err)

    if not self.current_test_case_info:
        self.result_string = '%s [no temp dir]' % error_type_str
    else:
        info = self.current_test_case_info
        self.result_string = "%s [ temp dir used by test case: %s ]" % \
                             (error_type_str, info.tempdir)
        self.symlink_failed()
        self.failed_test_cases_info.add(info)
        if is_core_present(info.tempdir):
            # an already recorded core_crash_test is never overwritten
            if not info.core_crash_test:
                if isinstance(test, unittest.suite._ErrorHolder):
                    crashed = str(test)
                else:
                    crashed = "'{}' ({})".format(
                        get_testcase_doc_name(test), test.id())
                info.core_crash_test = crashed
            self.core_crash_test_cases_info.add(info)

    self.send_result_through_pipe(test, error_type)
def addFailure(self, test, err):
    """
    Record a failed result of a test.

    :param test: test which failed
    :param err: error message

    """
    record_failure = unittest.TestResult.addFailure
    self.add_error(test, err, record_failure, FAIL)
def addError(self, test, err):
    """
    Record an error result of a test.

    :param test: test which ended with an error
    :param err: error message

    """
    record_error = unittest.TestResult.addError
    self.add_error(test, err, record_error, ERROR)
def getDescription(self, test):
    """
    Get the description of a test.

    :param test: test to describe
    :returns: whatever get_test_description() returns for this result's
              ``descriptions`` setting and the test

    """
    description = get_test_description(self.descriptions, test)
    return description
def startTest(self, test):
    """
    Start a test

    Prints a header for the class of the test unless that class already
    carries the ``_header_printed`` marker, then registers the start of
    the test with unittest.

    :param test: test which is about to be run

    """

    def print_header(test):
        test_class = test.__class__
        if not hasattr(test_class, '_header_printed'):
            # getdoc() returns None when there is no docstring and an
            # empty docstring has no first line; use the class name then
            doc_lines = (getdoc(test) or '').splitlines()
            title = doc_lines[0] if doc_lines else test_class.__name__
            print(double_line_delim)
            print(colorize(title, GREEN))
            print(double_line_delim)
            test_class._header_printed = True

    print_header(test)

    unittest.TestResult.startTest(self, test)
    if self.verbosity > 0:
        self.stream.writeln(
            "Starting " + self.getDescription(test) + " ...")
        self.stream.writeln(single_line_delim)
def stopTest(self, test):
    """
    Called when the given test has been run

    Writes the test description together with the result string; with
    verbosity above zero that line is framed by delimiter lines.

    :param test: test which has just been run

    """
    unittest.TestResult.stopTest(self, test)

    framed = self.verbosity > 0
    if framed:
        self.stream.writeln(single_line_delim)
    summary = "%-73s%s" % (self.getDescription(test), self.result_string)
    self.stream.writeln(summary)
    if framed:
        self.stream.writeln(single_line_delim)

    self.send_result_through_pipe(test, TEST_RUN)
def printErrors(self):
    """
    Print errors from running the test case

    When the runner has ``print_summary`` switched off, the streams of both
    this result and the runner are replaced by a writer to os.devnull
    afterwards.
    """
    if self.errors or self.failures:
        self.stream.writeln()
        self.printErrorList('ERROR', self.errors)
        self.printErrorList('FAIL', self.failures)

    # ^^ that is the last output from unittest before summary
    if self.runner.print_summary:
        return

    sink = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
    self.stream = sink
    self.runner.stream = sink
def printErrorList(self, flavour, errors):
    """
    Print error list to the output stream together with error type
    and test case description.

    :param flavour: error type
    :param errors: iterable of (test, error text) pairs

    """
    out = self.stream
    for test, err in errors:
        out.writeln(double_line_delim)
        heading = "%s: %s" % (flavour, self.getDescription(test))
        out.writeln(heading)
        out.writeln(single_line_delim)
        out.writeln("%s" % err)
class VppTestRunner(unittest.TextTestRunner):
    """
    A basic test runner implementation which prints results to standard
    output.
    """

    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
                 result_pipe=None, failfast=False, buffer=False,
                 resultclass=None, print_summary=True, **kwargs):
        """
        :param keep_alive_pipe: pipe handed over to KeepAliveReporter
        :param descriptions: passed on to unittest.TextTestRunner
        :param verbosity: passed on to unittest.TextTestRunner
        :param result_pipe: pipe stored on the result class, used for
                            sending test results
        :param failfast: passed on to unittest.TextTestRunner
        :param buffer: passed on to unittest.TextTestRunner
        :param resultclass: passed on to unittest.TextTestRunner
        :param print_summary: if False, the original stream is put back
                              after the run
        :param kwargs: passed on to unittest.TextTestRunner

        """
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        super(VppTestRunner, self).__init__(
            sys.stdout, descriptions, verbosity, failfast, buffer,
            resultclass, **kwargs)
        KeepAliveReporter.pipe = keep_alive_pipe

        # remembered so that run() can put it back
        self.orig_stream = self.stream
        # set on the class, so it is shared by all result objects
        self.resultclass.test_framework_result_pipe = result_pipe

        self.print_summary = print_summary

    def _makeResult(self):
        """Create the result object, handing it a reference to the runner"""
        result_cls = self.resultclass
        return result_cls(self.stream,
                          self.descriptions,
                          self.verbosity,
                          self)

    def run(self, test):
        """
        Run the tests

        :param test: test or test suite to run
        :returns: result object of the run

        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        result = super(VppTestRunner, self).run(test)
        if not self.print_summary:
            # put the original stream back on both the runner and the result
            for holder in (self, result):
                holder.stream = self.orig_stream
        return result
class Worker(Thread):
    """
    Thread which runs an external executable and logs what it printed.

    Once run() has finished, ``result`` holds the return code of the
    process; before that it is None.
    """

    def __init__(self, args, logger, env=None):
        """
        :param args: command line as a sequence, args[0] is the executable
        :param logger: logger receiving the debug and info messages
        :param env: additional environment variables for the executable,
                    applied on top of os.environ

        """
        self.logger = logger
        self.args = args
        self.result = None
        # None replaces the former mutable {} default; the deep copy keeps
        # later changes made by the caller out of the worker
        self.env = copy.deepcopy(env) if env is not None else {}
        super(Worker, self).__init__()

    def run(self):
        """
        Run the executable, wait for it to finish and log its return code,
        stdout and stderr.
        """
        executable = self.args[0]
        self.logger.debug("Running executable w/args `%s'" % self.args)
        env = os.environ.copy()
        env.update(self.env)
        env["CK_LOG_FILE_NAME"] = "-"
        self.process = subprocess.Popen(
            self.args, shell=False, env=env, preexec_fn=os.setpgrp,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # communicate() returns only after the process has terminated
        out, err = self.process.communicate()
        self.logger.debug("Finished running `%s'" % executable)
        self.logger.info("Return code is `%s'" % self.process.returncode)
        self.logger.info(single_line_delim)
        self.logger.info("Executable `%s' wrote to stdout:" % executable)
        self.logger.info(single_line_delim)
        self.logger.info(out)
        self.logger.info(single_line_delim)
        self.logger.info("Executable `%s' wrote to stderr:" % executable)
        self.logger.info(single_line_delim)
        self.logger.info(err)
        self.logger.info(single_line_delim)
        self.result = self.process.returncode
1349 if __name__ == '__main__':