3 from __future__ import print_function
16 from collections import deque
17 from threading import Thread, Event
18 from inspect import getdoc, isclass
19 from traceback import format_exception
20 from logging import FileHandler, DEBUG, Formatter
21 from scapy.packet import Raw
22 from hook import StepHook, PollHook, VppDiedError
23 from vpp_pg_interface import VppPGInterface
24 from vpp_sub_interface import VppSubInterface
25 from vpp_lo_interface import VppLoInterface
26 from vpp_papi_provider import VppPapiProvider
27 from vpp_papi.vpp_stats import VPPStats
28 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
30 from vpp_object import VppObjectRegistry
31 from util import ppp, is_core_present
32 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
33 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
34 from scapy.layers.inet6 import ICMPv6EchoReply
36 if os.name == 'posix' and sys.version_info[0] < 3:
37 # using subprocess32 is recommended by python official documentation
38 # @ https://docs.python.org/2/library/subprocess.html
39 import subprocess32 as subprocess
43 # Python2/3 compatible
55 debug_framework = False
56 if os.getenv('TEST_DEBUG', "0") == "1":
57 debug_framework = True
61 Test framework module.
63 The module provides a set of tools for constructing and running tests and
64 representing the results.
68 class _PacketInfo(object):
69 """Private class to create packet info object.
71 Help process information about the next packet.
72 Set variables to default values.
74 #: Store the index of the packet.
76 #: Store the index of the source packet generator interface of the packet.
78 #: Store the index of the destination packet generator interface
81 #: Store expected ip version
83 #: Store expected upper protocol
85 #: Store the copy of the former packet.
88 def __eq__(self, other):
89 index = self.index == other.index
90 src = self.src == other.src
91 dst = self.dst == other.dst
92 data = self.data == other.data
93 return index and src and dst and data
96 def pump_output(testclass):
97 """ pump output from vpp stdout/stderr to proper queues """
100 while not testclass.pump_thread_stop_flag.is_set():
101 readable = select.select([testclass.vpp.stdout.fileno(),
102 testclass.vpp.stderr.fileno(),
103 testclass.pump_thread_wakeup_pipe[0]],
105 if testclass.vpp.stdout.fileno() in readable:
106 read = os.read(testclass.vpp.stdout.fileno(), 102400)
108 split = read.splitlines(True)
109 if len(stdout_fragment) > 0:
110 split[0] = "%s%s" % (stdout_fragment, split[0])
111 if len(split) > 0 and split[-1].endswith("\n"):
115 stdout_fragment = split[-1]
116 testclass.vpp_stdout_deque.extend(split[:limit])
117 if not testclass.cache_vpp_output:
118 for line in split[:limit]:
119 testclass.logger.debug(
120 "VPP STDOUT: %s" % line.rstrip("\n"))
121 if testclass.vpp.stderr.fileno() in readable:
122 read = os.read(testclass.vpp.stderr.fileno(), 102400)
124 split = read.splitlines(True)
125 if len(stderr_fragment) > 0:
126 split[0] = "%s%s" % (stderr_fragment, split[0])
127 if len(split) > 0 and split[-1].endswith(b"\n"):
131 stderr_fragment = split[-1]
132 testclass.vpp_stderr_deque.extend(split[:limit])
133 if not testclass.cache_vpp_output:
134 for line in split[:limit]:
135 testclass.logger.debug(
136 "VPP STDERR: %s" % line.rstrip("\n"))
137 # ignoring the dummy pipe here intentionally - the
138 # flag will take care of properly terminating the loop
141 def _is_skip_aarch64_set():
142 return os.getenv('SKIP_AARCH64', 'n').lower() in ('yes', 'y', '1')
144 is_skip_aarch64_set = _is_skip_aarch64_set()
147 def _is_platform_aarch64():
148 return platform.machine() == 'aarch64'
150 is_platform_aarch64 = _is_platform_aarch64()
153 def _running_extended_tests():
154 s = os.getenv("EXTENDED_TESTS", "n")
155 return True if s.lower() in ("y", "yes", "1") else False
157 running_extended_tests = _running_extended_tests()
160 def _running_on_centos():
161 os_id = os.getenv("OS_ID", "")
162 return True if "centos" in os_id.lower() else False
164 running_on_centos = _running_on_centos
167 class KeepAliveReporter(object):
169 Singleton object which reports test start to parent process
174 self.__dict__ = self._shared_state
182 def pipe(self, pipe):
183 if self._pipe is not None:
184 raise Exception("Internal error - pipe should only be set once.")
187 def send_keep_alive(self, test, desc=None):
189 Write current test tmpdir & desc to keep-alive pipe to signal liveness
191 if self.pipe is None:
192 # if not running forked..
196 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
200 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
203 class VppTestCase(unittest.TestCase):
204 """This subclass is a base class for VPP test cases that are implemented as
205 classes. It provides methods to create and run test case.
208 extra_vpp_punt_config = []
209 extra_vpp_plugin_config = []
212 def packet_infos(self):
213 """List of packet infos"""
214 return self._packet_infos
217 def get_packet_count_for_if_idx(cls, dst_if_index):
218 """Get the number of packet info for specified destination if index"""
219 if dst_if_index in cls._packet_count_for_dst_if_idx:
220 return cls._packet_count_for_dst_if_idx[dst_if_index]
226 """Return the instance of this testcase"""
227 return cls.test_instance
230 def set_debug_flags(cls, d):
231 cls.debug_core = False
232 cls.debug_gdb = False
233 cls.debug_gdbserver = False
238 cls.debug_core = True
241 elif dl == "gdbserver":
242 cls.debug_gdbserver = True
244 raise Exception("Unrecognized DEBUG option: '%s'" % d)
247 def get_least_used_cpu():
248 cpu_usage_list = [set(range(psutil.cpu_count()))]
249 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
250 if 'vpp_main' == p.info['name']]
251 for vpp_process in vpp_processes:
252 for cpu_usage_set in cpu_usage_list:
254 cpu_num = vpp_process.cpu_num()
255 if cpu_num in cpu_usage_set:
256 cpu_usage_set_index = cpu_usage_list.index(
258 if cpu_usage_set_index == len(cpu_usage_list) - 1:
259 cpu_usage_list.append({cpu_num})
261 cpu_usage_list[cpu_usage_set_index + 1].add(
263 cpu_usage_set.remove(cpu_num)
265 except psutil.NoSuchProcess:
268 for cpu_usage_set in cpu_usage_list:
269 if len(cpu_usage_set) > 0:
270 min_usage_set = cpu_usage_set
273 return random.choice(tuple(min_usage_set))
276 def setUpConstants(cls):
277 """ Set-up the test case class based on environment variables """
278 s = os.getenv("STEP", "n")
279 cls.step = True if s.lower() in ("y", "yes", "1") else False
280 d = os.getenv("DEBUG", None)
281 c = os.getenv("CACHE_OUTPUT", "1")
282 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
283 cls.set_debug_flags(d)
284 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
285 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
286 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
288 if cls.plugin_path is not None:
289 if cls.extern_plugin_path is not None:
290 plugin_path = "%s:%s" % (
291 cls.plugin_path, cls.extern_plugin_path)
293 plugin_path = cls.plugin_path
294 elif cls.extern_plugin_path is not None:
295 plugin_path = cls.extern_plugin_path
297 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
298 debug_cli = "cli-listen localhost:5002"
300 size = os.getenv("COREDUMP_SIZE")
302 coredump_size = "coredump-size %s" % size
303 if coredump_size is None:
304 coredump_size = "coredump-size unlimited"
306 cpu_core_number = cls.get_least_used_cpu()
308 cls.vpp_cmdline = [cls.vpp_bin, "unix",
309 "{", "nodaemon", debug_cli, "full-coredump",
310 coredump_size, "runtime-dir", cls.tempdir, "}",
311 "api-trace", "{", "on", "}", "api-segment", "{",
312 "prefix", cls.shm_prefix, "}", "cpu", "{",
313 "main-core", str(cpu_core_number), "}", "statseg",
314 "{", "socket-name", cls.stats_sock, "}", "plugins",
315 "{", "plugin", "dpdk_plugin.so", "{", "disable",
316 "}", "plugin", "unittest_plugin.so", "{", "enable",
317 "}"] + cls.extra_vpp_plugin_config + ["}", ]
318 if cls.extra_vpp_punt_config is not None:
319 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
320 if plugin_path is not None:
321 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
322 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
323 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
326 def wait_for_enter(cls):
327 if cls.debug_gdbserver:
328 print(double_line_delim)
329 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
331 print(double_line_delim)
332 print("Spawned VPP with PID: %d" % cls.vpp.pid)
334 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
336 print(single_line_delim)
337 print("You can debug the VPP using e.g.:")
338 if cls.debug_gdbserver:
339 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
340 print("Now is the time to attach a gdb by running the above "
341 "command, set up breakpoints etc. and then resume VPP from "
342 "within gdb by issuing the 'continue' command")
344 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
345 print("Now is the time to attach a gdb by running the above "
346 "command and set up breakpoints etc.")
347 print(single_line_delim)
348 input("Press ENTER to continue running the testcase...")
352 cmdline = cls.vpp_cmdline
354 if cls.debug_gdbserver:
355 gdbserver = '/usr/bin/gdbserver'
356 if not os.path.isfile(gdbserver) or \
357 not os.access(gdbserver, os.X_OK):
358 raise Exception("gdbserver binary '%s' does not exist or is "
359 "not executable" % gdbserver)
361 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
362 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
365 cls.vpp = subprocess.Popen(cmdline,
366 stdout=subprocess.PIPE,
367 stderr=subprocess.PIPE,
369 except subprocess.CalledProcessError as e:
370 cls.logger.critical("Subprocess returned with non-0 return code: ("
374 cls.logger.critical("Subprocess returned with OS error: "
375 "(%s) %s", e.errno, e.strerror)
377 except Exception as e:
378 cls.logger.exception("Subprocess returned unexpected from "
385 def wait_for_stats_socket(cls):
386 deadline = time.time() + 3
388 while time.time() < deadline or \
389 cls.debug_gdb or cls.debug_gdbserver:
390 if os.path.exists(cls.stats_sock):
395 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
400 Perform class setup before running the testcase
401 Remove shared memory files, start vpp and connect the vpp-api
403 super(VppTestCase, cls).setUpClass()
404 gc.collect() # run garbage collection first
406 cls.logger = get_logger(cls.__name__)
407 if hasattr(cls, 'parallel_handler'):
408 cls.logger.addHandler(cls.parallel_handler)
409 cls.logger.propagate = False
410 cls.tempdir = tempfile.mkdtemp(
411 prefix='vpp-unittest-%s-' % cls.__name__)
412 cls.stats_sock = "%s/stats.sock" % cls.tempdir
413 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
414 cls.file_handler.setFormatter(
415 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
417 cls.file_handler.setLevel(DEBUG)
418 cls.logger.addHandler(cls.file_handler)
419 cls.shm_prefix = os.path.basename(cls.tempdir)
420 os.chdir(cls.tempdir)
421 cls.logger.info("Temporary dir is %s, shm prefix is %s",
422 cls.tempdir, cls.shm_prefix)
424 cls.reset_packet_infos()
426 cls._zombie_captures = []
429 cls.registry = VppObjectRegistry()
430 cls.vpp_startup_failed = False
431 cls.reporter = KeepAliveReporter()
432 # need to catch exceptions here because if we raise, then the cleanup
433 # doesn't get called and we might end with a zombie vpp
436 cls.reporter.send_keep_alive(cls, 'setUpClass')
437 VppTestResult.current_test_case_info = TestCaseInfo(
438 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
439 cls.vpp_stdout_deque = deque()
440 cls.vpp_stderr_deque = deque()
441 cls.pump_thread_stop_flag = Event()
442 cls.pump_thread_wakeup_pipe = os.pipe()
443 cls.pump_thread = Thread(target=pump_output, args=(cls,))
444 cls.pump_thread.daemon = True
445 cls.pump_thread.start()
446 if cls.debug_gdb or cls.debug_gdbserver:
450 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
456 cls.vapi.register_hook(hook)
457 cls.wait_for_stats_socket()
458 cls.statistics = VPPStats(socketname=cls.stats_sock)
462 cls.vpp_startup_failed = True
464 "VPP died shortly after startup, check the"
465 " output to standard error for possible cause")
471 cls.vapi.disconnect()
474 if cls.debug_gdbserver:
475 print(colorize("You're running VPP inside gdbserver but "
476 "VPP-API connection failed, did you forget "
477 "to 'continue' VPP from within gdb?", RED))
489 Disconnect vpp-api, kill vpp and cleanup shared memory files
491 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
493 if cls.vpp.returncode is None:
494 print(double_line_delim)
495 print("VPP or GDB server is still running")
496 print(single_line_delim)
497 input("When done debugging, press ENTER to kill the "
498 "process and finish running the testcase...")
500 # first signal that we want to stop the pump thread, then wake it up
501 if hasattr(cls, 'pump_thread_stop_flag'):
502 cls.pump_thread_stop_flag.set()
503 if hasattr(cls, 'pump_thread_wakeup_pipe'):
504 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
505 if hasattr(cls, 'pump_thread'):
506 cls.logger.debug("Waiting for pump thread to stop")
507 cls.pump_thread.join()
508 if hasattr(cls, 'vpp_stderr_reader_thread'):
509 cls.logger.debug("Waiting for stdderr pump to stop")
510 cls.vpp_stderr_reader_thread.join()
512 if hasattr(cls, 'vpp'):
513 if hasattr(cls, 'vapi'):
514 cls.vapi.disconnect()
517 if cls.vpp.returncode is None:
518 cls.logger.debug("Sending TERM to vpp")
520 cls.logger.debug("Waiting for vpp to die")
521 cls.vpp.communicate()
524 if cls.vpp_startup_failed:
525 stdout_log = cls.logger.info
526 stderr_log = cls.logger.critical
528 stdout_log = cls.logger.info
529 stderr_log = cls.logger.info
531 if hasattr(cls, 'vpp_stdout_deque'):
532 stdout_log(single_line_delim)
533 stdout_log('VPP output to stdout while running %s:', cls.__name__)
534 stdout_log(single_line_delim)
535 vpp_output = "".join(cls.vpp_stdout_deque)
536 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
538 stdout_log('\n%s', vpp_output)
539 stdout_log(single_line_delim)
541 if hasattr(cls, 'vpp_stderr_deque'):
542 stderr_log(single_line_delim)
543 stderr_log('VPP output to stderr while running %s:', cls.__name__)
544 stderr_log(single_line_delim)
545 vpp_output = "".join(cls.vpp_stderr_deque)
546 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
548 stderr_log('\n%s', vpp_output)
549 stderr_log(single_line_delim)
552 def tearDownClass(cls):
553 """ Perform final cleanup after running all tests in this test-case """
554 cls.reporter.send_keep_alive(cls, 'tearDownClass')
556 cls.file_handler.close()
557 cls.reset_packet_infos()
559 debug_internal.on_tear_down_class(cls)
562 """ Show various debug prints after each test """
563 super(VppTestCase, self).tearDown()
564 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
565 (self.__class__.__name__, self._testMethodName,
566 self._testMethodDoc))
567 if not self.vpp_dead:
568 self.logger.debug(self.vapi.cli("show trace max 1000"))
569 self.logger.info(self.vapi.ppcli("show interface"))
570 self.logger.info(self.vapi.ppcli("show hardware"))
571 self.logger.info(self.statistics.set_errors_str())
572 self.logger.info(self.vapi.ppcli("show run"))
573 self.logger.info(self.vapi.ppcli("show log"))
574 self.registry.remove_vpp_config(self.logger)
575 # Save/Dump VPP api trace log
576 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
577 tmp_api_trace = "/tmp/%s" % api_trace
578 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
579 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
580 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
582 os.rename(tmp_api_trace, vpp_api_trace_log)
583 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
586 self.registry.unregister_all(self.logger)
589 """ Clear trace before running each test"""
590 super(VppTestCase, self).setUp()
591 self.reporter.send_keep_alive(self)
592 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
593 (self.__class__.__name__, self._testMethodName,
594 self._testMethodDoc))
596 raise Exception("VPP is dead when setting up the test")
597 self.sleep(.1, "during setUp")
598 self.vpp_stdout_deque.append(
599 "--- test setUp() for %s.%s(%s) starts here ---\n" %
600 (self.__class__.__name__, self._testMethodName,
601 self._testMethodDoc))
602 self.vpp_stderr_deque.append(
603 "--- test setUp() for %s.%s(%s) starts here ---\n" %
604 (self.__class__.__name__, self._testMethodName,
605 self._testMethodDoc))
606 self.vapi.cli("clear trace")
607 # store the test instance inside the test class - so that objects
608 # holding the class can access instance methods (like assertEqual)
609 type(self).test_instance = self
612 def pg_enable_capture(cls, interfaces=None):
614 Enable capture on packet-generator interfaces
616 :param interfaces: iterable interface indexes (if None,
617 use self.pg_interfaces)
620 if interfaces is None:
621 interfaces = cls.pg_interfaces
626 def register_capture(cls, cap_name):
627 """ Register a capture in the testclass """
628 # add to the list of captures with current timestamp
629 cls._captures.append((time.time(), cap_name))
630 # filter out from zombies
631 cls._zombie_captures = [(stamp, name)
632 for (stamp, name) in cls._zombie_captures
637 """ Remove any zombie captures and enable the packet generator """
638 # how long before capture is allowed to be deleted - otherwise vpp
639 # crashes - 100ms seems enough (this shouldn't be needed at all)
642 for stamp, cap_name in cls._zombie_captures:
643 wait = stamp + capture_ttl - now
645 cls.sleep(wait, "before deleting capture %s" % cap_name)
647 cls.logger.debug("Removing zombie capture %s" % cap_name)
648 cls.vapi.cli('packet-generator delete %s' % cap_name)
650 cls.vapi.cli("trace add pg-input 1000")
651 cls.vapi.cli('packet-generator enable')
652 cls._zombie_captures = cls._captures
656 def create_pg_interfaces(cls, interfaces):
658 Create packet-generator interfaces.
660 :param interfaces: iterable indexes of the interfaces.
661 :returns: List of created interfaces.
666 intf = VppPGInterface(cls, i)
667 setattr(cls, intf.name, intf)
669 cls.pg_interfaces = result
673 def create_loopback_interfaces(cls, count):
675 Create loopback interfaces.
677 :param count: number of interfaces created.
678 :returns: List of created interfaces.
680 result = [VppLoInterface(cls) for i in range(count)]
682 setattr(cls, intf.name, intf)
683 cls.lo_interfaces = result
687 def extend_packet(packet, size, padding=' '):
689 Extend packet to given size by padding with spaces or custom padding
690 NOTE: Currently works only when Raw layer is present.
692 :param packet: packet
693 :param size: target size
694 :param padding: padding used to extend the payload
697 packet_len = len(packet) + 4
698 extend = size - packet_len
700 num = (extend / len(padding)) + 1
701 packet[Raw].load += (padding * num)[:extend]
704 def reset_packet_infos(cls):
705 """ Reset the list of packet info objects and packet counts to zero """
706 cls._packet_infos = {}
707 cls._packet_count_for_dst_if_idx = {}
710 def create_packet_info(cls, src_if, dst_if):
712 Create packet info object containing the source and destination indexes
713 and add it to the testcase's packet info list
715 :param VppInterface src_if: source interface
716 :param VppInterface dst_if: destination interface
718 :returns: _PacketInfo object
722 info.index = len(cls._packet_infos)
723 info.src = src_if.sw_if_index
724 info.dst = dst_if.sw_if_index
725 if isinstance(dst_if, VppSubInterface):
726 dst_idx = dst_if.parent.sw_if_index
728 dst_idx = dst_if.sw_if_index
729 if dst_idx in cls._packet_count_for_dst_if_idx:
730 cls._packet_count_for_dst_if_idx[dst_idx] += 1
732 cls._packet_count_for_dst_if_idx[dst_idx] = 1
733 cls._packet_infos[info.index] = info
737 def info_to_payload(info):
739 Convert _PacketInfo object to packet payload
741 :param info: _PacketInfo object
743 :returns: string containing serialized data from packet info
745 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
749 def payload_to_info(payload, payload_field='load'):
751 Convert packet payload to _PacketInfo object
753 :param payload: packet payload
754 :type: <class 'scapy.packet.Raw'>
755 :param: payload_field: packet fieldname of payload "load" for
756 <class 'scapy.packet.Raw'>
757 :returns: _PacketInfo object containing de-serialized data from payload
760 numbers = getattr(payload, payload_field).split()
762 info.index = int(numbers[0])
763 info.src = int(numbers[1])
764 info.dst = int(numbers[2])
765 info.ip = int(numbers[3])
766 info.proto = int(numbers[4])
769 def get_next_packet_info(self, info):
771 Iterate over the packet info list stored in the testcase
772 Start iteration with first element if info is None
773 Continue based on index in info if info is specified
775 :param info: info or None
776 :returns: next info in list or None if no more infos
781 next_index = info.index + 1
782 if next_index == len(self._packet_infos):
785 return self._packet_infos[next_index]
787 def get_next_packet_info_for_interface(self, src_index, info):
789 Search the packet info list for the next packet info with same source
792 :param src_index: source interface index to search for
793 :param info: packet info - where to start the search
794 :returns: packet info or None
798 info = self.get_next_packet_info(info)
801 if info.src == src_index:
804 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
806 Search the packet info list for the next packet info with same source
807 and destination interface indexes
809 :param src_index: source interface index to search for
810 :param dst_index: destination interface index to search for
811 :param info: packet info - where to start the search
812 :returns: packet info or None
816 info = self.get_next_packet_info_for_interface(src_index, info)
819 if info.dst == dst_index:
822 def assert_equal(self, real_value, expected_value, name_or_class=None):
823 if name_or_class is None:
824 self.assertEqual(real_value, expected_value)
827 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
828 msg = msg % (getdoc(name_or_class).strip(),
829 real_value, str(name_or_class(real_value)),
830 expected_value, str(name_or_class(expected_value)))
832 msg = "Invalid %s: %s does not match expected value %s" % (
833 name_or_class, real_value, expected_value)
835 self.assertEqual(real_value, expected_value, msg)
837 def assert_in_range(self,
845 msg = "Invalid %s: %s out of range <%s,%s>" % (
846 name, real_value, expected_min, expected_max)
847 self.assertTrue(expected_min <= real_value <= expected_max, msg)
849 def assert_packet_checksums_valid(self, packet,
850 ignore_zero_udp_checksums=True):
851 received = packet.__class__(str(packet))
853 ppp("Verifying packet checksums for packet:", received))
854 udp_layers = ['UDP', 'UDPerror']
855 checksum_fields = ['cksum', 'chksum']
858 temp = received.__class__(str(received))
860 layer = temp.getlayer(counter)
862 for cf in checksum_fields:
863 if hasattr(layer, cf):
864 if ignore_zero_udp_checksums and \
865 0 == getattr(layer, cf) and \
866 layer.name in udp_layers:
869 checksums.append((counter, cf))
872 counter = counter + 1
873 if 0 == len(checksums):
875 temp = temp.__class__(str(temp))
876 for layer, cf in checksums:
877 calc_sum = getattr(temp[layer], cf)
879 getattr(received[layer], cf), calc_sum,
880 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
882 "Checksum field `%s` on `%s` layer has correct value `%s`" %
883 (cf, temp[layer].name, calc_sum))
885 def assert_checksum_valid(self, received_packet, layer,
887 ignore_zero_checksum=False):
888 """ Check checksum of received packet on given layer """
889 received_packet_checksum = getattr(received_packet[layer], field_name)
890 if ignore_zero_checksum and 0 == received_packet_checksum:
892 recalculated = received_packet.__class__(str(received_packet))
893 delattr(recalculated[layer], field_name)
894 recalculated = recalculated.__class__(str(recalculated))
895 self.assert_equal(received_packet_checksum,
896 getattr(recalculated[layer], field_name),
897 "packet checksum on layer: %s" % layer)
899 def assert_ip_checksum_valid(self, received_packet,
900 ignore_zero_checksum=False):
901 self.assert_checksum_valid(received_packet, 'IP',
902 ignore_zero_checksum=ignore_zero_checksum)
904 def assert_tcp_checksum_valid(self, received_packet,
905 ignore_zero_checksum=False):
906 self.assert_checksum_valid(received_packet, 'TCP',
907 ignore_zero_checksum=ignore_zero_checksum)
909 def assert_udp_checksum_valid(self, received_packet,
910 ignore_zero_checksum=True):
911 self.assert_checksum_valid(received_packet, 'UDP',
912 ignore_zero_checksum=ignore_zero_checksum)
914 def assert_embedded_icmp_checksum_valid(self, received_packet):
915 if received_packet.haslayer(IPerror):
916 self.assert_checksum_valid(received_packet, 'IPerror')
917 if received_packet.haslayer(TCPerror):
918 self.assert_checksum_valid(received_packet, 'TCPerror')
919 if received_packet.haslayer(UDPerror):
920 self.assert_checksum_valid(received_packet, 'UDPerror',
921 ignore_zero_checksum=True)
922 if received_packet.haslayer(ICMPerror):
923 self.assert_checksum_valid(received_packet, 'ICMPerror')
925 def assert_icmp_checksum_valid(self, received_packet):
926 self.assert_checksum_valid(received_packet, 'ICMP')
927 self.assert_embedded_icmp_checksum_valid(received_packet)
929 def assert_icmpv6_checksum_valid(self, pkt):
930 if pkt.haslayer(ICMPv6DestUnreach):
931 self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
932 self.assert_embedded_icmp_checksum_valid(pkt)
933 if pkt.haslayer(ICMPv6EchoRequest):
934 self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
935 if pkt.haslayer(ICMPv6EchoReply):
936 self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
938 def assert_packet_counter_equal(self, counter, expected_value):
939 if counter.startswith("/"):
940 counter_value = self.statistics.get_counter(counter)
941 self.assert_equal(counter_value, expected_value,
942 "packet counter `%s'" % counter)
944 counters = self.vapi.cli("sh errors").split('\n')
946 for i in range(1, len(counters) - 1):
947 results = counters[i].split()
948 if results[1] == counter:
949 counter_value = int(results[0])
953 def sleep(cls, timeout, remark=None):
954 if hasattr(cls, 'logger'):
955 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
959 if hasattr(cls, 'logger') and after - before > 2 * timeout:
960 cls.logger.error("unexpected time.sleep() result - "
961 "slept for %es instead of ~%es!",
962 after - before, timeout)
963 if hasattr(cls, 'logger'):
965 "Finished sleep (%s) - slept %es (wanted %es)",
966 remark, after - before, timeout)
968 def pg_send(self, intf, pkts):
969 self.vapi.cli("clear trace")
970 intf.add_stream(pkts)
971 self.pg_enable_capture(self.pg_interfaces)
974 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
975 self.pg_send(intf, pkts)
978 for i in self.pg_interfaces:
979 i.get_capture(0, timeout=timeout)
980 i.assert_nothing_captured(remark=remark)
983 def send_and_expect(self, intf, pkts, output):
984 self.pg_send(intf, pkts)
985 rx = output.get_capture(len(pkts))
988 def send_and_expect_only(self, intf, pkts, output, timeout=None):
989 self.pg_send(intf, pkts)
990 rx = output.get_capture(len(pkts))
994 for i in self.pg_interfaces:
996 i.get_capture(0, timeout=timeout)
997 i.assert_nothing_captured()
1003 """ unittest calls runTest when TestCase is instantiated without a
1004 test case. Use case: Writing unittests against VppTestCase"""
1008 def get_testcase_doc_name(test):
1009 return getdoc(test.__class__).splitlines()[0]
1012 def get_test_description(descriptions, test):
1013 short_description = test.shortDescription()
1014 if descriptions and short_description:
1015 return short_description
1020 class TestCaseInfo(object):
1021 def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
1022 self.logger = logger
1023 self.tempdir = tempdir
1024 self.vpp_pid = vpp_pid
1025 self.vpp_bin_path = vpp_bin_path
1026 self.core_crash_test = None
1029 class VppTestResult(unittest.TestResult):
1031 @property result_string
1032 String variable to store the test case result string.
1034 List variable containing 2-tuples of TestCase instances and strings
1035 holding formatted tracebacks. Each tuple represents a test which
1036 raised an unexpected exception.
1038 List variable containing 2-tuples of TestCase instances and strings
1039 holding formatted tracebacks. Each tuple represents a test where
1040 a failure was explicitly signalled using the TestCase.assert*()
1044 failed_test_cases_info = set()
1045 core_crash_test_cases_info = set()
1046 current_test_case_info = None
1048 def __init__(self, stream=None, descriptions=None, verbosity=None,
1051 :param stream File descriptor to store where to report test results.
1052 Set to the standard error stream by default.
1053 :param descriptions Boolean variable to store information if to use
1054 test case descriptions.
1055 :param verbosity Integer variable to store required verbosity level.
1057 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1058 self.stream = stream
1059 self.descriptions = descriptions
1060 self.verbosity = verbosity
1061 self.result_string = None
1062 self.runner = runner
1064 def addSuccess(self, test):
1066 Record a test succeeded result
1071 if self.current_test_case_info:
1072 self.current_test_case_info.logger.debug(
1073 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1074 test._testMethodName,
1075 test._testMethodDoc))
1076 unittest.TestResult.addSuccess(self, test)
1077 self.result_string = colorize("OK", GREEN)
1079 self.send_result_through_pipe(test, PASS)
1081 def addSkip(self, test, reason):
1083 Record a test skipped.
1089 if self.current_test_case_info:
1090 self.current_test_case_info.logger.debug(
1091 "--- addSkip() %s.%s(%s) called, reason is %s" %
1092 (test.__class__.__name__, test._testMethodName,
1093 test._testMethodDoc, reason))
1094 unittest.TestResult.addSkip(self, test, reason)
1095 self.result_string = colorize("SKIP", YELLOW)
1097 self.send_result_through_pipe(test, SKIP)
1099 def symlink_failed(self):
1100 if self.current_test_case_info:
1102 failed_dir = os.getenv('FAILED_DIR')
1103 link_path = os.path.join(
1106 os.path.basename(self.current_test_case_info.tempdir))
1107 if self.current_test_case_info.logger:
1108 self.current_test_case_info.logger.debug(
1109 "creating a link to the failed test")
1110 self.current_test_case_info.logger.debug(
1111 "os.symlink(%s, %s)" %
1112 (self.current_test_case_info.tempdir, link_path))
1113 if os.path.exists(link_path):
1114 if self.current_test_case_info.logger:
1115 self.current_test_case_info.logger.debug(
1116 'symlink already exists')
1118 os.symlink(self.current_test_case_info.tempdir, link_path)
1120 except Exception as e:
1121 if self.current_test_case_info.logger:
1122 self.current_test_case_info.logger.error(e)
1124 def send_result_through_pipe(self, test, result):
1125 if hasattr(self, 'test_framework_result_pipe'):
1126 pipe = self.test_framework_result_pipe
1128 pipe.send((test.id(), result))
1130 def log_error(self, test, err, fn_name):
1131 if self.current_test_case_info:
1132 if isinstance(test, unittest.suite._ErrorHolder):
1133 test_name = test.description
1135 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1136 test._testMethodName,
1137 test._testMethodDoc)
1138 self.current_test_case_info.logger.debug(
1139 "--- %s() %s called, err is %s" %
1140 (fn_name, test_name, err))
1141 self.current_test_case_info.logger.debug(
1142 "formatted exception is:\n%s" %
1143 "".join(format_exception(*err)))
1145 def add_error(self, test, err, unittest_fn, error_type):
1146 if error_type == FAIL:
1147 self.log_error(test, err, 'addFailure')
1148 error_type_str = colorize("FAIL", RED)
1149 elif error_type == ERROR:
1150 self.log_error(test, err, 'addError')
1151 error_type_str = colorize("ERROR", RED)
1153 raise Exception('Error type %s cannot be used to record an '
1154 'error or a failure' % error_type)
1156 unittest_fn(self, test, err)
1157 if self.current_test_case_info:
1158 self.result_string = "%s [ temp dir used by test case: %s ]" % \
1160 self.current_test_case_info.tempdir)
1161 self.symlink_failed()
1162 self.failed_test_cases_info.add(self.current_test_case_info)
1163 if is_core_present(self.current_test_case_info.tempdir):
1164 if not self.current_test_case_info.core_crash_test:
1165 if isinstance(test, unittest.suite._ErrorHolder):
1166 test_name = str(test)
1168 test_name = "'{!s}' ({!s})".format(
1169 get_testcase_doc_name(test), test.id())
1170 self.current_test_case_info.core_crash_test = test_name
1171 self.core_crash_test_cases_info.add(
1172 self.current_test_case_info)
1174 self.result_string = '%s [no temp dir]' % error_type_str
1176 self.send_result_through_pipe(test, error_type)
1178 def addFailure(self, test, err):
1180 Record a test failed result
1183 :param err: error message
1186 self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1188 def addError(self, test, err):
1190 Record a test error result
1193 :param err: error message
1196 self.add_error(test, err, unittest.TestResult.addError, ERROR)
1198 def getDescription(self, test):
1200 Get test description
1203 :returns: test description
1206 return get_test_description(self.descriptions, test)
1208 def startTest(self, test):
1216 def print_header(test):
1217 if not hasattr(test.__class__, '_header_printed'):
1218 print(double_line_delim)
1219 print(colorize(getdoc(test).splitlines()[0], GREEN))
1220 print(double_line_delim)
1221 test.__class__._header_printed = True
1225 unittest.TestResult.startTest(self, test)
1226 if self.verbosity > 0:
1227 self.stream.writeln(
1228 "Starting " + self.getDescription(test) + " ...")
1229 self.stream.writeln(single_line_delim)
1231 def stopTest(self, test):
1233 Called when the given test has been run
1238 unittest.TestResult.stopTest(self, test)
1239 if self.verbosity > 0:
1240 self.stream.writeln(single_line_delim)
1241 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1242 self.result_string))
1243 self.stream.writeln(single_line_delim)
1245 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1246 self.result_string))
1248 self.send_result_through_pipe(test, TEST_RUN)
1250 def printErrors(self):
1252 Print errors from running the test case
1254 if len(self.errors) > 0 or len(self.failures) > 0:
1255 self.stream.writeln()
1256 self.printErrorList('ERROR', self.errors)
1257 self.printErrorList('FAIL', self.failures)
1259 # ^^ that is the last output from unittest before summary
1260 if not self.runner.print_summary:
1261 devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
1262 self.stream = devnull
1263 self.runner.stream = devnull
1265 def printErrorList(self, flavour, errors):
1267 Print error list to the output stream together with error type
1268 and test case description.
1270 :param flavour: error type
1271 :param errors: iterable errors
1274 for test, err in errors:
1275 self.stream.writeln(double_line_delim)
1276 self.stream.writeln("%s: %s" %
1277 (flavour, self.getDescription(test)))
1278 self.stream.writeln(single_line_delim)
1279 self.stream.writeln("%s" % err)
1282 class VppTestRunner(unittest.TextTestRunner):
1284 A basic test runner implementation which prints results to standard error.
1288 def resultclass(self):
1289 """Class maintaining the results of the tests"""
1290 return VppTestResult
1292 def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1293 result_pipe=None, failfast=False, buffer=False,
1294 resultclass=None, print_summary=True, **kwargs):
1295 # ignore stream setting here, use hard-coded stdout to be in sync
1296 # with prints from VppTestCase methods ...
1297 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1298 verbosity, failfast, buffer,
1299 resultclass, **kwargs)
1300 KeepAliveReporter.pipe = keep_alive_pipe
1302 self.orig_stream = self.stream
1303 self.resultclass.test_framework_result_pipe = result_pipe
1305 self.print_summary = print_summary
1307 def _makeResult(self):
1308 return self.resultclass(self.stream,
1313 def run(self, test):
1320 faulthandler.enable() # emit stack trace to stderr if killed by signal
1322 result = super(VppTestRunner, self).run(test)
1323 if not self.print_summary:
1324 self.stream = self.orig_stream
1325 result.stream = self.orig_stream
1329 class Worker(Thread):
1330 def __init__(self, args, logger, env={}):
1331 self.logger = logger
1334 self.env = copy.deepcopy(env)
1335 super(Worker, self).__init__()
1338 executable = self.args[0]
1339 self.logger.debug("Running executable w/args `%s'" % self.args)
1340 env = os.environ.copy()
1341 env.update(self.env)
1342 env["CK_LOG_FILE_NAME"] = "-"
1343 self.process = subprocess.Popen(
1344 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1345 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1346 out, err = self.process.communicate()
1347 self.logger.debug("Finished running `%s'" % executable)
1348 self.logger.info("Return code is `%s'" % self.process.returncode)
1349 self.logger.info(single_line_delim)
1350 self.logger.info("Executable `%s' wrote to stdout:" % executable)
1351 self.logger.info(single_line_delim)
1352 self.logger.info(out)
1353 self.logger.info(single_line_delim)
1354 self.logger.info("Executable `%s' wrote to stderr:" % executable)
1355 self.logger.info(single_line_delim)
1356 self.logger.info(err)
1357 self.logger.info(single_line_delim)
1358 self.result = self.process.returncode
1360 if __name__ == '__main__':