3 from __future__ import print_function
16 from collections import deque
17 from threading import Thread, Event
18 from inspect import getdoc, isclass
19 from traceback import format_exception
20 from logging import FileHandler, DEBUG, Formatter
23 from scapy.packet import Raw
24 from hook import StepHook, PollHook, VppDiedError
25 from vpp_pg_interface import VppPGInterface
26 from vpp_sub_interface import VppSubInterface
27 from vpp_lo_interface import VppLoInterface
28 from vpp_bvi_interface import VppBviInterface
29 from vpp_papi_provider import VppPapiProvider
30 from vpp_papi.vpp_stats import VPPStats
31 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
33 from vpp_object import VppObjectRegistry
34 from util import ppp, is_core_present
35 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
36 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
37 from scapy.layers.inet6 import ICMPv6EchoReply
39 if os.name == 'posix' and sys.version_info[0] < 3:
40 # using subprocess32 is recommended by python official documentation
41 # @ https://docs.python.org/2/library/subprocess.html
42 import subprocess32 as subprocess
46 # Python2/3 compatible
58 debug_framework = False
59 if os.getenv('TEST_DEBUG', "0") == "1":
60 debug_framework = True
64 Test framework module.
66 The module provides a set of tools for constructing and running tests and
67 representing the results.
71 class _PacketInfo(object):
72 """Private class to create packet info object.
74 Help process information about the next packet.
75 Set variables to default values.
77 #: Store the index of the packet.
79 #: Store the index of the source packet generator interface of the packet.
81 #: Store the index of the destination packet generator interface
84 #: Store expected ip version
86 #: Store expected upper protocol
88 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare two packet infos by index, source, destination and data.

    :param other: object to compare with
    :returns: result of comparing the ``index``, ``src``, ``dst`` and
        ``data`` attributes, or ``NotImplemented`` when ``other`` is not
        an instance of this object's class, so that Python can fall back
        to the reflected comparison instead of failing with
        AttributeError.
    """
    # a foreign object (None, int, ...) has none of the compared attributes
    if not isinstance(other, type(self)):
        return NotImplemented
    return (self.index == other.index and
            self.src == other.src and
            self.dst == other.dst and
            self.data == other.data)
