3 from __future__ import print_function
16 from collections import deque
17 from threading import Thread, Event
18 from inspect import getdoc, isclass
19 from traceback import format_exception
20 from logging import FileHandler, DEBUG, Formatter
21 from scapy.packet import Raw
22 from hook import StepHook, PollHook, VppDiedError
23 from vpp_pg_interface import VppPGInterface
24 from vpp_sub_interface import VppSubInterface
25 from vpp_lo_interface import VppLoInterface
26 from vpp_papi_provider import VppPapiProvider
27 from vpp_papi.vpp_stats import VPPStats
28 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
30 from vpp_object import VppObjectRegistry
31 from util import ppp, is_core_present
32 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
33 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
34 from scapy.layers.inet6 import ICMPv6EchoReply
35 if os.name == 'posix' and sys.version_info[0] < 3:
36 # using subprocess32 is recommended by python official documentation
37 # @ https://docs.python.org/2/library/subprocess.html
38 import subprocess32 as subprocess
50 debug_framework = False
51 if os.getenv('TEST_DEBUG', "0") == "1":
52 debug_framework = True
57 Test framework module.
59 The module provides a set of tools for constructing and running tests and
60 representing the results.
64 class _PacketInfo(object):
65 """Private class to create packet info object.
67 Help process information about the next packet.
68 Set variables to default values.
70 #: Store the index of the packet.
72 #: Store the index of the source packet generator interface of the packet.
74 #: Store the index of the destination packet generator interface
77 #: Store expected ip version
79 #: Store expected upper protocol
81 #: Store the copy of the former packet.
def __eq__(self, other):
    """Two packet infos are equal when index, src, dst and data all match.

    :param other: object to compare with; it has to expose the same
                  ``index``, ``src``, ``dst`` and ``data`` attributes
    :returns: True if all four attributes compare equal, False otherwise
    """
    # the plain integer fields go first so that the (potentially large)
    # stored packet is only compared when everything else already matches
    return (self.index == other.index and
            self.src == other.src and
            self.dst == other.dst and
            self.data == other.data)

def __ne__(self, other):
    """Inverse of :meth:`__eq__`.

    Python 2 does not derive ``!=`` from ``__eq__``; without this method
    ``a != b`` would silently fall back to an identity comparison there.
    """
    return not self == other
92 def pump_output(testclass):
93 """ pump output from vpp stdout/stderr to proper queues """
96 while not testclass.pump_thread_stop_flag.is_set():
97 readable = select.select([testclass.vpp.stdout.fileno(),
98 testclass.vpp.stderr.fileno(),
99 testclass.pump_thread_wakeup_pipe[0]],
101 if testclass.vpp.stdout.fileno() in readable:
102 read = os.read(testclass.vpp.stdout.fileno(), 102400)
104 split = read.splitlines(True)
105 if len(stdout_fragment) > 0:
106 split[0] = "%s%s" % (stdout_fragment, split[0])
107 if len(split) > 0 and split[-1].endswith("\n"):
111 stdout_fragment = split[-1]
112 testclass.vpp_stdout_deque.extend(split[:limit])
113 if not testclass.cache_vpp_output:
114 for line in split[:limit]:
115 testclass.logger.debug(
116 "VPP STDOUT: %s" % line.rstrip("\n"))
117 if testclass.vpp.stderr.fileno() in readable:
118 read = os.read(testclass.vpp.stderr.fileno(), 102400)
120 split = read.splitlines(True)
121 if len(stderr_fragment) > 0:
122 split[0] = "%s%s" % (stderr_fragment, split[0])
123 if len(split) > 0 and split[-1].endswith("\n"):
127 stderr_fragment = split[-1]
128 testclass.vpp_stderr_deque.extend(split[:limit])
129 if not testclass.cache_vpp_output:
130 for line in split[:limit]:
131 testclass.logger.debug(
132 "VPP STDERR: %s" % line.rstrip("\n"))
133 # ignoring the dummy pipe here intentionally - the flag will take care
134 # of properly terminating the loop
def is_skip_aarch64_set():
    """Tell whether the SKIP_AARCH64 environment variable is switched on.

    :returns: True if SKIP_AARCH64 is 'yes', 'y' or '1' (case-insensitive),
              False otherwise, including when the variable is not set
    """
    setting = os.environ.get('SKIP_AARCH64', 'n')
    return setting.lower() in ('yes', 'y', '1')
def is_platform_aarch64():
    """Tell whether ``platform.machine()`` reports 'aarch64'.

    :returns: True if the reported machine type is exactly 'aarch64'
    """
    machine_type = platform.machine()
    return 'aarch64' == machine_type
def running_extended_tests():
    """Tell whether the EXTENDED_TESTS environment variable is switched on.

    :returns: True if EXTENDED_TESTS is 'y', 'yes' or '1' (case-insensitive),
              False otherwise, including when the variable is not set
    """
    # the membership test already yields a bool - no conditional needed
    return os.getenv("EXTENDED_TESTS", "n").lower() in ("y", "yes", "1")
def running_on_centos():
    """Tell whether the OS_ID environment variable mentions centos.

    :returns: True if 'centos' occurs anywhere in OS_ID (case-insensitive),
              False otherwise, including when the variable is not set
    """
    # the substring test already yields a bool - no conditional needed
    return "centos" in os.getenv("OS_ID", "").lower()
155 class KeepAliveReporter(object):
157 Singleton object which reports test start to parent process
162 self.__dict__ = self._shared_state
170 def pipe(self, pipe):
171 if self._pipe is not None:
172 raise Exception("Internal error - pipe should only be set once.")
175 def send_keep_alive(self, test, desc=None):
177 Write current test tmpdir & desc to keep-alive pipe to signal liveness
179 if self.pipe is None:
180 # if not running forked..
184 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
188 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
191 class VppTestCase(unittest.TestCase):
192 """This subclass is a base class for VPP test cases that are implemented as
193 classes. It provides methods to create and run test case.
def packet_infos(self):
    """Packet infos created so far, keyed by packet index"""
    infos = self._packet_infos
    return infos
