3 from __future__ import print_function
16 from collections import deque
17 from threading import Thread, Event
18 from inspect import getdoc, isclass
19 from traceback import format_exception
20 from logging import FileHandler, DEBUG, Formatter
21 from scapy.packet import Raw
22 from hook import StepHook, PollHook, VppDiedError
23 from vpp_pg_interface import VppPGInterface
24 from vpp_sub_interface import VppSubInterface
25 from vpp_lo_interface import VppLoInterface
26 from vpp_papi_provider import VppPapiProvider
27 from vpp_papi.vpp_stats import VPPStats
28 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
30 from vpp_object import VppObjectRegistry
31 from util import ppp, is_core_present
32 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
33 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
34 from scapy.layers.inet6 import ICMPv6EchoReply
36 if os.name == 'posix' and sys.version_info[0] < 3:
37 # using subprocess32 is recommended by python official documentation
38 # @ https://docs.python.org/2/library/subprocess.html
39 import subprocess32 as subprocess
49 debug_framework = False
50 if os.getenv('TEST_DEBUG', "0") == "1":
51 debug_framework = True
55 Test framework module.
57 The module provides a set of tools for constructing and running tests and
58 representing the results.
62 class _PacketInfo(object):
63 """Private class to create packet info object.
65 Help process information about the next packet.
66 Set variables to default values.
68 #: Store the index of the packet.
70 #: Store the index of the source packet generator interface of the packet.
72 #: Store the index of the destination packet generator interface
75 #: Store expected ip version
77 #: Store expected upper protocol
79 #: Store the copy of the former packet.
82 def __eq__(self, other):
83 index = self.index == other.index
84 src = self.src == other.src
85 dst = self.dst == other.dst
86 data = self.data == other.data
87 return index and src and dst and data
90 def pump_output(testclass):
91 """ pump output from vpp stdout/stderr to proper queues """
94 while not testclass.pump_thread_stop_flag.is_set():
95 readable = select.select([testclass.vpp.stdout.fileno(),
96 testclass.vpp.stderr.fileno(),
97 testclass.pump_thread_wakeup_pipe[0]],
99 if testclass.vpp.stdout.fileno() in readable:
100 read = os.read(testclass.vpp.stdout.fileno(), 102400)
102 split = read.splitlines(True)
103 if len(stdout_fragment) > 0:
104 split[0] = "%s%s" % (stdout_fragment, split[0])
105 if len(split) > 0 and split[-1].endswith("\n"):
109 stdout_fragment = split[-1]
110 testclass.vpp_stdout_deque.extend(split[:limit])
111 if not testclass.cache_vpp_output:
112 for line in split[:limit]:
113 testclass.logger.debug(
114 "VPP STDOUT: %s" % line.rstrip("\n"))
115 if testclass.vpp.stderr.fileno() in readable:
116 read = os.read(testclass.vpp.stderr.fileno(), 102400)
118 split = read.splitlines(True)
119 if len(stderr_fragment) > 0:
120 split[0] = "%s%s" % (stderr_fragment, split[0])
121 if len(split) > 0 and split[-1].endswith(b"\n"):
125 stderr_fragment = split[-1]
126 testclass.vpp_stderr_deque.extend(split[:limit])
127 if not testclass.cache_vpp_output:
128 for line in split[:limit]:
129 testclass.logger.debug(
130 "VPP STDERR: %s" % line.rstrip("\n"))
131 # ignoring the dummy pipe here intentionally - the
132 # flag will take care of properly terminating the loop
135 def is_skip_aarch64_set():
136 return os.getenv('SKIP_AARCH64', 'n').lower() in ('yes', 'y', '1')
139 def is_platform_aarch64():
140 return platform.machine() == 'aarch64'
143 def running_extended_tests():
144 s = os.getenv("EXTENDED_TESTS", "n")
145 return True if s.lower() in ("y", "yes", "1") else False
148 def running_on_centos():
149 os_id = os.getenv("OS_ID", "")
150 return True if "centos" in os_id.lower() else False
153 class KeepAliveReporter(object):
155 Singleton object which reports test start to parent process
160 self.__dict__ = self._shared_state
168 def pipe(self, pipe):
169 if self._pipe is not None:
170 raise Exception("Internal error - pipe should only be set once.")
173 def send_keep_alive(self, test, desc=None):
175 Write current test tmpdir & desc to keep-alive pipe to signal liveness
177 if self.pipe is None:
178 # if not running forked..
182 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
186 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
189 class VppTestCase(unittest.TestCase):
190 """This subclass is a base class for VPP test cases that are implemented as
191 classes. It provides methods to create and run test case.
194 extra_vpp_punt_config = []
197 def packet_infos(self):
198 """List of packet infos"""
199 return self._packet_infos
202 def get_packet_count_for_if_idx(cls, dst_if_index):
203 """Get the number of packet info for specified destination if index"""
204 if dst_if_index in cls._packet_count_for_dst_if_idx:
205 return cls._packet_count_for_dst_if_idx[dst_if_index]
211 """Return the instance of this testcase"""
212 return cls.test_instance
215 def set_debug_flags(cls, d):
216 cls.debug_core = False
217 cls.debug_gdb = False
218 cls.debug_gdbserver = False
223 cls.debug_core = True
226 elif dl == "gdbserver":
227 cls.debug_gdbserver = True
229 raise Exception("Unrecognized DEBUG option: '%s'" % d)
232 def get_least_used_cpu():
233 cpu_usage_list = [set(range(psutil.cpu_count()))]
234 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
235 if 'vpp_main' == p.info['name']]
236 for vpp_process in vpp_processes:
237 for cpu_usage_set in cpu_usage_list:
239 cpu_num = vpp_process.cpu_num()
240 if cpu_num in cpu_usage_set:
241 cpu_usage_set_index = cpu_usage_list.index(
243 if cpu_usage_set_index == len(cpu_usage_list) - 1:
244 cpu_usage_list.append({cpu_num})
246 cpu_usage_list[cpu_usage_set_index + 1].add(
248 cpu_usage_set.remove(cpu_num)
250 except psutil.NoSuchProcess:
253 for cpu_usage_set in cpu_usage_list:
254 if len(cpu_usage_set) > 0:
255 min_usage_set = cpu_usage_set
258 return random.choice(tuple(min_usage_set))
261 def print_header(cls):
262 if not hasattr(cls, '_header_printed'):
263 print(double_line_delim)
264 print(colorize(getdoc(cls).splitlines()[0], GREEN))
265 print(double_line_delim)
266 cls._header_printed = True
269 def setUpConstants(cls):
270 """ Set-up the test case class based on environment variables """
271 s = os.getenv("STEP", "n")
272 cls.step = True if s.lower() in ("y", "yes", "1") else False
273 d = os.getenv("DEBUG", None)
274 c = os.getenv("CACHE_OUTPUT", "1")
275 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
276 cls.set_debug_flags(d)
277 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
278 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
279 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
281 if cls.plugin_path is not None:
282 if cls.extern_plugin_path is not None:
283 plugin_path = "%s:%s" % (
284 cls.plugin_path, cls.extern_plugin_path)
286 plugin_path = cls.plugin_path
287 elif cls.extern_plugin_path is not None:
288 plugin_path = cls.extern_plugin_path
290 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
291 debug_cli = "cli-listen localhost:5002"
293 size = os.getenv("COREDUMP_SIZE")
295 coredump_size = "coredump-size %s" % size
296 if coredump_size is None:
297 coredump_size = "coredump-size unlimited"
299 cpu_core_number = cls.get_least_used_cpu()
301 cls.vpp_cmdline = [cls.vpp_bin, "unix",
302 "{", "nodaemon", debug_cli, "full-coredump",
303 coredump_size, "runtime-dir", cls.tempdir, "}",
304 "api-trace", "{", "on", "}", "api-segment", "{",
305 "prefix", cls.shm_prefix, "}", "cpu", "{",
306 "main-core", str(cpu_core_number), "}", "statseg",
307 "{", "socket-name", cls.stats_sock, "}", "plugins",
308 "{", "plugin", "dpdk_plugin.so", "{", "disable",
309 "}", "plugin", "unittest_plugin.so", "{", "enable",
311 if cls.extra_vpp_punt_config is not None:
312 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
313 if plugin_path is not None:
314 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
315 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
316 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
319 def wait_for_enter(cls):
320 if cls.debug_gdbserver:
321 print(double_line_delim)
322 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
324 print(double_line_delim)
325 print("Spawned VPP with PID: %d" % cls.vpp.pid)
327 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
329 print(single_line_delim)
330 print("You can debug the VPP using e.g.:")
331 if cls.debug_gdbserver:
332 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
333 print("Now is the time to attach a gdb by running the above "
334 "command, set up breakpoints etc. and then resume VPP from "
335 "within gdb by issuing the 'continue' command")
337 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
338 print("Now is the time to attach a gdb by running the above "
339 "command and set up breakpoints etc.")
340 print(single_line_delim)
341 raw_input("Press ENTER to continue running the testcase...")
345 cmdline = cls.vpp_cmdline
347 if cls.debug_gdbserver:
348 gdbserver = '/usr/bin/gdbserver'
349 if not os.path.isfile(gdbserver) or \
350 not os.access(gdbserver, os.X_OK):
351 raise Exception("gdbserver binary '%s' does not exist or is "
352 "not executable" % gdbserver)
354 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
355 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
358 cls.vpp = subprocess.Popen(cmdline,
359 stdout=subprocess.PIPE,
360 stderr=subprocess.PIPE,
362 except subprocess.CalledProcessError as e:
363 cls.logger.critical("Couldn't start vpp: %s" % e)
369 def wait_for_stats_socket(cls):
370 deadline = time.time() + 3
372 while time.time() < deadline or \
373 cls.debug_gdb or cls.debug_gdbserver:
374 if os.path.exists(cls.stats_sock):
379 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
384 Perform class setup before running the testcase
385 Remove shared memory files, start vpp and connect the vpp-api
387 gc.collect() # run garbage collection first
390 cls.logger = get_logger(cls.__name__)
391 if hasattr(cls, 'parallel_handler'):
392 cls.logger.addHandler(cls.parallel_handler)
393 cls.logger.propagate = False
394 cls.tempdir = tempfile.mkdtemp(
395 prefix='vpp-unittest-%s-' % cls.__name__)
396 cls.stats_sock = "%s/stats.sock" % cls.tempdir
397 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
398 cls.file_handler.setFormatter(
399 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
401 cls.file_handler.setLevel(DEBUG)
402 cls.logger.addHandler(cls.file_handler)
403 cls.shm_prefix = os.path.basename(cls.tempdir)
404 os.chdir(cls.tempdir)
405 cls.logger.info("Temporary dir is %s, shm prefix is %s",
406 cls.tempdir, cls.shm_prefix)
408 cls.reset_packet_infos()
410 cls._zombie_captures = []
413 cls.registry = VppObjectRegistry()
414 cls.vpp_startup_failed = False
415 cls.reporter = KeepAliveReporter()
416 # need to catch exceptions here because if we raise, then the cleanup
417 # doesn't get called and we might end with a zombie vpp
420 cls.reporter.send_keep_alive(cls, 'setUpClass')
421 VppTestResult.current_test_case_info = TestCaseInfo(
422 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
423 cls.vpp_stdout_deque = deque()
424 cls.vpp_stderr_deque = deque()
425 cls.pump_thread_stop_flag = Event()
426 cls.pump_thread_wakeup_pipe = os.pipe()
427 cls.pump_thread = Thread(target=pump_output, args=(cls,))
428 cls.pump_thread.daemon = True
429 cls.pump_thread.start()
430 if cls.debug_gdb or cls.debug_gdbserver:
434 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
440 cls.vapi.register_hook(hook)
441 cls.wait_for_stats_socket()
442 cls.statistics = VPPStats(socketname=cls.stats_sock)
446 cls.vpp_startup_failed = True
448 "VPP died shortly after startup, check the"
449 " output to standard error for possible cause")
455 cls.vapi.disconnect()
458 if cls.debug_gdbserver:
459 print(colorize("You're running VPP inside gdbserver but "
460 "VPP-API connection failed, did you forget "
461 "to 'continue' VPP from within gdb?", RED))
473 Disconnect vpp-api, kill vpp and cleanup shared memory files
475 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
477 if cls.vpp.returncode is None:
478 print(double_line_delim)
479 print("VPP or GDB server is still running")
480 print(single_line_delim)
481 raw_input("When done debugging, press ENTER to kill the "
482 "process and finish running the testcase...")
484 # first signal that we want to stop the pump thread, then wake it up
485 if hasattr(cls, 'pump_thread_stop_flag'):
486 cls.pump_thread_stop_flag.set()
487 if hasattr(cls, 'pump_thread_wakeup_pipe'):
488 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
489 if hasattr(cls, 'pump_thread'):
490 cls.logger.debug("Waiting for pump thread to stop")
491 cls.pump_thread.join()
492 if hasattr(cls, 'vpp_stderr_reader_thread'):
493 cls.logger.debug("Waiting for stdderr pump to stop")
494 cls.vpp_stderr_reader_thread.join()
496 if hasattr(cls, 'vpp'):
497 if hasattr(cls, 'vapi'):
498 cls.vapi.disconnect()
501 if cls.vpp.returncode is None:
502 cls.logger.debug("Sending TERM to vpp")
504 cls.logger.debug("Waiting for vpp to die")
505 cls.vpp.communicate()
508 if cls.vpp_startup_failed:
509 stdout_log = cls.logger.info
510 stderr_log = cls.logger.critical
512 stdout_log = cls.logger.info
513 stderr_log = cls.logger.info
515 if hasattr(cls, 'vpp_stdout_deque'):
516 stdout_log(single_line_delim)
517 stdout_log('VPP output to stdout while running %s:', cls.__name__)
518 stdout_log(single_line_delim)
519 vpp_output = "".join(cls.vpp_stdout_deque)
520 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
522 stdout_log('\n%s', vpp_output)
523 stdout_log(single_line_delim)
525 if hasattr(cls, 'vpp_stderr_deque'):
526 stderr_log(single_line_delim)
527 stderr_log('VPP output to stderr while running %s:', cls.__name__)
528 stderr_log(single_line_delim)
529 vpp_output = "".join(str(cls.vpp_stderr_deque))
530 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
532 stderr_log('\n%s', vpp_output)
533 stderr_log(single_line_delim)
536 def tearDownClass(cls):
537 """ Perform final cleanup after running all tests in this test-case """
538 cls.reporter.send_keep_alive(cls, 'tearDownClass')
540 cls.file_handler.close()
541 cls.reset_packet_infos()
543 debug_internal.on_tear_down_class(cls)
546 """ Show various debug prints after each test """
547 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
548 (self.__class__.__name__, self._testMethodName,
549 self._testMethodDoc))
550 if not self.vpp_dead:
551 self.logger.debug(self.vapi.cli("show trace"))
552 self.logger.info(self.vapi.ppcli("show interface"))
553 self.logger.info(self.vapi.ppcli("show hardware"))
554 self.logger.info(self.statistics.set_errors_str())
555 self.logger.info(self.vapi.ppcli("show run"))
556 self.logger.info(self.vapi.ppcli("show log"))
557 self.registry.remove_vpp_config(self.logger)
558 # Save/Dump VPP api trace log
559 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
560 tmp_api_trace = "/tmp/%s" % api_trace
561 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
562 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
563 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
565 os.rename(tmp_api_trace, vpp_api_trace_log)
566 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
569 self.registry.unregister_all(self.logger)
572 """ Clear trace before running each test"""
573 self.reporter.send_keep_alive(self)
574 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
575 (self.__class__.__name__, self._testMethodName,
576 self._testMethodDoc))
578 raise Exception("VPP is dead when setting up the test")
579 self.sleep(.1, "during setUp")
580 self.vpp_stdout_deque.append(
581 "--- test setUp() for %s.%s(%s) starts here ---\n" %
582 (self.__class__.__name__, self._testMethodName,
583 self._testMethodDoc))
584 self.vpp_stderr_deque.append(
585 "--- test setUp() for %s.%s(%s) starts here ---\n" %
586 (self.__class__.__name__, self._testMethodName,
587 self._testMethodDoc))
588 self.vapi.cli("clear trace")
589 # store the test instance inside the test class - so that objects
590 # holding the class can access instance methods (like assertEqual)
591 type(self).test_instance = self
594 def pg_enable_capture(cls, interfaces=None):
596 Enable capture on packet-generator interfaces
598 :param interfaces: iterable interface indexes (if None,
599 use self.pg_interfaces)
602 if interfaces is None:
603 interfaces = cls.pg_interfaces
608 def register_capture(cls, cap_name):
609 """ Register a capture in the testclass """
610 # add to the list of captures with current timestamp
611 cls._captures.append((time.time(), cap_name))
612 # filter out from zombies
613 cls._zombie_captures = [(stamp, name)
614 for (stamp, name) in cls._zombie_captures
619 """ Remove any zombie captures and enable the packet generator """
620 # how long before capture is allowed to be deleted - otherwise vpp
621 # crashes - 100ms seems enough (this shouldn't be needed at all)
624 for stamp, cap_name in cls._zombie_captures:
625 wait = stamp + capture_ttl - now
627 cls.sleep(wait, "before deleting capture %s" % cap_name)
629 cls.logger.debug("Removing zombie capture %s" % cap_name)
630 cls.vapi.cli('packet-generator delete %s' % cap_name)
632 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
633 cls.vapi.cli('packet-generator enable')
634 cls._zombie_captures = cls._captures
638 def create_pg_interfaces(cls, interfaces):
640 Create packet-generator interfaces.
642 :param interfaces: iterable indexes of the interfaces.
643 :returns: List of created interfaces.
648 intf = VppPGInterface(cls, i)
649 setattr(cls, intf.name, intf)
651 cls.pg_interfaces = result
655 def create_loopback_interfaces(cls, count):
657 Create loopback interfaces.
659 :param count: number of interfaces created.
660 :returns: List of created interfaces.
662 result = [VppLoInterface(cls) for i in range(count)]
664 setattr(cls, intf.name, intf)
665 cls.lo_interfaces = result
669 def extend_packet(packet, size, padding=' '):
671 Extend packet to given size by padding with spaces or custom padding
672 NOTE: Currently works only when Raw layer is present.
674 :param packet: packet
675 :param size: target size
676 :param padding: padding used to extend the payload
679 packet_len = len(packet) + 4
680 extend = size - packet_len
682 num = (extend / len(padding)) + 1
683 packet[Raw].load += (padding * num)[:extend]
686 def reset_packet_infos(cls):
687 """ Reset the list of packet info objects and packet counts to zero """
688 cls._packet_infos = {}
689 cls._packet_count_for_dst_if_idx = {}
692 def create_packet_info(cls, src_if, dst_if):
694 Create packet info object containing the source and destination indexes
695 and add it to the testcase's packet info list
697 :param VppInterface src_if: source interface
698 :param VppInterface dst_if: destination interface
700 :returns: _PacketInfo object
704 info.index = len(cls._packet_infos)
705 info.src = src_if.sw_if_index
706 info.dst = dst_if.sw_if_index
707 if isinstance(dst_if, VppSubInterface):
708 dst_idx = dst_if.parent.sw_if_index
710 dst_idx = dst_if.sw_if_index
711 if dst_idx in cls._packet_count_for_dst_if_idx:
712 cls._packet_count_for_dst_if_idx[dst_idx] += 1
714 cls._packet_count_for_dst_if_idx[dst_idx] = 1
715 cls._packet_infos[info.index] = info
719 def info_to_payload(info):
721 Convert _PacketInfo object to packet payload
723 :param info: _PacketInfo object
725 :returns: string containing serialized data from packet info
727 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
731 def payload_to_info(payload):
733 Convert packet payload to _PacketInfo object
735 :param payload: packet payload
737 :returns: _PacketInfo object containing de-serialized data from payload
740 numbers = payload.split()
742 info.index = int(numbers[0])
743 info.src = int(numbers[1])
744 info.dst = int(numbers[2])
745 info.ip = int(numbers[3])
746 info.proto = int(numbers[4])
749 def get_next_packet_info(self, info):
751 Iterate over the packet info list stored in the testcase
752 Start iteration with first element if info is None
753 Continue based on index in info if info is specified
755 :param info: info or None
756 :returns: next info in list or None if no more infos
761 next_index = info.index + 1
762 if next_index == len(self._packet_infos):
765 return self._packet_infos[next_index]
767 def get_next_packet_info_for_interface(self, src_index, info):
769 Search the packet info list for the next packet info with same source
772 :param src_index: source interface index to search for
773 :param info: packet info - where to start the search
774 :returns: packet info or None
778 info = self.get_next_packet_info(info)
781 if info.src == src_index:
784 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
786 Search the packet info list for the next packet info with same source
787 and destination interface indexes
789 :param src_index: source interface index to search for
790 :param dst_index: destination interface index to search for
791 :param info: packet info - where to start the search
792 :returns: packet info or None
796 info = self.get_next_packet_info_for_interface(src_index, info)
799 if info.dst == dst_index:
802 def assert_equal(self, real_value, expected_value, name_or_class=None):
803 if name_or_class is None:
804 self.assertEqual(real_value, expected_value)
807 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
808 msg = msg % (getdoc(name_or_class).strip(),
809 real_value, str(name_or_class(real_value)),
810 expected_value, str(name_or_class(expected_value)))
812 msg = "Invalid %s: %s does not match expected value %s" % (
813 name_or_class, real_value, expected_value)
815 self.assertEqual(real_value, expected_value, msg)
817 def assert_in_range(self,
825 msg = "Invalid %s: %s out of range <%s,%s>" % (
826 name, real_value, expected_min, expected_max)
827 self.assertTrue(expected_min <= real_value <= expected_max, msg)
829 def assert_packet_checksums_valid(self, packet,
830 ignore_zero_udp_checksums=True):
831 received = packet.__class__(str(packet))
833 ppp("Verifying packet checksums for packet:", received))
834 udp_layers = ['UDP', 'UDPerror']
835 checksum_fields = ['cksum', 'chksum']
838 temp = received.__class__(str(received))
840 layer = temp.getlayer(counter)
842 for cf in checksum_fields:
843 if hasattr(layer, cf):
844 if ignore_zero_udp_checksums and \
845 0 == getattr(layer, cf) and \
846 layer.name in udp_layers:
849 checksums.append((counter, cf))
852 counter = counter + 1
853 if 0 == len(checksums):
855 temp = temp.__class__(str(temp))
856 for layer, cf in checksums:
857 calc_sum = getattr(temp[layer], cf)
859 getattr(received[layer], cf), calc_sum,
860 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
862 "Checksum field `%s` on `%s` layer has correct value `%s`" %
863 (cf, temp[layer].name, calc_sum))
865 def assert_checksum_valid(self, received_packet, layer,
867 ignore_zero_checksum=False):
868 """ Check checksum of received packet on given layer """
869 received_packet_checksum = getattr(received_packet[layer], field_name)
870 if ignore_zero_checksum and 0 == received_packet_checksum:
872 recalculated = received_packet.__class__(str(received_packet))
873 delattr(recalculated[layer], field_name)
874 recalculated = recalculated.__class__(str(recalculated))
875 self.assert_equal(received_packet_checksum,
876 getattr(recalculated[layer], field_name),
877 "packet checksum on layer: %s" % layer)
879 def assert_ip_checksum_valid(self, received_packet,
880 ignore_zero_checksum=False):
881 self.assert_checksum_valid(received_packet, 'IP',
882 ignore_zero_checksum=ignore_zero_checksum)
884 def assert_tcp_checksum_valid(self, received_packet,
885 ignore_zero_checksum=False):
886 self.assert_checksum_valid(received_packet, 'TCP',
887 ignore_zero_checksum=ignore_zero_checksum)
889 def assert_udp_checksum_valid(self, received_packet,
890 ignore_zero_checksum=True):
891 self.assert_checksum_valid(received_packet, 'UDP',
892 ignore_zero_checksum=ignore_zero_checksum)
894 def assert_embedded_icmp_checksum_valid(self, received_packet):
895 if received_packet.haslayer(IPerror):
896 self.assert_checksum_valid(received_packet, 'IPerror')
897 if received_packet.haslayer(TCPerror):
898 self.assert_checksum_valid(received_packet, 'TCPerror')
899 if received_packet.haslayer(UDPerror):
900 self.assert_checksum_valid(received_packet, 'UDPerror',
901 ignore_zero_checksum=True)
902 if received_packet.haslayer(ICMPerror):
903 self.assert_checksum_valid(received_packet, 'ICMPerror')
905 def assert_icmp_checksum_valid(self, received_packet):
906 self.assert_checksum_valid(received_packet, 'ICMP')
907 self.assert_embedded_icmp_checksum_valid(received_packet)
909 def assert_icmpv6_checksum_valid(self, pkt):
910 if pkt.haslayer(ICMPv6DestUnreach):
911 self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
912 self.assert_embedded_icmp_checksum_valid(pkt)
913 if pkt.haslayer(ICMPv6EchoRequest):
914 self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
915 if pkt.haslayer(ICMPv6EchoReply):
916 self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
918 def assert_packet_counter_equal(self, counter, expected_value):
919 if counter.startswith("/"):
920 counter_value = self.statistics.get_counter(counter)
921 self.assert_equal(counter_value, expected_value,
922 "packet counter `%s'" % counter)
924 counters = self.vapi.cli("sh errors").split('\n')
926 for i in range(1, len(counters) - 1):
927 results = counters[i].split()
928 if results[1] == counter:
929 counter_value = int(results[0])
933 def sleep(cls, timeout, remark=None):
934 if hasattr(cls, 'logger'):
935 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
939 if hasattr(cls, 'logger') and after - before > 2 * timeout:
940 cls.logger.error("unexpected time.sleep() result - "
941 "slept for %es instead of ~%es!",
942 after - before, timeout)
943 if hasattr(cls, 'logger'):
945 "Finished sleep (%s) - slept %es (wanted %es)",
946 remark, after - before, timeout)
948 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
949 self.vapi.cli("clear trace")
950 intf.add_stream(pkts)
951 self.pg_enable_capture(self.pg_interfaces)
955 for i in self.pg_interfaces:
956 i.get_capture(0, timeout=timeout)
957 i.assert_nothing_captured(remark=remark)
960 def send_and_expect(self, input, pkts, output):
961 self.vapi.cli("clear trace")
962 input.add_stream(pkts)
963 self.pg_enable_capture(self.pg_interfaces)
965 if isinstance(object, (list,)):
968 rx.append(output.get_capture(len(pkts)))
970 rx = output.get_capture(len(pkts))
973 def send_and_expect_only(self, input, pkts, output, timeout=None):
974 self.vapi.cli("clear trace")
975 input.add_stream(pkts)
976 self.pg_enable_capture(self.pg_interfaces)
978 if isinstance(object, (list,)):
982 rx.append(output.get_capture(len(pkts)))
984 rx = output.get_capture(len(pkts))
988 for i in self.pg_interfaces:
990 i.get_capture(0, timeout=timeout)
991 i.assert_nothing_captured()
997 def get_testcase_doc_name(test):
998 return getdoc(test.__class__).splitlines()[0]
1001 def get_test_description(descriptions, test):
1002 short_description = test.shortDescription()
1003 if descriptions and short_description:
1004 return short_description
1009 class TestCaseInfo(object):
1010 def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
1011 self.logger = logger
1012 self.tempdir = tempdir
1013 self.vpp_pid = vpp_pid
1014 self.vpp_bin_path = vpp_bin_path
1015 self.core_crash_test = None
1018 class VppTestResult(unittest.TestResult):
1020 @property result_string
1021 String variable to store the test case result string.
1023 List variable containing 2-tuples of TestCase instances and strings
1024 holding formatted tracebacks. Each tuple represents a test which
1025 raised an unexpected exception.
1027 List variable containing 2-tuples of TestCase instances and strings
1028 holding formatted tracebacks. Each tuple represents a test where
1029 a failure was explicitly signalled using the TestCase.assert*()
1033 failed_test_cases_info = set()
1034 core_crash_test_cases_info = set()
1035 current_test_case_info = None
1037 def __init__(self, stream, descriptions, verbosity, runner):
1039 :param stream File descriptor to store where to report test results.
1040 Set to the standard error stream by default.
1041 :param descriptions Boolean variable to store information if to use
1042 test case descriptions.
1043 :param verbosity Integer variable to store required verbosity level.
1045 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
1046 self.stream = stream
1047 self.descriptions = descriptions
1048 self.verbosity = verbosity
1049 self.result_string = None
1050 self.runner = runner
1052 def addSuccess(self, test):
1054 Record a test succeeded result
1059 if self.current_test_case_info:
1060 self.current_test_case_info.logger.debug(
1061 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1062 test._testMethodName,
1063 test._testMethodDoc))
1064 unittest.TestResult.addSuccess(self, test)
1065 self.result_string = colorize("OK", GREEN)
1067 self.send_result_through_pipe(test, PASS)
1069 def addSkip(self, test, reason):
1071 Record a test skipped.
1077 if self.current_test_case_info:
1078 self.current_test_case_info.logger.debug(
1079 "--- addSkip() %s.%s(%s) called, reason is %s" %
1080 (test.__class__.__name__, test._testMethodName,
1081 test._testMethodDoc, reason))
1082 unittest.TestResult.addSkip(self, test, reason)
1083 self.result_string = colorize("SKIP", YELLOW)
1085 self.send_result_through_pipe(test, SKIP)
1087 def symlink_failed(self):
1088 if self.current_test_case_info:
1090 failed_dir = os.getenv('FAILED_DIR')
1091 link_path = os.path.join(
1094 os.path.basename(self.current_test_case_info.tempdir))
1095 if self.current_test_case_info.logger:
1096 self.current_test_case_info.logger.debug(
1097 "creating a link to the failed test")
1098 self.current_test_case_info.logger.debug(
1099 "os.symlink(%s, %s)" %
1100 (self.current_test_case_info.tempdir, link_path))
1101 if os.path.exists(link_path):
1102 if self.current_test_case_info.logger:
1103 self.current_test_case_info.logger.debug(
1104 'symlink already exists')
1106 os.symlink(self.current_test_case_info.tempdir, link_path)
1108 except Exception as e:
1109 if self.current_test_case_info.logger:
1110 self.current_test_case_info.logger.error(e)
1112 def send_result_through_pipe(self, test, result):
1113 if hasattr(self, 'test_framework_result_pipe'):
1114 pipe = self.test_framework_result_pipe
1116 pipe.send((test.id(), result))
1118 def log_error(self, test, err, fn_name):
1119 if self.current_test_case_info:
1120 if isinstance(test, unittest.suite._ErrorHolder):
1121 test_name = test.description
1123 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1124 test._testMethodName,
1125 test._testMethodDoc)
1126 self.current_test_case_info.logger.debug(
1127 "--- %s() %s called, err is %s" %
1128 (fn_name, test_name, err))
1129 self.current_test_case_info.logger.debug(
1130 "formatted exception is:\n%s" %
1131 "".join(format_exception(*err)))
1133 def add_error(self, test, err, unittest_fn, error_type):
1134 if error_type == FAIL:
1135 self.log_error(test, err, 'addFailure')
1136 error_type_str = colorize("FAIL", RED)
1137 elif error_type == ERROR:
1138 self.log_error(test, err, 'addError')
1139 error_type_str = colorize("ERROR", RED)
1141 raise Exception('Error type %s cannot be used to record an '
1142 'error or a failure' % error_type)
1144 unittest_fn(self, test, err)
1145 if self.current_test_case_info:
1146 self.result_string = "%s [ temp dir used by test case: %s ]" % \
1148 self.current_test_case_info.tempdir)
1149 self.symlink_failed()
1150 self.failed_test_cases_info.add(self.current_test_case_info)
1151 if is_core_present(self.current_test_case_info.tempdir):
1152 if not self.current_test_case_info.core_crash_test:
1153 if isinstance(test, unittest.suite._ErrorHolder):
1154 test_name = str(test)
1156 test_name = "'{}' ({})".format(
1157 get_testcase_doc_name(test), test.id())
1158 self.current_test_case_info.core_crash_test = test_name
1159 self.core_crash_test_cases_info.add(
1160 self.current_test_case_info)
1162 self.result_string = '%s [no temp dir]' % error_type_str
1164 self.send_result_through_pipe(test, error_type)
1166 def addFailure(self, test, err):
1168 Record a test failed result
1171 :param err: error message
1174 self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1176 def addError(self, test, err):
1178 Record a test error result
1181 :param err: error message
1184 self.add_error(test, err, unittest.TestResult.addError, ERROR)
1186 def getDescription(self, test):
1188 Get test description
1191 :returns: test description
1194 return get_test_description(self.descriptions, test)
1196 def startTest(self, test):
1205 unittest.TestResult.startTest(self, test)
1206 if self.verbosity > 0:
1207 self.stream.writeln(
1208 "Starting " + self.getDescription(test) + " ...")
1209 self.stream.writeln(single_line_delim)
1211 def stopTest(self, test):
1213 Called when the given test has been run
1218 unittest.TestResult.stopTest(self, test)
1219 if self.verbosity > 0:
1220 self.stream.writeln(single_line_delim)
1221 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1222 self.result_string))
1223 self.stream.writeln(single_line_delim)
1225 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1226 self.result_string))
1228 self.send_result_through_pipe(test, TEST_RUN)
1230 def printErrors(self):
1232 Print errors from running the test case
1234 if len(self.errors) > 0 or len(self.failures) > 0:
1235 self.stream.writeln()
1236 self.printErrorList('ERROR', self.errors)
1237 self.printErrorList('FAIL', self.failures)
1239 # ^^ that is the last output from unittest before summary
1240 if not self.runner.print_summary:
1241 devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
1242 self.stream = devnull
1243 self.runner.stream = devnull
1245 def printErrorList(self, flavour, errors):
1247 Print error list to the output stream together with error type
1248 and test case description.
1250 :param flavour: error type
1251 :param errors: iterable errors
1254 for test, err in errors:
1255 self.stream.writeln(double_line_delim)
1256 self.stream.writeln("%s: %s" %
1257 (flavour, self.getDescription(test)))
1258 self.stream.writeln(single_line_delim)
1259 self.stream.writeln("%s" % err)
1262 class VppTestRunner(unittest.TextTestRunner):
1264 A basic test runner implementation which prints results to standard error.
1268 def resultclass(self):
1269 """Class maintaining the results of the tests"""
1270 return VppTestResult
1272 def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1273 result_pipe=None, failfast=False, buffer=False,
1274 resultclass=None, print_summary=True):
1275 # ignore stream setting here, use hard-coded stdout to be in sync
1276 # with prints from VppTestCase methods ...
1277 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1278 verbosity, failfast, buffer,
1280 KeepAliveReporter.pipe = keep_alive_pipe
1282 self.orig_stream = self.stream
1283 self.resultclass.test_framework_result_pipe = result_pipe
1285 self.print_summary = print_summary
1287 def _makeResult(self):
1288 return self.resultclass(self.stream,
1293 def run(self, test):
1300 faulthandler.enable() # emit stack trace to stderr if killed by signal
1302 result = super(VppTestRunner, self).run(test)
1303 if not self.print_summary:
1304 self.stream = self.orig_stream
1305 result.stream = self.orig_stream
1309 class Worker(Thread):
1310 def __init__(self, args, logger, env={}):
1311 self.logger = logger
1314 self.env = copy.deepcopy(env)
1315 super(Worker, self).__init__()
1318 executable = self.args[0]
1319 self.logger.debug("Running executable w/args `%s'" % self.args)
1320 env = os.environ.copy()
1321 env.update(self.env)
1322 env["CK_LOG_FILE_NAME"] = "-"
1323 self.process = subprocess.Popen(
1324 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1325 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1326 out, err = self.process.communicate()
1327 self.logger.debug("Finished running `%s'" % executable)
1328 self.logger.info("Return code is `%s'" % self.process.returncode)
1329 self.logger.info(single_line_delim)
1330 self.logger.info("Executable `%s' wrote to stdout:" % executable)
1331 self.logger.info(single_line_delim)
1332 self.logger.info(out)
1333 self.logger.info(single_line_delim)
1334 self.logger.info("Executable `%s' wrote to stderr:" % executable)
1335 self.logger.info(single_line_delim)
1336 self.logger.info(err)
1337 self.logger.info(single_line_delim)
1338 self.result = self.process.returncode