99 def pump_output(testclass):
100 """ pump output from vpp stdout/stderr to proper queues """
103 while not testclass.pump_thread_stop_flag.is_set():
104 readable = select.select([testclass.vpp.stdout.fileno(),
105 testclass.vpp.stderr.fileno(),
106 testclass.pump_thread_wakeup_pipe[0]],
108 if testclass.vpp.stdout.fileno() in readable:
109 read = os.read(testclass.vpp.stdout.fileno(), 102400)
111 split = read.splitlines(True)
112 if len(stdout_fragment) > 0:
113 split[0] = "%s%s" % (stdout_fragment, split[0])
114 if len(split) > 0 and split[-1].endswith("\n"):
118 stdout_fragment = split[-1]
119 testclass.vpp_stdout_deque.extend(split[:limit])
120 if not testclass.cache_vpp_output:
121 for line in split[:limit]:
122 testclass.logger.debug(
123 "VPP STDOUT: %s" % line.rstrip("\n"))
124 if testclass.vpp.stderr.fileno() in readable:
125 read = os.read(testclass.vpp.stderr.fileno(), 102400)
127 split = read.splitlines(True)
128 if len(stderr_fragment) > 0:
129 split[0] = "%s%s" % (stderr_fragment, split[0])
130 if len(split) > 0 and split[-1].endswith(b"\n"):
134 stderr_fragment = split[-1]
135 testclass.vpp_stderr_deque.extend(split[:limit])
136 if not testclass.cache_vpp_output:
137 for line in split[:limit]:
138 testclass.logger.debug(
139 "VPP STDERR: %s" % line.rstrip("\n"))
140 # ignoring the dummy pipe here intentionally - the
141 # flag will take care of properly terminating the loop
def _is_skip_aarch64_set():
    """Tell whether the SKIP_AARCH64 environment variable is switched on.

    :returns: True when the variable, compared case-insensitively, is one
        of 'yes', 'y' or '1'; False otherwise (including when it is unset)
    """
    raw_value = os.getenv('SKIP_AARCH64', 'n')
    enabled_spellings = ('yes', 'y', '1')
    return raw_value.lower() in enabled_spellings


# evaluated once, when this module is imported
is_skip_aarch64_set = _is_skip_aarch64_set()
def _is_platform_aarch64():
    """Tell whether the machine type reported by ``platform`` is aarch64.

    :returns: True when ``platform.machine()`` equals 'aarch64'
    """
    machine_type = platform.machine()
    return machine_type == 'aarch64'


# evaluated once, when this module is imported
is_platform_aarch64 = _is_platform_aarch64()
def _running_extended_tests():
    """Tell whether the EXTENDED_TESTS environment variable is switched on.

    :returns: True when the variable, compared case-insensitively, is one
        of "y", "yes" or "1"; False otherwise (including when it is unset)
    """
    requested = os.getenv("EXTENDED_TESTS", "n").lower()
    return requested in ("y", "yes", "1")


# evaluated once, when this module is imported
running_extended_tests = _running_extended_tests()
def _running_on_centos():
    """Tell whether the OS_ID environment variable names a CentOS system.

    :returns: True when "centos" occurs (case-insensitively) in OS_ID;
        False otherwise (including when the variable is unset)
    """
    os_id = os.getenv("OS_ID", "")
    return "centos" in os_id.lower()


# Call the helper, as the other module-level flags do. Binding the function
# object itself would make the flag truthy on every platform.
running_on_centos = _running_on_centos()
170 class KeepAliveReporter(object):
172 Singleton object which reports test start to parent process
177 self.__dict__ = self._shared_state
185 def pipe(self, pipe):
186 if self._pipe is not None:
187 raise Exception("Internal error - pipe should only be set once.")
190 def send_keep_alive(self, test, desc=None):
192 Write current test tmpdir & desc to keep-alive pipe to signal liveness
194 if self.pipe is None:
195 # if not running forked..
199 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
203 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
206 class VppTestCase(unittest.TestCase):
207 """This subclass is a base class for VPP test cases that are implemented as
208 classes. It provides methods to create and run test case.
211 extra_vpp_punt_config = []
212 extra_vpp_plugin_config = []
def packet_infos(self):
    """Packet info objects currently registered with the testcase.

    :returns: the ``_packet_infos`` container itself (not a copy); it is
        a dict that create_packet_info() fills, keyed by packet info index
    """
    registered = self._packet_infos
    return registered
220 def get_packet_count_for_if_idx(cls, dst_if_index):
221 """Get the number of packet info for specified destination if index"""
222 if dst_if_index in cls._packet_count_for_dst_if_idx:
223 return cls._packet_count_for_dst_if_idx[dst_if_index]
229 """Return the instance of this testcase"""
230 return cls.test_instance
233 def set_debug_flags(cls, d):
234 cls.debug_core = False
235 cls.debug_gdb = False
236 cls.debug_gdbserver = False
241 cls.debug_core = True
244 elif dl == "gdbserver":
245 cls.debug_gdbserver = True
247 raise Exception("Unrecognized DEBUG option: '%s'" % d)
250 def get_least_used_cpu():
251 cpu_usage_list = [set(range(psutil.cpu_count()))]
252 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
253 if 'vpp_main' == p.info['name']]
254 for vpp_process in vpp_processes:
255 for cpu_usage_set in cpu_usage_list:
257 cpu_num = vpp_process.cpu_num()
258 if cpu_num in cpu_usage_set:
259 cpu_usage_set_index = cpu_usage_list.index(
261 if cpu_usage_set_index == len(cpu_usage_list) - 1:
262 cpu_usage_list.append({cpu_num})
264 cpu_usage_list[cpu_usage_set_index + 1].add(
266 cpu_usage_set.remove(cpu_num)
268 except psutil.NoSuchProcess:
271 for cpu_usage_set in cpu_usage_list:
272 if len(cpu_usage_set) > 0:
273 min_usage_set = cpu_usage_set
276 return random.choice(tuple(min_usage_set))
279 def setUpConstants(cls):
280 """ Set-up the test case class based on environment variables """
281 s = os.getenv("STEP", "n")
282 cls.step = True if s.lower() in ("y", "yes", "1") else False
283 d = os.getenv("DEBUG", None)
284 c = os.getenv("CACHE_OUTPUT", "1")
285 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
286 cls.set_debug_flags(d)
287 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
288 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
289 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
291 if cls.plugin_path is not None:
292 if cls.extern_plugin_path is not None:
293 plugin_path = "%s:%s" % (
294 cls.plugin_path, cls.extern_plugin_path)
296 plugin_path = cls.plugin_path
297 elif cls.extern_plugin_path is not None:
298 plugin_path = cls.extern_plugin_path
300 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
301 debug_cli = "cli-listen localhost:5002"
303 size = os.getenv("COREDUMP_SIZE")
305 coredump_size = "coredump-size %s" % size
306 if coredump_size is None:
307 coredump_size = "coredump-size unlimited"
309 cpu_core_number = cls.get_least_used_cpu()
311 cls.vpp_cmdline = [cls.vpp_bin, "unix",
312 "{", "nodaemon", debug_cli, "full-coredump",
313 coredump_size, "runtime-dir", cls.tempdir, "}",
314 "api-trace", "{", "on", "}", "api-segment", "{",
315 "prefix", cls.shm_prefix, "}", "cpu", "{",
316 "main-core", str(cpu_core_number), "}",
317 "statseg", "{", "socket-name", cls.stats_sock, "}",
318 "socksvr", "{", "socket-name", cls.api_sock, "}",
320 "{", "plugin", "dpdk_plugin.so", "{", "disable",
321 "}", "plugin", "rdma_plugin.so", "{", "disable",
322 "}", "plugin", "unittest_plugin.so", "{", "enable",
323 "}"] + cls.extra_vpp_plugin_config + ["}", ]
324 if cls.extra_vpp_punt_config is not None:
325 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
326 if plugin_path is not None:
327 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
328 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
329 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
332 def wait_for_enter(cls):
333 if cls.debug_gdbserver:
334 print(double_line_delim)
335 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
337 print(double_line_delim)
338 print("Spawned VPP with PID: %d" % cls.vpp.pid)
340 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
342 print(single_line_delim)
343 print("You can debug the VPP using e.g.:")
344 if cls.debug_gdbserver:
345 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
346 print("Now is the time to attach a gdb by running the above "
347 "command, set up breakpoints etc. and then resume VPP from "
348 "within gdb by issuing the 'continue' command")
350 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
351 print("Now is the time to attach a gdb by running the above "
352 "command and set up breakpoints etc.")
353 print(single_line_delim)
354 input("Press ENTER to continue running the testcase...")
358 cmdline = cls.vpp_cmdline
360 if cls.debug_gdbserver:
361 gdbserver = '/usr/bin/gdbserver'
362 if not os.path.isfile(gdbserver) or \
363 not os.access(gdbserver, os.X_OK):
364 raise Exception("gdbserver binary '%s' does not exist or is "
365 "not executable" % gdbserver)
367 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
368 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
371 cls.vpp = subprocess.Popen(cmdline,
372 stdout=subprocess.PIPE,
373 stderr=subprocess.PIPE,
375 except subprocess.CalledProcessError as e:
376 cls.logger.critical("Subprocess returned with non-0 return code: ("
380 cls.logger.critical("Subprocess returned with OS error: "
381 "(%s) %s", e.errno, e.strerror)
383 except Exception as e:
384 cls.logger.exception("Subprocess returned unexpected from "
391 def wait_for_stats_socket(cls):
392 deadline = time.time() + 3
394 while time.time() < deadline or \
395 cls.debug_gdb or cls.debug_gdbserver:
396 if os.path.exists(cls.stats_sock):
401 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
404 def wait_for_coredump(cls):
405 corefile = cls.tempdir + "/core"
406 if os.path.isfile(corefile):
407 cls.logger.error("Waiting for coredump to complete: %s", corefile)
408 curr_size = os.path.getsize(corefile)
409 deadline = time.time() + 60
411 while time.time() < deadline:
414 curr_size = os.path.getsize(corefile)
415 if size == curr_size:
419 cls.logger.error("Timed out waiting for coredump to complete:"
422 cls.logger.error("Coredump complete: %s, size %d",
428 Perform class setup before running the testcase
429 Remove shared memory files, start vpp and connect the vpp-api
431 super(VppTestCase, cls).setUpClass()
432 gc.collect() # run garbage collection first
434 cls.logger = get_logger(cls.__name__)
435 if hasattr(cls, 'parallel_handler'):
436 cls.logger.addHandler(cls.parallel_handler)
437 cls.logger.propagate = False
439 cls.tempdir = tempfile.mkdtemp(
440 prefix='vpp-unittest-%s-' % cls.__name__)
441 cls.stats_sock = "%s/stats.sock" % cls.tempdir
442 cls.api_sock = "%s/api.sock" % cls.tempdir
443 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
444 cls.file_handler.setFormatter(
445 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
447 cls.file_handler.setLevel(DEBUG)
448 cls.logger.addHandler(cls.file_handler)
449 cls.logger.debug("--- setUpClass() for %s called ---" %
451 cls.shm_prefix = os.path.basename(cls.tempdir)
452 os.chdir(cls.tempdir)
453 cls.logger.info("Temporary dir is %s, shm prefix is %s",
454 cls.tempdir, cls.shm_prefix)
456 cls.reset_packet_infos()
458 cls._zombie_captures = []
461 cls.registry = VppObjectRegistry()
462 cls.vpp_startup_failed = False
463 cls.reporter = KeepAliveReporter()
464 # need to catch exceptions here because if we raise, then the cleanup
465 # doesn't get called and we might end with a zombie vpp
468 cls.reporter.send_keep_alive(cls, 'setUpClass')
469 VppTestResult.current_test_case_info = TestCaseInfo(
470 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
471 cls.vpp_stdout_deque = deque()
472 cls.vpp_stderr_deque = deque()
473 cls.pump_thread_stop_flag = Event()
474 cls.pump_thread_wakeup_pipe = os.pipe()
475 cls.pump_thread = Thread(target=pump_output, args=(cls,))
476 cls.pump_thread.daemon = True
477 cls.pump_thread.start()
478 if cls.debug_gdb or cls.debug_gdbserver:
482 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
488 cls.vapi.register_hook(hook)
489 cls.wait_for_stats_socket()
490 cls.statistics = VPPStats(socketname=cls.stats_sock)
494 cls.vpp_startup_failed = True
496 "VPP died shortly after startup, check the"
497 " output to standard error for possible cause")
503 cls.vapi.disconnect()
506 if cls.debug_gdbserver:
507 print(colorize("You're running VPP inside gdbserver but "
508 "VPP-API connection failed, did you forget "
509 "to 'continue' VPP from within gdb?", RED))
521 Disconnect vpp-api, kill vpp and cleanup shared memory files
523 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
525 if cls.vpp.returncode is None:
526 print(double_line_delim)
527 print("VPP or GDB server is still running")
528 print(single_line_delim)
529 input("When done debugging, press ENTER to kill the "
530 "process and finish running the testcase...")
532 # first signal that we want to stop the pump thread, then wake it up
533 if hasattr(cls, 'pump_thread_stop_flag'):
534 cls.pump_thread_stop_flag.set()
535 if hasattr(cls, 'pump_thread_wakeup_pipe'):
536 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
537 if hasattr(cls, 'pump_thread'):
538 cls.logger.debug("Waiting for pump thread to stop")
539 cls.pump_thread.join()
540 if hasattr(cls, 'vpp_stderr_reader_thread'):
541 cls.logger.debug("Waiting for stdderr pump to stop")
542 cls.vpp_stderr_reader_thread.join()
544 if hasattr(cls, 'vpp'):
545 if hasattr(cls, 'vapi'):
546 cls.logger.debug("Disconnecting class vapi client on %s",
548 cls.vapi.disconnect()
549 cls.logger.debug("Deleting class vapi attribute on %s",
553 if cls.vpp.returncode is None:
554 cls.wait_for_coredump()
555 cls.logger.debug("Sending TERM to vpp")
557 cls.logger.debug("Waiting for vpp to die")
558 cls.vpp.communicate()
559 cls.logger.debug("Deleting class vpp attribute on %s",
563 if cls.vpp_startup_failed:
564 stdout_log = cls.logger.info
565 stderr_log = cls.logger.critical
567 stdout_log = cls.logger.info
568 stderr_log = cls.logger.info
570 if hasattr(cls, 'vpp_stdout_deque'):
571 stdout_log(single_line_delim)
572 stdout_log('VPP output to stdout while running %s:', cls.__name__)
573 stdout_log(single_line_delim)
574 vpp_output = "".join(cls.vpp_stdout_deque)
575 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
577 stdout_log('\n%s', vpp_output)
578 stdout_log(single_line_delim)
580 if hasattr(cls, 'vpp_stderr_deque'):
581 stderr_log(single_line_delim)
582 stderr_log('VPP output to stderr while running %s:', cls.__name__)
583 stderr_log(single_line_delim)
584 vpp_output = "".join(cls.vpp_stderr_deque)
585 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
587 stderr_log('\n%s', vpp_output)
588 stderr_log(single_line_delim)
591 def tearDownClass(cls):
592 """ Perform final cleanup after running all tests in this test-case """
593 cls.logger.debug("--- tearDownClass() for %s called ---" %
595 cls.reporter.send_keep_alive(cls, 'tearDownClass')
597 cls.file_handler.close()
598 cls.reset_packet_infos()
600 debug_internal.on_tear_down_class(cls)
def show_commands_at_teardown(self):
    """Hook for subclasses to log their own show commands at teardown.

    This default implementation only logs, at info level, that no test
    specific commands were provided.
    """
    notice = "--- No test specific show commands provided. ---"
    self.logger.info(notice)
607 """ Show various debug prints after each test """
608 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
609 (self.__class__.__name__, self._testMethodName,
610 self._testMethodDoc))
611 if not self.vpp_dead:
613 "--- Logging show commands common to all testcases. ---")
614 self.logger.debug(self.vapi.cli("show trace max 1000"))
615 self.logger.info(self.vapi.ppcli("show interface"))
616 self.logger.info(self.vapi.ppcli("show hardware"))
617 self.logger.info(self.statistics.set_errors_str())
618 self.logger.info(self.vapi.ppcli("show run"))
619 self.logger.info(self.vapi.ppcli("show log"))
620 self.logger.info("Logging testcase specific show commands.")
621 self.show_commands_at_teardown()
622 self.registry.remove_vpp_config(self.logger)
623 # Save/Dump VPP api trace log
624 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
625 tmp_api_trace = "/tmp/%s" % api_trace
626 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
627 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
628 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
630 os.rename(tmp_api_trace, vpp_api_trace_log)
631 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
634 self.registry.unregister_all(self.logger)
637 """ Clear trace before running each test"""
638 super(VppTestCase, self).setUp()
639 self.reporter.send_keep_alive(self)
641 raise Exception("VPP is dead when setting up the test")
642 self.sleep(.1, "during setUp")
643 self.vpp_stdout_deque.append(
644 "--- test setUp() for %s.%s(%s) starts here ---\n" %
645 (self.__class__.__name__, self._testMethodName,
646 self._testMethodDoc))
647 self.vpp_stderr_deque.append(
648 "--- test setUp() for %s.%s(%s) starts here ---\n" %
649 (self.__class__.__name__, self._testMethodName,
650 self._testMethodDoc))
651 self.vapi.cli("clear trace")
652 # store the test instance inside the test class - so that objects
653 # holding the class can access instance methods (like assertEqual)
654 type(self).test_instance = self
657 def pg_enable_capture(cls, interfaces=None):
659 Enable capture on packet-generator interfaces
661 :param interfaces: iterable interface indexes (if None,
662 use self.pg_interfaces)
665 if interfaces is None:
666 interfaces = cls.pg_interfaces
671 def register_capture(cls, cap_name):
672 """ Register a capture in the testclass """
673 # add to the list of captures with current timestamp
674 cls._captures.append((time.time(), cap_name))
675 # filter out from zombies
676 cls._zombie_captures = [(stamp, name)
677 for (stamp, name) in cls._zombie_captures
682 """ Remove any zombie captures and enable the packet generator """
683 # how long before capture is allowed to be deleted - otherwise vpp
684 # crashes - 100ms seems enough (this shouldn't be needed at all)
687 for stamp, cap_name in cls._zombie_captures:
688 wait = stamp + capture_ttl - now
690 cls.sleep(wait, "before deleting capture %s" % cap_name)
692 cls.logger.debug("Removing zombie capture %s" % cap_name)
693 cls.vapi.cli('packet-generator delete %s' % cap_name)
695 cls.vapi.cli("trace add pg-input 1000")
696 cls.vapi.cli('packet-generator enable')
697 cls._zombie_captures = cls._captures
701 def create_pg_interfaces(cls, interfaces):
703 Create packet-generator interfaces.
705 :param interfaces: iterable indexes of the interfaces.
706 :returns: List of created interfaces.
711 intf = VppPGInterface(cls, i)
712 setattr(cls, intf.name, intf)
714 cls.pg_interfaces = result
718 def create_loopback_interfaces(cls, count):
720 Create loopback interfaces.
722 :param count: number of interfaces created.
723 :returns: List of created interfaces.
725 result = [VppLoInterface(cls) for i in range(count)]
727 setattr(cls, intf.name, intf)
728 cls.lo_interfaces = result
732 def create_bvi_interfaces(cls, count):
734 Create BVI interfaces.
736 :param count: number of interfaces created.
737 :returns: List of created interfaces.
739 result = [VppBviInterface(cls) for i in range(count)]
741 setattr(cls, intf.name, intf)
742 cls.bvi_interfaces = result
746 def extend_packet(packet, size, padding=' '):
748 Extend packet to given size by padding with spaces or custom padding
749 NOTE: Currently works only when Raw layer is present.
751 :param packet: packet
752 :param size: target size
753 :param padding: padding used to extend the payload
756 packet_len = len(packet) + 4
757 extend = size - packet_len
759 num = (extend // len(padding)) + 1
760 packet[Raw].load += (padding * num)[:extend].encode("ascii")
def reset_packet_infos(cls):
    """Forget all packet infos and per-destination packet counts.

    Rebinds both class-level containers to new, empty dicts.
    """
    fresh_infos = dict()
    fresh_counts = dict()
    cls._packet_infos = fresh_infos
    cls._packet_count_for_dst_if_idx = fresh_counts
769 def create_packet_info(cls, src_if, dst_if):
771 Create packet info object containing the source and destination indexes
772 and add it to the testcase's packet info list
774 :param VppInterface src_if: source interface
775 :param VppInterface dst_if: destination interface
777 :returns: _PacketInfo object
781 info.index = len(cls._packet_infos)
782 info.src = src_if.sw_if_index
783 info.dst = dst_if.sw_if_index
784 if isinstance(dst_if, VppSubInterface):
785 dst_idx = dst_if.parent.sw_if_index
787 dst_idx = dst_if.sw_if_index
788 if dst_idx in cls._packet_count_for_dst_if_idx:
789 cls._packet_count_for_dst_if_idx[dst_idx] += 1
791 cls._packet_count_for_dst_if_idx[dst_idx] = 1
792 cls._packet_infos[info.index] = info
796 def info_to_payload(info):
798 Convert _PacketInfo object to packet payload
800 :param info: _PacketInfo object
802 :returns: string containing serialized data from packet info
804 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
808 def payload_to_info(payload, payload_field='load'):
810 Convert packet payload to _PacketInfo object
812 :param payload: packet payload
813 :type payload: <class 'scapy.packet.Raw'>
814 :param payload_field: packet fieldname of payload "load" for
815 <class 'scapy.packet.Raw'>
816 :type payload_field: str
817 :returns: _PacketInfo object containing de-serialized data from payload
820 numbers = getattr(payload, payload_field).split()
822 info.index = int(numbers[0])
823 info.src = int(numbers[1])
824 info.dst = int(numbers[2])
825 info.ip = int(numbers[3])
826 info.proto = int(numbers[4])
829 def get_next_packet_info(self, info):
831 Iterate over the packet info list stored in the testcase
832 Start iteration with first element if info is None
833 Continue based on index in info if info is specified
835 :param info: info or None
836 :returns: next info in list or None if no more infos
841 next_index = info.index + 1
842 if next_index == len(self._packet_infos):
845 return self._packet_infos[next_index]
847 def get_next_packet_info_for_interface(self, src_index, info):
849 Search the packet info list for the next packet info with same source
852 :param src_index: source interface index to search for
853 :param info: packet info - where to start the search
854 :returns: packet info or None
858 info = self.get_next_packet_info(info)
861 if info.src == src_index:
864 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
866 Search the packet info list for the next packet info with same source
867 and destination interface indexes
869 :param src_index: source interface index to search for
870 :param dst_index: destination interface index to search for
871 :param info: packet info - where to start the search
872 :returns: packet info or None
876 info = self.get_next_packet_info_for_interface(src_index, info)
879 if info.dst == dst_index:
882 def assert_equal(self, real_value, expected_value, name_or_class=None):
883 if name_or_class is None:
884 self.assertEqual(real_value, expected_value)
887 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
888 msg = msg % (getdoc(name_or_class).strip(),
889 real_value, str(name_or_class(real_value)),
890 expected_value, str(name_or_class(expected_value)))
892 msg = "Invalid %s: %s does not match expected value %s" % (
893 name_or_class, real_value, expected_value)
895 self.assertEqual(real_value, expected_value, msg)
897 def assert_in_range(self,
905 msg = "Invalid %s: %s out of range <%s,%s>" % (
906 name, real_value, expected_min, expected_max)
907 self.assertTrue(expected_min <= real_value <= expected_max, msg)
909 def assert_packet_checksums_valid(self, packet,
910 ignore_zero_udp_checksums=True):
911 received = packet.__class__(scapy.compat.raw(packet))
913 ppp("Verifying packet checksums for packet:", received))
914 udp_layers = ['UDP', 'UDPerror']
915 checksum_fields = ['cksum', 'chksum']
918 temp = received.__class__(scapy.compat.raw(received))
920 layer = temp.getlayer(counter)
922 for cf in checksum_fields:
923 if hasattr(layer, cf):
924 if ignore_zero_udp_checksums and \
925 0 == getattr(layer, cf) and \
926 layer.name in udp_layers:
929 checksums.append((counter, cf))
932 counter = counter + 1
933 if 0 == len(checksums):
935 temp = temp.__class__(scapy.compat.raw(temp))
936 for layer, cf in checksums:
937 calc_sum = getattr(temp[layer], cf)
939 getattr(received[layer], cf), calc_sum,
940 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
942 "Checksum field `%s` on `%s` layer has correct value `%s`" %
943 (cf, temp[layer].name, calc_sum))
945 def assert_checksum_valid(self, received_packet, layer,
947 ignore_zero_checksum=False):
948 """ Check checksum of received packet on given layer """
949 received_packet_checksum = getattr(received_packet[layer], field_name)
950 if ignore_zero_checksum and 0 == received_packet_checksum:
952 recalculated = received_packet.__class__(
953 scapy.compat.raw(received_packet))
954 delattr(recalculated[layer], field_name)
955 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
956 self.assert_equal(received_packet_checksum,
957 getattr(recalculated[layer], field_name),
958 "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """Check the checksum of the 'IP' layer of a received packet.

    :param received_packet: packet to verify
    :param ignore_zero_checksum: passed through to assert_checksum_valid()
    """
    layer_name = 'IP'
    self.assert_checksum_valid(
        received_packet, layer_name,
        ignore_zero_checksum=ignore_zero_checksum)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """Check the checksum of the 'TCP' layer of a received packet.

    :param received_packet: packet to verify
    :param ignore_zero_checksum: passed through to assert_checksum_valid()
    """
    layer_name = 'TCP'
    self.assert_checksum_valid(
        received_packet, layer_name,
        ignore_zero_checksum=ignore_zero_checksum)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """Check the checksum of the 'UDP' layer of a received packet.

    :param received_packet: packet to verify
    :param ignore_zero_checksum: passed through to assert_checksum_valid();
        unlike the IP and TCP variants it defaults to True
    """
    # NOTE(review): the True default presumably reflects that a zero UDP
    # checksum means "not computed" - confirm against the callers
    layer_name = 'UDP'
    self.assert_checksum_valid(
        received_packet, layer_name,
        ignore_zero_checksum=ignore_zero_checksum)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """Check checksums of the *error layers present in a received packet.

    Each of IPerror, TCPerror, UDPerror and ICMPerror is verified, in that
    order, only when the packet has that layer.

    :param received_packet: packet to verify
    """
    # (layer class, layer name, extra keyword arguments for the check);
    # only the UDPerror check tolerates a zero checksum
    embedded_checks = (
        (IPerror, 'IPerror', {}),
        (TCPerror, 'TCPerror', {}),
        (UDPerror, 'UDPerror', {'ignore_zero_checksum': True}),
        (ICMPerror, 'ICMPerror', {}),
    )
    for layer_cls, layer_name, extra in embedded_checks:
        if not received_packet.haslayer(layer_cls):
            continue
        self.assert_checksum_valid(received_packet, layer_name, **extra)
def assert_icmp_checksum_valid(self, received_packet):
    """Check the 'ICMP' layer checksum, then any embedded error layers.

    :param received_packet: packet to verify
    """
    # outer ICMP header first ...
    verify_layer = self.assert_checksum_valid
    verify_layer(received_packet, 'ICMP')
    # ... then whatever the ICMP message carries inside
    verify_embedded = self.assert_embedded_icmp_checksum_valid
    verify_embedded(received_packet)
def assert_icmpv6_checksum_valid(self, pkt):
    """Check the 'cksum' field of the ICMPv6 layers present in a packet.

    ICMPv6DestUnreach, ICMPv6EchoRequest and ICMPv6EchoReply are verified,
    in that order, only when the packet has that layer. For
    ICMPv6DestUnreach the embedded error layers are verified as well.

    :param pkt: packet to verify
    """
    # (layer class, layer name, also verify embedded error layers?)
    plan = (
        (ICMPv6DestUnreach, 'ICMPv6DestUnreach', True),
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest', False),
        (ICMPv6EchoReply, 'ICMPv6EchoReply', False),
    )
    for layer_cls, layer_name, has_embedded in plan:
        if not pkt.haslayer(layer_cls):
            continue
        self.assert_checksum_valid(pkt, layer_name, 'cksum')
        if has_embedded:
            self.assert_embedded_icmp_checksum_valid(pkt)
999 def assert_packet_counter_equal(self, counter, expected_value):
1000 if counter.startswith("/"):
1001 counter_value = self.statistics.get_counter(counter)
1002 self.assert_equal(counter_value, expected_value,
1003 "packet counter `%s'" % counter)
1005 counters = self.vapi.cli("sh errors").split('\n')
1007 for i in range(1, len(counters) - 1):
1008 results = counters[i].split()
1009 if results[1] == counter:
1010 counter_value = int(results[0])
1014 def sleep(cls, timeout, remark=None):
1016 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1017 # * by Guido, only the main thread can be interrupted.
1019 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
1022 if hasattr(os, 'sched_yield'):
1028 if hasattr(cls, 'logger'):
1029 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1030 before = time.time()
1033 if hasattr(cls, 'logger') and after - before > 2 * timeout:
1034 cls.logger.error("unexpected self.sleep() result - "
1035 "slept for %es instead of ~%es!",
1036 after - before, timeout)
1037 if hasattr(cls, 'logger'):
1039 "Finished sleep (%s) - slept %es (wanted %es)",
1040 remark, after - before, timeout)
1042 def pg_send(self, intf, pkts):
1043 self.vapi.cli("clear trace")
1044 intf.add_stream(pkts)
1045 self.pg_enable_capture(self.pg_interfaces)
1048 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
1049 self.pg_send(intf, pkts)
1052 for i in self.pg_interfaces:
1053 i.get_capture(0, timeout=timeout)
1054 i.assert_nothing_captured(remark=remark)
1057 def send_and_expect(self, intf, pkts, output, n_rx=None):
1060 self.pg_send(intf, pkts)
1061 rx = output.get_capture(n_rx)
1064 def send_and_expect_only(self, intf, pkts, output, timeout=None):
1065 self.pg_send(intf, pkts)
1066 rx = output.get_capture(len(pkts))
1070 for i in self.pg_interfaces:
1071 if i not in outputs:
1072 i.get_capture(0, timeout=timeout)
1073 i.assert_nothing_captured()
1079 """ unittest calls runTest when TestCase is instantiated without a
1080 test case. Use case: Writing unittests against VppTestCase"""
def get_testcase_doc_name(test):
    """Return the first line of the docstring of the test's class.

    :param test: test case instance
    :returns: first docstring line of ``test.__class__``; when the class
        has no docstring, or only a blank one, the class name is returned
        instead of failing with AttributeError/IndexError
    """
    doc = getdoc(test.__class__)
    # getdoc() gives None for a missing docstring and '' for a blank one
    doc_lines = doc.splitlines() if doc else []
    if doc_lines:
        return doc_lines[0]
    return test.__class__.__name__
1088 def get_test_description(descriptions, test):
1089 short_description = test.shortDescription()
1090 if descriptions and short_description:
1091 return short_description
class TestCaseInfo(object):
    """Plain record of details about one test case class run.

    :param logger: logger of the test case
    :param tempdir: temporary directory of the test case
    :param vpp_pid: PID of the VPP process spawned for the test case
    :param vpp_bin_path: VPP binary used by the test case
    """

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        self.logger, self.tempdir = logger, tempdir
        self.vpp_pid, self.vpp_bin_path = vpp_pid, vpp_bin_path
        # starts unset; NOTE(review): presumably filled in once a core
        # file is attributed to a test - confirm against the users
        self.core_crash_test = None
class VppTestResult(unittest.TestResult):
    """
    Test result which logs, prints and forwards the outcome of each test.

    @property result_string
     String variable to store the test case result string.
    @property errors
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test which
     raised an unexpected exception.
    @property failures
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test where
     a failure was explicitly signalled using the TestCase.assert*()
     methods.
    """

    # Class-level attributes: the two sets are shared by every instance of
    # this class, so an entry added through one result object is visible
    # through all of them.
    failed_test_cases_info = set()
    core_crash_test_cases_info = set()
    current_test_case_info = None

    def __init__(self, stream=None, descriptions=None, verbosity=None,
                 runner=None):
        """
        :param stream Stream object used to report test results.
        :param descriptions Boolean variable to store information if to use
            test case descriptions.
        :param verbosity Integer variable to store required verbosity level.
        :param runner Runner owning this result; printErrors() reads its
            print_summary flag and may replace its stream.
        """
        super(VppTestResult, self).__init__(stream, descriptions, verbosity)
        self.stream = stream
        self.descriptions = descriptions
        self.verbosity = verbosity
        self.result_string = None
        self.runner = runner

    def addSuccess(self, test):
        """
        Record a test succeeded result

        :param test: test which passed
        """
        info = self.current_test_case_info
        if info:
            info.logger.debug(
                "--- addSuccess() %s.%s(%s) called" % (
                    test.__class__.__name__,
                    test._testMethodName,
                    test._testMethodDoc))
        unittest.TestResult.addSuccess(self, test)
        self.result_string = colorize("OK", GREEN)

        self.send_result_through_pipe(test, PASS)

    def addSkip(self, test, reason):
        """
        Record a test skipped.

        :param test: test which was skipped
        :param reason: reason for skipping the test
        """
        info = self.current_test_case_info
        if info:
            info.logger.debug(
                "--- addSkip() %s.%s(%s) called, reason is %s" % (
                    test.__class__.__name__,
                    test._testMethodName,
                    test._testMethodDoc,
                    reason))
        unittest.TestResult.addSkip(self, test, reason)
        self.result_string = colorize("SKIP", YELLOW)

        self.send_result_through_pipe(test, SKIP)

    def symlink_failed(self):
        """
        Link the temp dir of the current test case into $FAILED_DIR.

        The link is named '<basename of the temp dir>-FAILED'. This is a
        best-effort operation: every problem is logged (when a logger is
        available) and never raised.
        """
        info = self.current_test_case_info
        if not info:
            return
        try:
            failed_dir = os.getenv('FAILED_DIR')
            if failed_dir is None:
                # Without this check os.path.join() would raise a TypeError
                # which ended up in the log with no hint about the cause.
                if info.logger:
                    info.logger.error(
                        "FAILED_DIR is not set, cannot create a link to "
                        "the failed test")
                return
            link_path = os.path.join(
                failed_dir,
                '%s-FAILED' % os.path.basename(info.tempdir))
            if info.logger:
                info.logger.debug("creating a link to the failed test")
                info.logger.debug(
                    "os.symlink(%s, %s)" % (info.tempdir, link_path))
            if os.path.exists(link_path):
                if info.logger:
                    info.logger.debug('symlink already exists')
            else:
                os.symlink(info.tempdir, link_path)
        except Exception as e:
            # deliberately broad: failing to create the link must not
            # affect recording of the test result
            if info.logger:
                info.logger.error(e)

    def send_result_through_pipe(self, test, result):
        """
        Send (test id, result) through the result pipe, if there is one.

        :param test: test whose id() is sent
        :param result: result value to send
        """
        pipe = getattr(self, 'test_framework_result_pipe', None)
        if pipe:
            pipe.send((test.id(), result))

    def log_error(self, test, err, fn_name):
        """
        Write a failure/error and its formatted traceback to the debug log.

        Does nothing when there is no current test case info.

        :param test: failed test or a unittest _ErrorHolder
        :param err: exception info tuple accepted by format_exception()
        :param fn_name: name of the reporting function used in the message
        """
        info = self.current_test_case_info
        if not info:
            return
        if isinstance(test, unittest.suite._ErrorHolder):
            test_name = test.description
        else:
            test_name = '%s.%s(%s)' % (test.__class__.__name__,
                                       test._testMethodName,
                                       test._testMethodDoc)
        info.logger.debug(
            "--- %s() %s called, err is %s" % (fn_name, test_name, err))
        info.logger.debug(
            "formatted exception is:\n%s" % "".join(format_exception(*err)))

    def add_error(self, test, err, unittest_fn, error_type):
        """
        Common implementation of addFailure() and addError().

        :param test: failed test or a unittest _ErrorHolder
        :param err: exception info tuple
        :param unittest_fn: unittest.TestResult method which records the
            result; called as unittest_fn(self, test, err)
        :param error_type: FAIL or ERROR
        :raises Exception: if error_type is neither FAIL nor ERROR
        """
        if error_type == FAIL:
            fn_name, label = 'addFailure', "FAIL"
        elif error_type == ERROR:
            fn_name, label = 'addError', "ERROR"
        else:
            raise Exception('Error type %s cannot be used to record an '
                            'error or a failure' % error_type)
        self.log_error(test, err, fn_name)
        error_type_str = colorize(label, RED)

        unittest_fn(self, test, err)
        info = self.current_test_case_info
        if info:
            self.result_string = "%s [ temp dir used by test case: %s ]" % \
                                 (error_type_str, info.tempdir)
            self.symlink_failed()
            self.failed_test_cases_info.add(info)
            if is_core_present(info.tempdir):
                # only the first test seen with a core present is recorded
                if not info.core_crash_test:
                    if isinstance(test, unittest.suite._ErrorHolder):
                        test_name = str(test)
                    else:
                        test_name = "'{!s}' ({!s})".format(
                            get_testcase_doc_name(test), test.id())
                    info.core_crash_test = test_name
                self.core_crash_test_cases_info.add(info)
        else:
            self.result_string = '%s [no temp dir]' % error_type_str

        self.send_result_through_pipe(test, error_type)

    def addFailure(self, test, err):
        """
        Record a test failed result

        :param test: test which failed
        :param err: error message
        """
        self.add_error(test, err, unittest.TestResult.addFailure, FAIL)

    def addError(self, test, err):
        """
        Record a test error result

        :param test: test which raised an error
        :param err: error message
        """
        self.add_error(test, err, unittest.TestResult.addError, ERROR)

    def getDescription(self, test):
        """
        Get test description

        :param test: test to describe
        :returns: test description
        """
        return get_test_description(self.descriptions, test)

    def startTest(self, test):
        """
        Start a test

        Prints a header with the first line of the test's doc string the
        first time a test of the given class is started.

        :param test: test about to be run
        """
        test_class = test.__class__
        if not hasattr(test_class, '_header_printed'):
            print(double_line_delim)
            print(colorize(getdoc(test).splitlines()[0], GREEN))
            print(double_line_delim)
        test_class._header_printed = True

        unittest.TestResult.startTest(self, test)
        if self.verbosity > 0:
            self.stream.writeln(
                "Starting " + self.getDescription(test) + " ...")
            self.stream.writeln(single_line_delim)

    def stopTest(self, test):
        """
        Called when the given test has been run

        :param test: test which has finished
        """
        unittest.TestResult.stopTest(self, test)
        summary = "%-73s%s" % (self.getDescription(test),
                               self.result_string)
        if self.verbosity > 0:
            self.stream.writeln(single_line_delim)
            self.stream.writeln(summary)
            self.stream.writeln(single_line_delim)
        else:
            self.stream.writeln(summary)

        self.send_result_through_pipe(test, TEST_RUN)

    def printErrors(self):
        """
        Print errors from running the test case
        """
        if self.errors or self.failures:
            self.stream.writeln()
            self.printErrorList('ERROR', self.errors)
            self.printErrorList('FAIL', self.failures)

        # ^^ that is the last output from unittest before summary
        if not self.runner.print_summary:
            # The devnull handle is not closed here because it becomes the
            # stream of both this result and the runner.
            devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
            self.stream = devnull
            self.runner.stream = devnull

    def printErrorList(self, flavour, errors):
        """
        Print error list to the output stream together with error type
        and test case description.

        :param flavour: error type
        :param errors: iterable errors
        """
        for test, err in errors:
            self.stream.writeln(double_line_delim)
            self.stream.writeln("%s: %s" %
                                (flavour, self.getDescription(test)))
            self.stream.writeln(single_line_delim)
            self.stream.writeln("%s" % err)
class VppTestRunner(unittest.TextTestRunner):
    """
    A basic test runner implementation which prints results to standard
    output.
    """

    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
                 result_pipe=None, failfast=False, buffer=False,
                 resultclass=None, print_summary=True, **kwargs):
        """
        :param keep_alive_pipe: pipe assigned to KeepAliveReporter.pipe
        :param descriptions: passed on to unittest.TextTestRunner
        :param verbosity: passed on to unittest.TextTestRunner
        :param result_pipe: pipe set on the result class as
            test_framework_result_pipe
        :param failfast: passed on to unittest.TextTestRunner
        :param buffer: passed on to unittest.TextTestRunner
        :param resultclass: passed on to unittest.TextTestRunner
        :param print_summary: when False, VppTestResult.printErrors()
            replaces the streams with a devnull stream
        :param kwargs: passed on to unittest.TextTestRunner
        """
        # NOTE(review): resultclass is a read-only property of this class,
        # so the base class presumably cannot honour a non-None
        # `resultclass` argument - confirm that no caller passes one.

        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        super(VppTestRunner, self).__init__(sys.stdout, descriptions,
                                            verbosity, failfast, buffer,
                                            resultclass, **kwargs)
        KeepAliveReporter.pipe = keep_alive_pipe

        # remembered so that run() can restore the streams afterwards
        self.orig_stream = self.stream
        # set on the class, i.e. shared by all results created from it
        self.resultclass.test_framework_result_pipe = result_pipe

        self.print_summary = print_summary

    def _makeResult(self):
        """Create the result object, handing it a reference to this runner."""
        return self.resultclass(self.stream,
                                self.descriptions,
                                self.verbosity,
                                self)

    def run(self, test):
        """
        Run the tests

        :param test: test case or test suite to run
        :returns: result object produced by the run
        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        result = super(VppTestRunner, self).run(test)
        if not self.print_summary:
            # undo the stream replacement done by the result's printErrors()
            self.stream = self.orig_stream
            result.stream = self.orig_stream
        return result
class Worker(Thread):
    """
    Thread which runs an executable to completion and logs its output.

    After run() has finished, the exit code of the executable is available
    in `result` and the Popen object in `process`.
    """

    def __init__(self, args, logger, env=None):
        """
        :param args: argument list of the executable; args[0] is the
            executable itself
        :param logger: logger used to report progress and output
        :param env: optional mapping of extra environment variables; a deep
            copy is stored, so the caller's mapping is never shared
        """
        self.logger = logger
        self.args = args
        self.result = None
        # None instead of a mutable `{}` default in the signature
        self.env = copy.deepcopy(env) if env is not None else {}
        super(Worker, self).__init__()

    def run(self):
        """
        Run the executable, wait for it to exit and log what it printed.

        The exit code is stored in self.result.
        """
        executable = self.args[0]
        self.logger.debug("Running executable w/args `%s'" % self.args)
        # the process environment, overridden by the entries of self.env
        env = os.environ.copy()
        env.update(self.env)
        env["CK_LOG_FILE_NAME"] = "-"
        # os.setpgrp is called in the child before the executable starts
        self.process = subprocess.Popen(
            self.args, shell=False, env=env, preexec_fn=os.setpgrp,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = self.process.communicate()
        self.logger.debug("Finished running `%s'" % executable)
        self.logger.info("Return code is `%s'" % self.process.returncode)
        self.logger.info(single_line_delim)
        for stream_name, data in (("stdout", out), ("stderr", err)):
            self.logger.info("Executable `%s' wrote to %s:" %
                             (executable, stream_name))
            self.logger.info(single_line_delim)
            self.logger.info(data)
            self.logger.info(single_line_delim)
        self.result = self.process.returncode
1436 if __name__ == '__main__':