202 def get_packet_count_for_if_idx(cls, dst_if_index):
203 """Get the number of packet info for specified destination if index"""
204 if dst_if_index in cls._packet_count_for_dst_if_idx:
205 return cls._packet_count_for_dst_if_idx[dst_if_index]
211 """Return the instance of this testcase"""
212 return cls.test_instance
215 def set_debug_flags(cls, d):
216 cls.debug_core = False
217 cls.debug_gdb = False
218 cls.debug_gdbserver = False
223 cls.debug_core = True
226 elif dl == "gdbserver":
227 cls.debug_gdbserver = True
229 raise Exception("Unrecognized DEBUG option: '%s'" % d)
232 def get_least_used_cpu():
233 cpu_usage_list = [set(range(psutil.cpu_count()))]
234 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
235 if 'vpp_main' == p.info['name']]
236 for vpp_process in vpp_processes:
237 for cpu_usage_set in cpu_usage_list:
239 cpu_num = vpp_process.cpu_num()
240 if cpu_num in cpu_usage_set:
241 cpu_usage_set_index = cpu_usage_list.index(
243 if cpu_usage_set_index == len(cpu_usage_list) - 1:
244 cpu_usage_list.append({cpu_num})
246 cpu_usage_list[cpu_usage_set_index + 1].add(
248 cpu_usage_set.remove(cpu_num)
250 except psutil.NoSuchProcess:
253 for cpu_usage_set in cpu_usage_list:
254 if len(cpu_usage_set) > 0:
255 min_usage_set = cpu_usage_set
258 return random.choice(tuple(min_usage_set))
def print_header(cls):
    """Print the test case banner, at most once per class.

    The banner is the first line of the class docstring, colored green and
    framed by double line delimiters. The ``_header_printed`` attribute is
    set afterwards so that later calls are no-ops.
    """
    if hasattr(cls, '_header_printed'):
        # banner already shown
        return
    print(double_line_delim)
    first_doc_line = getdoc(cls).splitlines()[0]
    print(colorize(first_doc_line, GREEN))
    print(double_line_delim)
    cls._header_printed = True
269 def setUpConstants(cls):
270 """ Set-up the test case class based on environment variables """
271 s = os.getenv("STEP", "n")
272 cls.step = True if s.lower() in ("y", "yes", "1") else False
273 d = os.getenv("DEBUG", None)
274 c = os.getenv("CACHE_OUTPUT", "1")
275 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
276 cls.set_debug_flags(d)
277 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
278 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
279 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
281 if cls.plugin_path is not None:
282 if cls.extern_plugin_path is not None:
283 plugin_path = "%s:%s" % (
284 cls.plugin_path, cls.extern_plugin_path)
286 plugin_path = cls.plugin_path
287 elif cls.extern_plugin_path is not None:
288 plugin_path = cls.extern_plugin_path
290 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
291 debug_cli = "cli-listen localhost:5002"
293 size = os.getenv("COREDUMP_SIZE")
295 coredump_size = "coredump-size %s" % size
296 if coredump_size is None:
297 coredump_size = "coredump-size unlimited"
299 cpu_core_number = cls.get_least_used_cpu()
301 cls.vpp_cmdline = [cls.vpp_bin, "unix",
302 "{", "nodaemon", debug_cli, "full-coredump",
303 coredump_size, "runtime-dir", cls.tempdir, "}",
304 "api-trace", "{", "on", "}", "api-segment", "{",
305 "prefix", cls.shm_prefix, "}", "cpu", "{",
306 "main-core", str(cpu_core_number), "}", "statseg",
307 "{", "socket-name", cls.stats_sock, "}", "plugins",
308 "{", "plugin", "dpdk_plugin.so", "{", "disable",
309 "}", "plugin", "unittest_plugin.so", "{", "enable",
311 if plugin_path is not None:
312 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
313 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
314 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
317 def wait_for_enter(cls):
318 if cls.debug_gdbserver:
319 print(double_line_delim)
320 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
322 print(double_line_delim)
323 print("Spawned VPP with PID: %d" % cls.vpp.pid)
325 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
327 print(single_line_delim)
328 print("You can debug the VPP using e.g.:")
329 if cls.debug_gdbserver:
330 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
331 print("Now is the time to attach a gdb by running the above "
332 "command, set up breakpoints etc. and then resume VPP from "
333 "within gdb by issuing the 'continue' command")
335 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
336 print("Now is the time to attach a gdb by running the above "
337 "command and set up breakpoints etc.")
338 print(single_line_delim)
339 raw_input("Press ENTER to continue running the testcase...")
343 cmdline = cls.vpp_cmdline
345 if cls.debug_gdbserver:
346 gdbserver = '/usr/bin/gdbserver'
347 if not os.path.isfile(gdbserver) or \
348 not os.access(gdbserver, os.X_OK):
349 raise Exception("gdbserver binary '%s' does not exist or is "
350 "not executable" % gdbserver)
352 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
353 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
356 cls.vpp = subprocess.Popen(cmdline,
357 stdout=subprocess.PIPE,
358 stderr=subprocess.PIPE,
360 except subprocess.CalledProcessError as e:
361 cls.logger.critical("Couldn't start vpp: %s" % e)
367 def wait_for_stats_socket(cls):
368 deadline = time.time() + 3
370 while time.time() < deadline or \
371 cls.debug_gdb or cls.debug_gdbserver:
372 if os.path.exists(cls.stats_sock):
377 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
382 Perform class setup before running the testcase
383 Remove shared memory files, start vpp and connect the vpp-api
385 gc.collect() # run garbage collection first
388 cls.logger = get_logger(cls.__name__)
389 if hasattr(cls, 'parallel_handler'):
390 cls.logger.addHandler(cls.parallel_handler)
391 cls.logger.propagate = False
392 cls.tempdir = tempfile.mkdtemp(
393 prefix='vpp-unittest-%s-' % cls.__name__)
394 cls.stats_sock = "%s/stats.sock" % cls.tempdir
395 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
396 cls.file_handler.setFormatter(
397 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
399 cls.file_handler.setLevel(DEBUG)
400 cls.logger.addHandler(cls.file_handler)
401 cls.shm_prefix = os.path.basename(cls.tempdir)
402 os.chdir(cls.tempdir)
403 cls.logger.info("Temporary dir is %s, shm prefix is %s",
404 cls.tempdir, cls.shm_prefix)
406 cls.reset_packet_infos()
408 cls._zombie_captures = []
411 cls.registry = VppObjectRegistry()
412 cls.vpp_startup_failed = False
413 cls.reporter = KeepAliveReporter()
414 # need to catch exceptions here because if we raise, then the cleanup
415 # doesn't get called and we might end with a zombie vpp
418 cls.reporter.send_keep_alive(cls, 'setUpClass')
419 VppTestResult.current_test_case_info = TestCaseInfo(
420 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
421 cls.vpp_stdout_deque = deque()
422 cls.vpp_stderr_deque = deque()
423 cls.pump_thread_stop_flag = Event()
424 cls.pump_thread_wakeup_pipe = os.pipe()
425 cls.pump_thread = Thread(target=pump_output, args=(cls,))
426 cls.pump_thread.daemon = True
427 cls.pump_thread.start()
428 if cls.debug_gdb or cls.debug_gdbserver:
432 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
438 cls.vapi.register_hook(hook)
439 cls.wait_for_stats_socket()
440 cls.statistics = VPPStats(socketname=cls.stats_sock)
444 cls.vpp_startup_failed = True
446 "VPP died shortly after startup, check the"
447 " output to standard error for possible cause")
453 cls.vapi.disconnect()
456 if cls.debug_gdbserver:
457 print(colorize("You're running VPP inside gdbserver but "
458 "VPP-API connection failed, did you forget "
459 "to 'continue' VPP from within gdb?", RED))
471 Disconnect vpp-api, kill vpp and cleanup shared memory files
473 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
475 if cls.vpp.returncode is None:
476 print(double_line_delim)
477 print("VPP or GDB server is still running")
478 print(single_line_delim)
479 raw_input("When done debugging, press ENTER to kill the "
480 "process and finish running the testcase...")
482 # first signal that we want to stop the pump thread, then wake it up
483 if hasattr(cls, 'pump_thread_stop_flag'):
484 cls.pump_thread_stop_flag.set()
485 if hasattr(cls, 'pump_thread_wakeup_pipe'):
486 os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
487 if hasattr(cls, 'pump_thread'):
488 cls.logger.debug("Waiting for pump thread to stop")
489 cls.pump_thread.join()
490 if hasattr(cls, 'vpp_stderr_reader_thread'):
491 cls.logger.debug("Waiting for stdderr pump to stop")
492 cls.vpp_stderr_reader_thread.join()
494 if hasattr(cls, 'vpp'):
495 if hasattr(cls, 'vapi'):
496 cls.vapi.disconnect()
499 if cls.vpp.returncode is None:
500 cls.logger.debug("Sending TERM to vpp")
502 cls.logger.debug("Waiting for vpp to die")
503 cls.vpp.communicate()
506 if cls.vpp_startup_failed:
507 stdout_log = cls.logger.info
508 stderr_log = cls.logger.critical
510 stdout_log = cls.logger.info
511 stderr_log = cls.logger.info
513 if hasattr(cls, 'vpp_stdout_deque'):
514 stdout_log(single_line_delim)
515 stdout_log('VPP output to stdout while running %s:', cls.__name__)
516 stdout_log(single_line_delim)
517 vpp_output = "".join(cls.vpp_stdout_deque)
518 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
520 stdout_log('\n%s', vpp_output)
521 stdout_log(single_line_delim)
523 if hasattr(cls, 'vpp_stderr_deque'):
524 stderr_log(single_line_delim)
525 stderr_log('VPP output to stderr while running %s:', cls.__name__)
526 stderr_log(single_line_delim)
527 vpp_output = "".join(cls.vpp_stderr_deque)
528 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
530 stderr_log('\n%s', vpp_output)
531 stderr_log(single_line_delim)
534 def tearDownClass(cls):
535 """ Perform final cleanup after running all tests in this test-case """
536 cls.reporter.send_keep_alive(cls, 'tearDownClass')
538 cls.file_handler.close()
539 cls.reset_packet_infos()
541 debug_internal.on_tear_down_class(cls)
544 """ Show various debug prints after each test """
545 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
546 (self.__class__.__name__, self._testMethodName,
547 self._testMethodDoc))
548 if not self.vpp_dead:
549 self.logger.debug(self.vapi.cli("show trace"))
550 self.logger.info(self.vapi.ppcli("show interface"))
551 self.logger.info(self.vapi.ppcli("show hardware"))
552 self.logger.info(self.statistics.set_errors_str())
553 self.logger.info(self.vapi.ppcli("show run"))
554 self.logger.info(self.vapi.ppcli("show log"))
555 self.registry.remove_vpp_config(self.logger)
556 # Save/Dump VPP api trace log
557 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
558 tmp_api_trace = "/tmp/%s" % api_trace
559 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
560 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
561 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
563 os.rename(tmp_api_trace, vpp_api_trace_log)
564 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
567 self.registry.unregister_all(self.logger)
570 """ Clear trace before running each test"""
571 self.reporter.send_keep_alive(self)
572 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
573 (self.__class__.__name__, self._testMethodName,
574 self._testMethodDoc))
576 raise Exception("VPP is dead when setting up the test")
577 self.sleep(.1, "during setUp")
578 self.vpp_stdout_deque.append(
579 "--- test setUp() for %s.%s(%s) starts here ---\n" %
580 (self.__class__.__name__, self._testMethodName,
581 self._testMethodDoc))
582 self.vpp_stderr_deque.append(
583 "--- test setUp() for %s.%s(%s) starts here ---\n" %
584 (self.__class__.__name__, self._testMethodName,
585 self._testMethodDoc))
586 self.vapi.cli("clear trace")
587 # store the test instance inside the test class - so that objects
588 # holding the class can access instance methods (like assertEqual)
589 type(self).test_instance = self
592 def pg_enable_capture(cls, interfaces=None):
594 Enable capture on packet-generator interfaces
596 :param interfaces: iterable interface indexes (if None,
597 use self.pg_interfaces)
600 if interfaces is None:
601 interfaces = cls.pg_interfaces
606 def register_capture(cls, cap_name):
607 """ Register a capture in the testclass """
608 # add to the list of captures with current timestamp
609 cls._captures.append((time.time(), cap_name))
610 # filter out from zombies
611 cls._zombie_captures = [(stamp, name)
612 for (stamp, name) in cls._zombie_captures
617 """ Remove any zombie captures and enable the packet generator """
618 # how long before capture is allowed to be deleted - otherwise vpp
619 # crashes - 100ms seems enough (this shouldn't be needed at all)
622 for stamp, cap_name in cls._zombie_captures:
623 wait = stamp + capture_ttl - now
625 cls.sleep(wait, "before deleting capture %s" % cap_name)
627 cls.logger.debug("Removing zombie capture %s" % cap_name)
628 cls.vapi.cli('packet-generator delete %s' % cap_name)
630 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
631 cls.vapi.cli('packet-generator enable')
632 cls._zombie_captures = cls._captures
636 def create_pg_interfaces(cls, interfaces):
638 Create packet-generator interfaces.
640 :param interfaces: iterable indexes of the interfaces.
641 :returns: List of created interfaces.
646 intf = VppPGInterface(cls, i)
647 setattr(cls, intf.name, intf)
649 cls.pg_interfaces = result
653 def create_loopback_interfaces(cls, count):
655 Create loopback interfaces.
657 :param count: number of interfaces created.
658 :returns: List of created interfaces.
660 result = [VppLoInterface(cls) for i in range(count)]
662 setattr(cls, intf.name, intf)
663 cls.lo_interfaces = result
667 def extend_packet(packet, size, padding=' '):
669 Extend packet to given size by padding with spaces or custom padding
670 NOTE: Currently works only when Raw layer is present.
672 :param packet: packet
673 :param size: target size
674 :param padding: padding used to extend the payload
677 packet_len = len(packet) + 4
678 extend = size - packet_len
680 num = (extend / len(padding)) + 1
681 packet[Raw].load += (padding * num)[:extend]
def reset_packet_infos(cls):
    """ Forget all stored packet infos and per-destination packet counts """
    # two separate, empty dicts - they are updated independently
    cls._packet_count_for_dst_if_idx = dict()
    cls._packet_infos = dict()
690 def create_packet_info(cls, src_if, dst_if):
692 Create packet info object containing the source and destination indexes
693 and add it to the testcase's packet info list
695 :param VppInterface src_if: source interface
696 :param VppInterface dst_if: destination interface
698 :returns: _PacketInfo object
702 info.index = len(cls._packet_infos)
703 info.src = src_if.sw_if_index
704 info.dst = dst_if.sw_if_index
705 if isinstance(dst_if, VppSubInterface):
706 dst_idx = dst_if.parent.sw_if_index
708 dst_idx = dst_if.sw_if_index
709 if dst_idx in cls._packet_count_for_dst_if_idx:
710 cls._packet_count_for_dst_if_idx[dst_idx] += 1
712 cls._packet_count_for_dst_if_idx[dst_idx] = 1
713 cls._packet_infos[info.index] = info
717 def info_to_payload(info):
719 Convert _PacketInfo object to packet payload
721 :param info: _PacketInfo object
723 :returns: string containing serialized data from packet info
725 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
729 def payload_to_info(payload):
731 Convert packet payload to _PacketInfo object
733 :param payload: packet payload
735 :returns: _PacketInfo object containing de-serialized data from payload
738 numbers = payload.split()
740 info.index = int(numbers[0])
741 info.src = int(numbers[1])
742 info.dst = int(numbers[2])
743 info.ip = int(numbers[3])
744 info.proto = int(numbers[4])
747 def get_next_packet_info(self, info):
749 Iterate over the packet info list stored in the testcase
750 Start iteration with first element if info is None
751 Continue based on index in info if info is specified
753 :param info: info or None
754 :returns: next info in list or None if no more infos
759 next_index = info.index + 1
760 if next_index == len(self._packet_infos):
763 return self._packet_infos[next_index]
765 def get_next_packet_info_for_interface(self, src_index, info):
767 Search the packet info list for the next packet info with same source
770 :param src_index: source interface index to search for
771 :param info: packet info - where to start the search
772 :returns: packet info or None
776 info = self.get_next_packet_info(info)
779 if info.src == src_index:
782 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
784 Search the packet info list for the next packet info with same source
785 and destination interface indexes
787 :param src_index: source interface index to search for
788 :param dst_index: destination interface index to search for
789 :param info: packet info - where to start the search
790 :returns: packet info or None
794 info = self.get_next_packet_info_for_interface(src_index, info)
797 if info.dst == dst_index:
800 def assert_equal(self, real_value, expected_value, name_or_class=None):
801 if name_or_class is None:
802 self.assertEqual(real_value, expected_value)
805 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
806 msg = msg % (getdoc(name_or_class).strip(),
807 real_value, str(name_or_class(real_value)),
808 expected_value, str(name_or_class(expected_value)))
810 msg = "Invalid %s: %s does not match expected value %s" % (
811 name_or_class, real_value, expected_value)
813 self.assertEqual(real_value, expected_value, msg)
815 def assert_in_range(self,
823 msg = "Invalid %s: %s out of range <%s,%s>" % (
824 name, real_value, expected_min, expected_max)
825 self.assertTrue(expected_min <= real_value <= expected_max, msg)
827 def assert_packet_checksums_valid(self, packet,
828 ignore_zero_udp_checksums=True):
829 received = packet.__class__(str(packet))
831 ppp("Verifying packet checksums for packet:", received))
832 udp_layers = ['UDP', 'UDPerror']
833 checksum_fields = ['cksum', 'chksum']
836 temp = received.__class__(str(received))
838 layer = temp.getlayer(counter)
840 for cf in checksum_fields:
841 if hasattr(layer, cf):
842 if ignore_zero_udp_checksums and \
843 0 == getattr(layer, cf) and \
844 layer.name in udp_layers:
847 checksums.append((counter, cf))
850 counter = counter + 1
851 if 0 == len(checksums):
853 temp = temp.__class__(str(temp))
854 for layer, cf in checksums:
855 calc_sum = getattr(temp[layer], cf)
857 getattr(received[layer], cf), calc_sum,
858 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
860 "Checksum field `%s` on `%s` layer has correct value `%s`" %
861 (cf, temp[layer].name, calc_sum))
863 def assert_checksum_valid(self, received_packet, layer,
865 ignore_zero_checksum=False):
866 """ Check checksum of received packet on given layer """
867 received_packet_checksum = getattr(received_packet[layer], field_name)
868 if ignore_zero_checksum and 0 == received_packet_checksum:
870 recalculated = received_packet.__class__(str(received_packet))
871 delattr(recalculated[layer], field_name)
872 recalculated = recalculated.__class__(str(recalculated))
873 self.assert_equal(received_packet_checksum,
874 getattr(recalculated[layer], field_name),
875 "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """ Check checksum of the 'IP' layer of received packet

    :param received_packet: packet to verify
    :param ignore_zero_checksum: passed through to assert_checksum_valid
    """
    options = {'ignore_zero_checksum': ignore_zero_checksum}
    self.assert_checksum_valid(received_packet, 'IP', **options)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """ Check checksum of the 'TCP' layer of received packet

    :param received_packet: packet to verify
    :param ignore_zero_checksum: passed through to assert_checksum_valid
    """
    options = {'ignore_zero_checksum': ignore_zero_checksum}
    self.assert_checksum_valid(received_packet, 'TCP', **options)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """ Check checksum of the 'UDP' layer of received packet

    :param received_packet: packet to verify
    :param ignore_zero_checksum: passed through to assert_checksum_valid;
                                 note that it defaults to True here
    """
    options = {'ignore_zero_checksum': ignore_zero_checksum}
    self.assert_checksum_valid(received_packet, 'UDP', **options)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """ Check checksums of the *error layers present in received packet

    Each of the IPerror, TCPerror, UDPerror and ICMPerror layers is verified
    only if the packet contains it; for UDPerror a zero checksum is ignored.

    :param received_packet: packet to verify
    """
    embedded_layers = (
        # (scapy layer class, layer name, extra assert_checksum_valid args)
        (IPerror, 'IPerror', {}),
        (TCPerror, 'TCPerror', {}),
        (UDPerror, 'UDPerror', {'ignore_zero_checksum': True}),
        (ICMPerror, 'ICMPerror', {}),
    )
    for layer_cls, layer_name, extra in embedded_layers:
        if received_packet.haslayer(layer_cls):
            self.assert_checksum_valid(received_packet, layer_name, **extra)
def assert_icmp_checksum_valid(self, received_packet):
    """ Check checksum of the 'ICMP' layer and of any embedded error layers

    :param received_packet: packet to verify
    """
    outer_layer = 'ICMP'
    self.assert_checksum_valid(received_packet, outer_layer)
    # then whatever *error layers the packet carries
    self.assert_embedded_icmp_checksum_valid(received_packet)
def assert_icmpv6_checksum_valid(self, pkt):
    """ Check the 'cksum' field of the ICMPv6 layers present in packet

    Handles ICMPv6DestUnreach, ICMPv6EchoRequest and ICMPv6EchoReply; for
    ICMPv6DestUnreach the embedded error layers are verified as well.

    :param pkt: packet to verify
    """
    icmpv6_layers = (
        # (scapy layer class, layer name, verify embedded layers too?)
        (ICMPv6DestUnreach, 'ICMPv6DestUnreach', True),
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest', False),
        (ICMPv6EchoReply, 'ICMPv6EchoReply', False),
    )
    for layer_cls, layer_name, has_embedded in icmpv6_layers:
        if not pkt.haslayer(layer_cls):
            continue
        self.assert_checksum_valid(pkt, layer_name, 'cksum')
        if has_embedded:
            self.assert_embedded_icmp_checksum_valid(pkt)
916 def assert_packet_counter_equal(self, counter, expected_value):
917 counters = self.vapi.cli("sh errors").split('\n')
919 for i in range(1, len(counters)-1):
920 results = counters[i].split()
921 if results[1] == counter:
922 counter_value = int(results[0])
924 self.assert_equal(counter_value, expected_value,
925 "packet counter `%s'" % counter)
928 def sleep(cls, timeout, remark=None):
929 if hasattr(cls, 'logger'):
930 cls.logger.debug("Starting sleep for %ss (%s)" % (timeout, remark))
934 if after - before > 2 * timeout:
935 cls.logger.error("unexpected time.sleep() result - "
936 "slept for %ss instead of ~%ss!" % (
937 after - before, timeout))
938 if hasattr(cls, 'logger'):
940 "Finished sleep (%s) - slept %ss (wanted %ss)" % (
941 remark, after - before, timeout))
943 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
944 self.vapi.cli("clear trace")
945 intf.add_stream(pkts)
946 self.pg_enable_capture(self.pg_interfaces)
950 for i in self.pg_interfaces:
951 i.get_capture(0, timeout=timeout)
952 i.assert_nothing_captured(remark=remark)
955 def send_and_expect(self, input, pkts, output):
956 self.vapi.cli("clear trace")
957 input.add_stream(pkts)
958 self.pg_enable_capture(self.pg_interfaces)
960 if isinstance(object, (list,)):
963 rx.append(output.get_capture(len(pkts)))
965 rx = output.get_capture(len(pkts))
968 def send_and_expect_only(self, input, pkts, output, timeout=None):
969 self.vapi.cli("clear trace")
970 input.add_stream(pkts)
971 self.pg_enable_capture(self.pg_interfaces)
973 if isinstance(object, (list,)):
977 rx.append(output.get_capture(len(pkts)))
979 rx = output.get_capture(len(pkts))
983 for i in self.pg_interfaces:
985 i.get_capture(0, timeout=timeout)
986 i.assert_nothing_captured()
def get_testcase_doc_name(test):
    """Return the first docstring line of the class of `test`.

    :param test: test instance whose class docstring is used
    :returns: first line of the (cleaned-up) class docstring, or the class
              name if the class has no docstring or an empty one
    """
    test_class = test.__class__
    # getdoc() yields None when there is no docstring at all and '' for an
    # empty one - neither may be indexed blindly
    doc_lines = (getdoc(test_class) or "").splitlines()
    if doc_lines:
        return doc_lines[0]
    return test_class.__name__
996 def get_test_description(descriptions, test):
997 short_description = test.shortDescription()
998 if descriptions and short_description:
999 return short_description
class TestCaseInfo(object):
    """Data about one test case run, kept for result reporting.

    Instances are stored in sets and therefore deliberately keep the
    default identity-based equality and hash.

    :param logger: logger of the test case
    :param tempdir: temporary directory used by the test case
    :param vpp_pid: PID of the vpp process started for the test case
    :param vpp_bin_path: path of the vpp binary that was started
    """

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        self.logger = logger
        self.tempdir = tempdir
        self.vpp_pid = vpp_pid
        self.vpp_bin_path = vpp_bin_path
        #: name of the test a core file was attributed to (None = no core)
        self.core_crash_test = None

    def __repr__(self):
        return "%s(tempdir=%r, vpp_pid=%r, vpp_bin_path=%r, " \
            "core_crash_test=%r)" % (
                self.__class__.__name__, self.tempdir, self.vpp_pid,
                self.vpp_bin_path, self.core_crash_test)
1013 class VppTestResult(unittest.TestResult):
1015 @property result_string
1016 String variable to store the test case result string.
1018 List variable containing 2-tuples of TestCase instances and strings
1019 holding formatted tracebacks. Each tuple represents a test which
1020 raised an unexpected exception.
1022 List variable containing 2-tuples of TestCase instances and strings
1023 holding formatted tracebacks. Each tuple represents a test where
1024 a failure was explicitly signalled using the TestCase.assert*()
1028 failed_test_cases_info = set()
1029 core_crash_test_cases_info = set()
1030 current_test_case_info = None
1032 def __init__(self, stream, descriptions, verbosity, runner):
1034 :param stream File descriptor to store where to report test results.
1035 Set to the standard error stream by default.
1036 :param descriptions Boolean variable to store information if to use
1037 test case descriptions.
1038 :param verbosity Integer variable to store required verbosity level.
1040 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
1041 self.stream = stream
1042 self.descriptions = descriptions
1043 self.verbosity = verbosity
1044 self.result_string = None
1045 self.runner = runner
1047 def addSuccess(self, test):
1049 Record a test succeeded result
1054 if self.current_test_case_info:
1055 self.current_test_case_info.logger.debug(
1056 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1057 test._testMethodName,
1058 test._testMethodDoc))
1059 unittest.TestResult.addSuccess(self, test)
1060 self.result_string = colorize("OK", GREEN)
1062 self.send_result_through_pipe(test, PASS)
1064 def addSkip(self, test, reason):
1066 Record a test skipped.
1072 if self.current_test_case_info:
1073 self.current_test_case_info.logger.debug(
1074 "--- addSkip() %s.%s(%s) called, reason is %s" %
1075 (test.__class__.__name__, test._testMethodName,
1076 test._testMethodDoc, reason))
1077 unittest.TestResult.addSkip(self, test, reason)
1078 self.result_string = colorize("SKIP", YELLOW)
1080 self.send_result_through_pipe(test, SKIP)
1082 def symlink_failed(self):
1083 if self.current_test_case_info:
1085 failed_dir = os.getenv('FAILED_DIR')
1086 link_path = os.path.join(
1089 os.path.basename(self.current_test_case_info.tempdir))
1090 if self.current_test_case_info.logger:
1091 self.current_test_case_info.logger.debug(
1092 "creating a link to the failed test")
1093 self.current_test_case_info.logger.debug(
1094 "os.symlink(%s, %s)" %
1095 (self.current_test_case_info.tempdir, link_path))
1096 if os.path.exists(link_path):
1097 if self.current_test_case_info.logger:
1098 self.current_test_case_info.logger.debug(
1099 'symlink already exists')
1101 os.symlink(self.current_test_case_info.tempdir, link_path)
1103 except Exception as e:
1104 if self.current_test_case_info.logger:
1105 self.current_test_case_info.logger.error(e)
1107 def send_result_through_pipe(self, test, result):
1108 if hasattr(self, 'test_framework_result_pipe'):
1109 pipe = self.test_framework_result_pipe
1111 pipe.send((test.id(), result))
1113 def log_error(self, test, err, fn_name):
1114 if self.current_test_case_info:
1115 if isinstance(test, unittest.suite._ErrorHolder):
1116 test_name = test.description
1118 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1119 test._testMethodName,
1120 test._testMethodDoc)
1121 self.current_test_case_info.logger.debug(
1122 "--- %s() %s called, err is %s" %
1123 (fn_name, test_name, err))
1124 self.current_test_case_info.logger.debug(
1125 "formatted exception is:\n%s" %
1126 "".join(format_exception(*err)))
def add_error(self, test, err, unittest_fn, error_type):
    """
    Record a failure or an error and derive the result string from it

    :param test: test (or unittest error holder) which failed
    :param err: error information, handed on to log_error() and unittest_fn
    :param unittest_fn: function called as unittest_fn(self, test, err) to
        record the result
    :param error_type: FAIL or ERROR
    :raises Exception: if error_type is neither FAIL nor ERROR

    """
    if error_type == FAIL:
        fn_name, label = 'addFailure', "FAIL"
    elif error_type == ERROR:
        fn_name, label = 'addError', "ERROR"
    else:
        raise Exception('Error type %s cannot be used to record an '
                        'error or a failure' % error_type)
    self.log_error(test, err, fn_name)
    error_type_str = colorize(label, RED)

    unittest_fn(self, test, err)

    info = self.current_test_case_info
    if not info:
        self.result_string = '%s [no temp dir]' % error_type_str
    else:
        self.result_string = "%s [ temp dir used by test case: %s ]" % (
            error_type_str, info.tempdir)
        self.symlink_failed()
        self.failed_test_cases_info.add(info)
        if is_core_present(info.tempdir):
            if not info.core_crash_test:
                # only the first test seen with a core present is named
                if isinstance(test, unittest.suite._ErrorHolder):
                    test_name = str(test)
                else:
                    test_name = "'{}' ({})".format(
                        get_testcase_doc_name(test), test.id())
                info.core_crash_test = test_name
            self.core_crash_test_cases_info.add(info)

    self.send_result_through_pipe(test, error_type)
def addFailure(self, test, err):
    """
    Record a failed test

    :param test: test which failed
    :param err: error message

    """
    self.add_error(test, err,
                   unittest_fn=unittest.TestResult.addFailure,
                   error_type=FAIL)
def addError(self, test, err):
    """
    Record a test which ended with an error

    :param test: test which raised the error
    :param err: error message

    """
    self.add_error(test, err,
                   unittest_fn=unittest.TestResult.addError,
                   error_type=ERROR)
def getDescription(self, test):
    """
    Build the description of a test

    :param test: test to describe
    :returns: whatever get_test_description() yields for this result's
        descriptions setting and the given test

    """
    description = get_test_description(self.descriptions, test)
    return description
def startTest(self, test):
    """
    Start a test

    :param test: test which is about to run

    """
    test.print_header()

    unittest.TestResult.startTest(self, test)
    if self.verbosity <= 0:
        return
    banner = "Starting " + self.getDescription(test) + " ..."
    self.stream.writeln(banner)
    self.stream.writeln(single_line_delim)
def stopTest(self, test):
    """
    Report the result of a test which has just been run

    :param test: test which has been run

    """
    unittest.TestResult.stopTest(self, test)

    # the result line is always written; the delimiters around it only
    # when verbosity is above zero
    framed = self.verbosity > 0
    if framed:
        self.stream.writeln(single_line_delim)
    self.stream.writeln("%-73s%s" % (self.getDescription(test),
                                     self.result_string))
    if framed:
        self.stream.writeln(single_line_delim)

    self.send_result_through_pipe(test, TEST_RUN)
def printErrors(self):
    """
    Print errors from running the test case

    When the runner has print_summary disabled, the stream of this result
    and the stream of the runner are afterwards replaced by a writer to
    os.devnull.
    """
    if self.errors or self.failures:
        self.stream.writeln()
        self.printErrorList('ERROR', self.errors)
        self.printErrorList('FAIL', self.failures)

    # the above is the last output from unittest before the summary
    if self.runner.print_summary:
        return
    sink = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
    self.stream = self.runner.stream = sink
def printErrorList(self, flavour, errors):
    """
    Write every entry of an error list to the output stream, each one
    headed by the error type and the description of its test case.

    :param flavour: error type
    :param errors: iterable of (test, error text) pairs

    """
    for case, details in errors:
        self.stream.writeln(double_line_delim)
        heading = "%s: %s" % (flavour, self.getDescription(case))
        self.stream.writeln(heading)
        self.stream.writeln(single_line_delim)
        self.stream.writeln("%s" % details)
class VppTestRunner(unittest.TextTestRunner):
    """
    Text test runner which collects results in VppTestResult and writes
    its output to standard output.
    """
    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
                 result_pipe=None, failfast=False, buffer=False,
                 resultclass=None, print_summary=True):
        """
        :param keep_alive_pipe: stored as KeepAliveReporter.pipe
        :param descriptions: forwarded to unittest.TextTestRunner
        :param verbosity: forwarded to unittest.TextTestRunner
        :param result_pipe: stored on the result class as
            test_framework_result_pipe
        :param failfast: forwarded to unittest.TextTestRunner
        :param buffer: forwarded to unittest.TextTestRunner
        :param resultclass: forwarded to unittest.TextTestRunner; the
            resultclass property of this class always yields VppTestResult
        :param print_summary: when False, run() restores the original
            stream on the runner and the result once the run is over

        """
        # the stream is not configurable - stdout is hard-coded to stay in
        # sync with prints from VppTestCase methods ...
        super(VppTestRunner, self).__init__(stream=sys.stdout,
                                            descriptions=descriptions,
                                            verbosity=verbosity,
                                            failfast=failfast,
                                            buffer=buffer,
                                            resultclass=resultclass)
        self.print_summary = print_summary
        self.orig_stream = self.stream

        KeepAliveReporter.pipe = keep_alive_pipe
        self.resultclass.test_framework_result_pipe = result_pipe

    def _makeResult(self):
        """Create the result object, handing it this runner as well"""
        result_cls = self.resultclass
        return result_cls(self.stream, self.descriptions, self.verbosity,
                          self)

    def run(self, test):
        """
        Run the tests

        :param test: test case or suite to run
        :returns: result object produced by unittest.TextTestRunner.run()

        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        result = super(VppTestRunner, self).run(test)
        if self.print_summary:
            return result
        # put back the stream saved in __init__ on both objects
        self.stream = self.orig_stream
        result.stream = self.orig_stream
        return result
class Worker(Thread):
    """
    Thread which runs an external executable, logs what it wrote to stdout
    and stderr and keeps its return code in self.result.
    """

    def __init__(self, args, logger, env=None):
        """
        :param args: command line to run; args[0] is the executable
        :param logger: logger receiving progress messages and the output
        :param env: optional mapping of environment variables applied on
            top of os.environ for the child process; None means no extras

        """
        self.logger = logger
        self.args = args
        self.result = None
        # None instead of a shared mutable default; a private deep copy
        # keeps later changes to the caller's mapping out of this worker
        self.env = {} if env is None else copy.deepcopy(env)
        super(Worker, self).__init__()

    def run(self):
        """
        Run the executable, wait for it to finish and log its output

        The return code of the process ends up in self.result.
        """
        executable = self.args[0]
        self.logger.debug("Running executable w/args `%s'" % self.args)
        env = os.environ.copy()
        env.update(self.env)
        # NOTE(review): presumably sends the check framework log to
        # stdout - confirm
        env["CK_LOG_FILE_NAME"] = "-"
        # os.setpgrp is called in the child before exec
        self.process = subprocess.Popen(
            self.args, shell=False, env=env, preexec_fn=os.setpgrp,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # communicate() reads both pipes until EOF and waits for the exit
        out, err = self.process.communicate()
        self.logger.debug("Finished running `%s'" % executable)
        self.logger.info("Return code is `%s'" % self.process.returncode)
        self.logger.info(single_line_delim)
        self.logger.info("Executable `%s' wrote to stdout:" % executable)
        self.logger.info(single_line_delim)
        self.logger.info(out)
        self.logger.info(single_line_delim)
        self.logger.info("Executable `%s' wrote to stderr:" % executable)
        self.logger.info(single_line_delim)
        self.logger.info(err)
        self.logger.info(single_line_delim)
        self.result = self.process.returncode