3 from __future__ import print_function
16 from collections import deque
17 from threading import Thread, Event
18 from inspect import getdoc, isclass
19 from traceback import format_exception
20 from logging import FileHandler, DEBUG, Formatter
23 from scapy.packet import Raw
24 from hook import StepHook, PollHook, VppDiedError
25 from vpp_pg_interface import VppPGInterface
26 from vpp_sub_interface import VppSubInterface
27 from vpp_lo_interface import VppLoInterface
28 from vpp_bvi_interface import VppBviInterface
29 from vpp_papi_provider import VppPapiProvider
30 from vpp_papi.vpp_stats import VPPStats
31 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
33 from vpp_object import VppObjectRegistry
34 from util import ppp, is_core_present
35 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
36 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
37 from scapy.layers.inet6 import ICMPv6EchoReply
39 if os.name == 'posix' and sys.version_info[0] < 3:
40 # using subprocess32 is recommended by python official documentation
41 # @ https://docs.python.org/2/library/subprocess.html
42 import subprocess32 as subprocess
46 # Python2/3 compatible
58 debug_framework = False
59 if os.getenv('TEST_DEBUG', "0") == "1":
60 debug_framework = True
64 Test framework module.
66 The module provides a set of tools for constructing and running tests and
67 representing the results.
71 class _PacketInfo(object):
72 """Private class to create packet info object.
74 Help process information about the next packet.
75 Set variables to default values.
77 #: Store the index of the packet.
79 #: Store the index of the source packet generator interface of the packet.
81 #: Store the index of the destination packet generator interface
84 #: Store expected ip version
86 #: Store expected upper protocol
88 #: Store the copy of the former packet.
91 def __eq__(self, other):
92 index = self.index == other.index
93 src = self.src == other.src
94 dst = self.dst == other.dst
95 data = self.data == other.data
96 return index and src and dst and data
99 def pump_output(testclass):
100 """ pump output from vpp stdout/stderr to proper queues """
103 while not testclass.pump_thread_stop_flag.is_set():
104 readable = select.select([testclass.vpp.stdout.fileno(),
105 testclass.vpp.stderr.fileno(),
106 testclass.pump_thread_wakeup_pipe[0]],
108 if testclass.vpp.stdout.fileno() in readable:
109 read = os.read(testclass.vpp.stdout.fileno(), 102400)
111 split = read.splitlines(True)
112 if len(stdout_fragment) > 0:
113 split[0] = "%s%s" % (stdout_fragment, split[0])
114 if len(split) > 0 and split[-1].endswith("\n"):
118 stdout_fragment = split[-1]
119 testclass.vpp_stdout_deque.extend(split[:limit])
120 if not testclass.cache_vpp_output:
121 for line in split[:limit]:
122 testclass.logger.debug(
123 "VPP STDOUT: %s" % line.rstrip("\n"))
124 if testclass.vpp.stderr.fileno() in readable:
125 read = os.read(testclass.vpp.stderr.fileno(), 102400)
127 split = read.splitlines(True)
128 if len(stderr_fragment) > 0:
129 split[0] = "%s%s" % (stderr_fragment, split[0])
130 if len(split) > 0 and split[-1].endswith(b"\n"):
134 stderr_fragment = split[-1]
135 testclass.vpp_stderr_deque.extend(split[:limit])
136 if not testclass.cache_vpp_output:
137 for line in split[:limit]:
138 testclass.logger.debug(
139 "VPP STDERR: %s" % line.rstrip("\n"))
140 # ignoring the dummy pipe here intentionally - the
141 # flag will take care of properly terminating the loop
144 def _is_skip_aarch64_set():
145 return os.getenv('SKIP_AARCH64', 'n').lower() in ('yes', 'y', '1')
147 is_skip_aarch64_set = _is_skip_aarch64_set()
150 def _is_platform_aarch64():
151 return platform.machine() == 'aarch64'
153 is_platform_aarch64 = _is_platform_aarch64()
156 def _running_extended_tests():
157 s = os.getenv("EXTENDED_TESTS", "n")
158 return True if s.lower() in ("y", "yes", "1") else False
160 running_extended_tests = _running_extended_tests()
163 def _running_on_centos():
164 os_id = os.getenv("OS_ID", "")
165 return True if "centos" in os_id.lower() else False
167 running_on_centos = _running_on_centos
170 class KeepAliveReporter(object):
172 Singleton object which reports test start to parent process
177 self.__dict__ = self._shared_state
185 def pipe(self, pipe):
186 if self._pipe is not None:
187 raise Exception("Internal error - pipe should only be set once.")
190 def send_keep_alive(self, test, desc=None):
192 Write current test tmpdir & desc to keep-alive pipe to signal liveness
194 if self.pipe is None:
195 # if not running forked..
199 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
203 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
206 class VppTestCase(unittest.TestCase):
207 """This subclass is a base class for VPP test cases that are implemented as
208 classes. It provides methods to create and run test case.
211 extra_vpp_punt_config = []
212 extra_vpp_plugin_config = []
215 def packet_infos(self):
216 """List of packet infos"""
217 return self._packet_infos
220 def get_packet_count_for_if_idx(cls, dst_if_index):
221 """Get the number of packet info for specified destination if index"""
222 if dst_if_index in cls._packet_count_for_dst_if_idx:
223 return cls._packet_count_for_dst_if_idx[dst_if_index]
229 """Return the instance of this testcase"""
230 return cls.test_instance
233 def set_debug_flags(cls, d):
234 cls.debug_core = False
235 cls.debug_gdb = False
236 cls.debug_gdbserver = False
241 cls.debug_core = True
244 elif dl == "gdbserver":
245 cls.debug_gdbserver = True
247 raise Exception("Unrecognized DEBUG option: '%s'" % d)
250 def get_least_used_cpu():
251 cpu_usage_list = [set(range(psutil.cpu_count()))]
252 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
253 if 'vpp_main' == p.info['name']]
254 for vpp_process in vpp_processes:
255 for cpu_usage_set in cpu_usage_list:
257 cpu_num = vpp_process.cpu_num()
258 if cpu_num in cpu_usage_set:
259 cpu_usage_set_index = cpu_usage_list.index(
261 if cpu_usage_set_index == len(cpu_usage_list) - 1:
262 cpu_usage_list.append({cpu_num})
264 cpu_usage_list[cpu_usage_set_index + 1].add(
266 cpu_usage_set.remove(cpu_num)
268 except psutil.NoSuchProcess:
271 for cpu_usage_set in cpu_usage_list:
272 if len(cpu_usage_set) > 0:
273 min_usage_set = cpu_usage_set
276 return random.choice(tuple(min_usage_set))
279 def setUpConstants(cls):
280 """ Set-up the test case class based on environment variables """
281 s = os.getenv("STEP", "n")
282 cls.step = True if s.lower() in ("y", "yes", "1") else False
283 d = os.getenv("DEBUG", None)
284 c = os.getenv("CACHE_OUTPUT", "1")
285 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
286 cls.set_debug_flags(d)
287 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
288 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
289 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
291 if cls.plugin_path is not None:
292 if cls.extern_plugin_path is not None:
293 plugin_path = "%s:%s" % (
294 cls.plugin_path, cls.extern_plugin_path)
296 plugin_path = cls.plugin_path
297 elif cls.extern_plugin_path is not None:
298 plugin_path = cls.extern_plugin_path
300 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
301 debug_cli = "cli-listen localhost:5002"
303 size = os.getenv("COREDUMP_SIZE")
305 coredump_size = "coredump-size %s" % size
306 if coredump_size is None:
307 coredump_size = "coredump-size unlimited"
309 cpu_core_number = cls.get_least_used_cpu()
311 cls.vpp_cmdline = [cls.vpp_bin, "unix",
312 "{", "nodaemon", debug_cli, "full-coredump",
313 coredump_size, "runtime-dir", cls.tempdir, "}",
314 "api-trace", "{", "on", "}", "api-segment", "{",
315 "prefix", cls.shm_prefix, "}", "cpu", "{",
316 "main-core", str(cpu_core_number), "}",
317 "statseg", "{", "socket-name", cls.stats_sock, "}",
318 "socksvr", "{", "socket-name", cls.api_sock, "}",
320 "{", "plugin", "dpdk_plugin.so", "{", "disable",
321 "}", "plugin", "rdma_plugin.so", "{", "disable",
322 "}", "plugin", "unittest_plugin.so", "{", "enable",
323 "}"] + cls.extra_vpp_plugin_config + ["}", ]
324 if cls.extra_vpp_punt_config is not None:
325 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
326 if plugin_path is not None:
327 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
328 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
329 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
332 def wait_for_enter(cls):
333 if cls.debug_gdbserver:
334 print(double_line_delim)
335 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
337 print(double_line_delim)
338 print("Spawned VPP with PID: %d" % cls.vpp.pid)
340 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
342 print(single_line_delim)
343 print("You can debug the VPP using e.g.:")
344 if cls.debug_gdbserver:
345 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
346 print("Now is the time to attach a gdb by running the above "
347 "command, set up breakpoints etc. and then resume VPP from "
348 "within gdb by issuing the 'continue' command")
350 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
351 print("Now is the time to attach a gdb by running the above "
352 "command and set up breakpoints etc.")
353 print(single_line_delim)
354 input("Press ENTER to continue running the testcase...")
358 cmdline = cls.vpp_cmdline
360 if cls.debug_gdbserver:
361 gdbserver = '/usr/bin/gdbserver'
362 if not os.path.isfile(gdbserver) or \
363 not os.access(gdbserver, os.X_OK):
364 raise Exception("gdbserver binary '%s' does not exist or is "
365 "not executable" % gdbserver)
367 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
368 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
371 cls.vpp = subprocess.Popen(cmdline,
372 stdout=subprocess.PIPE,
373 stderr=subprocess.PIPE,
375 except subprocess.CalledProcessError as e:
376 cls.logger.critical("Subprocess returned with non-0 return code: ("
380 cls.logger.critical("Subprocess returned with OS error: "
381 "(%s) %s", e.errno, e.strerror)
383 except Exception as e:
384 cls.logger.exception("Subprocess returned unexpected from "
391 def wait_for_stats_socket(cls):
392 deadline = time.time() + 3
394 while time.time() < deadline or \
395 cls.debug_gdb or cls.debug_gdbserver:
396 if os.path.exists(cls.stats_sock):
401 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
406 Perform class setup before running the testcase
407 Remove shared memory files, start vpp and connect the vpp-api
409 super(VppTestCase, cls).setUpClass()
410 gc.collect() # run garbage collection first
412 cls.logger = get_logger(cls.__name__)
413 if hasattr(cls, 'parallel_handler'):
414 cls.logger.addHandler(cls.parallel_handler)
415 cls.logger.propagate = False
417 cls.tempdir = tempfile.mkdtemp(
418 prefix='vpp-unittest-%s-' % cls.__name__)
419 cls.stats_sock = "%s/stats.sock" % cls.tempdir
420 cls.api_sock = "%s/api.sock" % cls.tempdir
421 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
422 cls.file_handler.setFormatter(
423 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
425 cls.file_handler.setLevel(DEBUG)
426 cls.logger.addHandler(cls.file_handler)
427 cls.logger.debug("--- setUpClass() for %s called ---" %
429 cls.shm_prefix = os.path.basename(cls.tempdir)
430 os.chdir(cls.tempdir)
431 cls.logger.info("Temporary dir is %s, shm prefix is %s",
432 cls.tempdir, cls.shm_prefix)
434 cls.reset_packet_infos()
436 cls._zombie_captures = []
439 cls.registry = VppObjectRegistry()
440 cls.vpp_startup_failed = False
441 cls.reporter = KeepAliveReporter()
442 # need to catch exceptions here because if we raise, then the cleanup
443 # doesn't get called and we might end with a zombie vpp
446 cls.reporter.send_keep_alive(cls, 'setUpClass')
447 VppTestResult.current_test_case_info = TestCaseInfo(
448 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
449 cls.vpp_stdout_deque = deque()
450 cls.vpp_stderr_deque = deque()
451 cls.pump_thread_stop_flag = Event()
452 cls.pump_thread_wakeup_pipe = os.pipe()
453 cls.pump_thread = Thread(target=pump_output, args=(cls,))
454 cls.pump_thread.daemon = True
455 cls.pump_thread.start()
456 if cls.debug_gdb or cls.debug_gdbserver:
460 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
466 cls.vapi.register_hook(hook)
467 cls.wait_for_stats_socket()
468 cls.statistics = VPPStats(socketname=cls.stats_sock)
472 cls.vpp_startup_failed = True
474 "VPP died shortly after startup, check the"
475 " output to standard error for possible cause")
481 cls.vapi.disconnect()
484 if cls.debug_gdbserver:
485 print(colorize("You're running VPP inside gdbserver but "
486 "VPP-API connection failed, did you forget "
487 "to 'continue' VPP from within gdb?", RED))
499 Disconnect vpp-api, kill vpp and cleanup shared memory files
501 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
503 if cls.vpp.returncode is None:
504 print(double_line_delim)
505 print("VPP or GDB server is still running")
506 print(single_line_delim)
507 input("When done debugging, press ENTER to kill the "
508 "process and finish running the testcase...")
510 # first signal that we want to stop the pump thread, then wake it up
511 if hasattr(cls, 'pump_thread_stop_flag'):
512 cls.pump_thread_stop_flag.set()
513 if hasattr(cls, 'pump_thread_wakeup_pipe'):
514 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
515 if hasattr(cls, 'pump_thread'):
516 cls.logger.debug("Waiting for pump thread to stop")
517 cls.pump_thread.join()
518 if hasattr(cls, 'vpp_stderr_reader_thread'):
519 cls.logger.debug("Waiting for stdderr pump to stop")
520 cls.vpp_stderr_reader_thread.join()
522 if hasattr(cls, 'vpp'):
523 if hasattr(cls, 'vapi'):
524 cls.logger.debug("Disconnecting class vapi client on %s",
526 cls.vapi.disconnect()
527 cls.logger.debug("Deleting class vapi attribute on %s",
531 if cls.vpp.returncode is None:
532 cls.logger.debug("Sending TERM to vpp")
534 cls.logger.debug("Waiting for vpp to die")
535 cls.vpp.communicate()
536 cls.logger.debug("Deleting class vpp attribute on %s",
540 if cls.vpp_startup_failed:
541 stdout_log = cls.logger.info
542 stderr_log = cls.logger.critical
544 stdout_log = cls.logger.info
545 stderr_log = cls.logger.info
547 if hasattr(cls, 'vpp_stdout_deque'):
548 stdout_log(single_line_delim)
549 stdout_log('VPP output to stdout while running %s:', cls.__name__)
550 stdout_log(single_line_delim)
551 vpp_output = "".join(cls.vpp_stdout_deque)
552 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
554 stdout_log('\n%s', vpp_output)
555 stdout_log(single_line_delim)
557 if hasattr(cls, 'vpp_stderr_deque'):
558 stderr_log(single_line_delim)
559 stderr_log('VPP output to stderr while running %s:', cls.__name__)
560 stderr_log(single_line_delim)
561 vpp_output = "".join(cls.vpp_stderr_deque)
562 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
564 stderr_log('\n%s', vpp_output)
565 stderr_log(single_line_delim)
568 def tearDownClass(cls):
569 """ Perform final cleanup after running all tests in this test-case """
570 cls.logger.debug("--- tearDownClass() for %s called ---" %
572 cls.reporter.send_keep_alive(cls, 'tearDownClass')
574 cls.file_handler.close()
575 cls.reset_packet_infos()
577 debug_internal.on_tear_down_class(cls)
579 def show_commands_at_teardown(self):
580 """ Allow subclass specific teardown logging additions."""
581 self.logger.info("--- No test specific show commands provided. ---")
584 """ Show various debug prints after each test """
585 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
586 (self.__class__.__name__, self._testMethodName,
587 self._testMethodDoc))
588 if not self.vpp_dead:
590 "--- Logging show commands common to all testcases. ---")
591 self.logger.debug(self.vapi.cli("show trace max 1000"))
592 self.logger.info(self.vapi.ppcli("show interface"))
593 self.logger.info(self.vapi.ppcli("show hardware"))
594 self.logger.info(self.statistics.set_errors_str())
595 self.logger.info(self.vapi.ppcli("show run"))
596 self.logger.info(self.vapi.ppcli("show log"))
597 self.logger.info("Logging testcase specific show commands.")
598 self.show_commands_at_teardown()
599 self.registry.remove_vpp_config(self.logger)
600 # Save/Dump VPP api trace log
601 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
602 tmp_api_trace = "/tmp/%s" % api_trace
603 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
604 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
605 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
607 os.rename(tmp_api_trace, vpp_api_trace_log)
608 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
611 self.registry.unregister_all(self.logger)
614 """ Clear trace before running each test"""
615 super(VppTestCase, self).setUp()
616 self.reporter.send_keep_alive(self)
618 raise Exception("VPP is dead when setting up the test")
619 self.sleep(.1, "during setUp")
620 self.vpp_stdout_deque.append(
621 "--- test setUp() for %s.%s(%s) starts here ---\n" %
622 (self.__class__.__name__, self._testMethodName,
623 self._testMethodDoc))
624 self.vpp_stderr_deque.append(
625 "--- test setUp() for %s.%s(%s) starts here ---\n" %
626 (self.__class__.__name__, self._testMethodName,
627 self._testMethodDoc))
628 self.vapi.cli("clear trace")
629 # store the test instance inside the test class - so that objects
630 # holding the class can access instance methods (like assertEqual)
631 type(self).test_instance = self
634 def pg_enable_capture(cls, interfaces=None):
636 Enable capture on packet-generator interfaces
638 :param interfaces: iterable interface indexes (if None,
639 use self.pg_interfaces)
642 if interfaces is None:
643 interfaces = cls.pg_interfaces
648 def register_capture(cls, cap_name):
649 """ Register a capture in the testclass """
650 # add to the list of captures with current timestamp
651 cls._captures.append((time.time(), cap_name))
652 # filter out from zombies
653 cls._zombie_captures = [(stamp, name)
654 for (stamp, name) in cls._zombie_captures
659 """ Remove any zombie captures and enable the packet generator """
660 # how long before capture is allowed to be deleted - otherwise vpp
661 # crashes - 100ms seems enough (this shouldn't be needed at all)
664 for stamp, cap_name in cls._zombie_captures:
665 wait = stamp + capture_ttl - now
667 cls.sleep(wait, "before deleting capture %s" % cap_name)
669 cls.logger.debug("Removing zombie capture %s" % cap_name)
670 cls.vapi.cli('packet-generator delete %s' % cap_name)
672 cls.vapi.cli("trace add pg-input 1000")
673 cls.vapi.cli('packet-generator enable')
674 cls._zombie_captures = cls._captures
678 def create_pg_interfaces(cls, interfaces):
680 Create packet-generator interfaces.
682 :param interfaces: iterable indexes of the interfaces.
683 :returns: List of created interfaces.
688 intf = VppPGInterface(cls, i)
689 setattr(cls, intf.name, intf)
691 cls.pg_interfaces = result
695 def create_loopback_interfaces(cls, count):
697 Create loopback interfaces.
699 :param count: number of interfaces created.
700 :returns: List of created interfaces.
702 result = [VppLoInterface(cls) for i in range(count)]
704 setattr(cls, intf.name, intf)
705 cls.lo_interfaces = result
709 def create_bvi_interfaces(cls, count):
711 Create BVI interfaces.
713 :param count: number of interfaces created.
714 :returns: List of created interfaces.
716 result = [VppBviInterface(cls) for i in range(count)]
718 setattr(cls, intf.name, intf)
719 cls.bvi_interfaces = result
723 def extend_packet(packet, size, padding=' '):
725 Extend packet to given size by padding with spaces or custom padding
726 NOTE: Currently works only when Raw layer is present.
728 :param packet: packet
729 :param size: target size
730 :param padding: padding used to extend the payload
733 packet_len = len(packet) + 4
734 extend = size - packet_len
736 num = (extend / len(padding)) + 1
737 packet[Raw].load += (padding * num)[:extend]
740 def reset_packet_infos(cls):
741 """ Reset the list of packet info objects and packet counts to zero """
742 cls._packet_infos = {}
743 cls._packet_count_for_dst_if_idx = {}
746 def create_packet_info(cls, src_if, dst_if):
748 Create packet info object containing the source and destination indexes
749 and add it to the testcase's packet info list
751 :param VppInterface src_if: source interface
752 :param VppInterface dst_if: destination interface
754 :returns: _PacketInfo object
758 info.index = len(cls._packet_infos)
759 info.src = src_if.sw_if_index
760 info.dst = dst_if.sw_if_index
761 if isinstance(dst_if, VppSubInterface):
762 dst_idx = dst_if.parent.sw_if_index
764 dst_idx = dst_if.sw_if_index
765 if dst_idx in cls._packet_count_for_dst_if_idx:
766 cls._packet_count_for_dst_if_idx[dst_idx] += 1
768 cls._packet_count_for_dst_if_idx[dst_idx] = 1
769 cls._packet_infos[info.index] = info
773 def info_to_payload(info):
775 Convert _PacketInfo object to packet payload
777 :param info: _PacketInfo object
779 :returns: string containing serialized data from packet info
781 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
785 def payload_to_info(payload, payload_field='load'):
787 Convert packet payload to _PacketInfo object
789 :param payload: packet payload
790 :type payload: <class 'scapy.packet.Raw'>
791 :param payload_field: packet fieldname of payload "load" for
792 <class 'scapy.packet.Raw'>
793 :type payload_field: str
794 :returns: _PacketInfo object containing de-serialized data from payload
797 numbers = getattr(payload, payload_field).split()
799 info.index = int(numbers[0])
800 info.src = int(numbers[1])
801 info.dst = int(numbers[2])
802 info.ip = int(numbers[3])
803 info.proto = int(numbers[4])
806 def get_next_packet_info(self, info):
808 Iterate over the packet info list stored in the testcase
809 Start iteration with first element if info is None
810 Continue based on index in info if info is specified
812 :param info: info or None
813 :returns: next info in list or None if no more infos
818 next_index = info.index + 1
819 if next_index == len(self._packet_infos):
822 return self._packet_infos[next_index]
824 def get_next_packet_info_for_interface(self, src_index, info):
826 Search the packet info list for the next packet info with same source
829 :param src_index: source interface index to search for
830 :param info: packet info - where to start the search
831 :returns: packet info or None
835 info = self.get_next_packet_info(info)
838 if info.src == src_index:
841 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
843 Search the packet info list for the next packet info with same source
844 and destination interface indexes
846 :param src_index: source interface index to search for
847 :param dst_index: destination interface index to search for
848 :param info: packet info - where to start the search
849 :returns: packet info or None
853 info = self.get_next_packet_info_for_interface(src_index, info)
856 if info.dst == dst_index:
859 def assert_equal(self, real_value, expected_value, name_or_class=None):
860 if name_or_class is None:
861 self.assertEqual(real_value, expected_value)
864 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
865 msg = msg % (getdoc(name_or_class).strip(),
866 real_value, str(name_or_class(real_value)),
867 expected_value, str(name_or_class(expected_value)))
869 msg = "Invalid %s: %s does not match expected value %s" % (
870 name_or_class, real_value, expected_value)
872 self.assertEqual(real_value, expected_value, msg)
874 def assert_in_range(self,
882 msg = "Invalid %s: %s out of range <%s,%s>" % (
883 name, real_value, expected_min, expected_max)
884 self.assertTrue(expected_min <= real_value <= expected_max, msg)
886 def assert_packet_checksums_valid(self, packet,
887 ignore_zero_udp_checksums=True):
888 received = packet.__class__(scapy.compat.raw(packet))
890 ppp("Verifying packet checksums for packet:", received))
891 udp_layers = ['UDP', 'UDPerror']
892 checksum_fields = ['cksum', 'chksum']
895 temp = received.__class__(scapy.compat.raw(received))
897 layer = temp.getlayer(counter)
899 for cf in checksum_fields:
900 if hasattr(layer, cf):
901 if ignore_zero_udp_checksums and \
902 0 == getattr(layer, cf) and \
903 layer.name in udp_layers:
906 checksums.append((counter, cf))
909 counter = counter + 1
910 if 0 == len(checksums):
912 temp = temp.__class__(scapy.compat.raw(temp))
913 for layer, cf in checksums:
914 calc_sum = getattr(temp[layer], cf)
916 getattr(received[layer], cf), calc_sum,
917 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
919 "Checksum field `%s` on `%s` layer has correct value `%s`" %
920 (cf, temp[layer].name, calc_sum))
922 def assert_checksum_valid(self, received_packet, layer,
924 ignore_zero_checksum=False):
925 """ Check checksum of received packet on given layer """
926 received_packet_checksum = getattr(received_packet[layer], field_name)
927 if ignore_zero_checksum and 0 == received_packet_checksum:
929 recalculated = received_packet.__class__(
930 scapy.compat.raw(received_packet))
931 delattr(recalculated[layer], field_name)
932 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
933 self.assert_equal(received_packet_checksum,
934 getattr(recalculated[layer], field_name),
935 "packet checksum on layer: %s" % layer)
937 def assert_ip_checksum_valid(self, received_packet,
938 ignore_zero_checksum=False):
939 self.assert_checksum_valid(received_packet, 'IP',
940 ignore_zero_checksum=ignore_zero_checksum)
942 def assert_tcp_checksum_valid(self, received_packet,
943 ignore_zero_checksum=False):
944 self.assert_checksum_valid(received_packet, 'TCP',
945 ignore_zero_checksum=ignore_zero_checksum)
947 def assert_udp_checksum_valid(self, received_packet,
948 ignore_zero_checksum=True):
949 self.assert_checksum_valid(received_packet, 'UDP',
950 ignore_zero_checksum=ignore_zero_checksum)
952 def assert_embedded_icmp_checksum_valid(self, received_packet):
953 if received_packet.haslayer(IPerror):
954 self.assert_checksum_valid(received_packet, 'IPerror')
955 if received_packet.haslayer(TCPerror):
956 self.assert_checksum_valid(received_packet, 'TCPerror')
957 if received_packet.haslayer(UDPerror):
958 self.assert_checksum_valid(received_packet, 'UDPerror',
959 ignore_zero_checksum=True)
960 if received_packet.haslayer(ICMPerror):
961 self.assert_checksum_valid(received_packet, 'ICMPerror')
963 def assert_icmp_checksum_valid(self, received_packet):
964 self.assert_checksum_valid(received_packet, 'ICMP')
965 self.assert_embedded_icmp_checksum_valid(received_packet)
967 def assert_icmpv6_checksum_valid(self, pkt):
968 if pkt.haslayer(ICMPv6DestUnreach):
969 self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
970 self.assert_embedded_icmp_checksum_valid(pkt)
971 if pkt.haslayer(ICMPv6EchoRequest):
972 self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
973 if pkt.haslayer(ICMPv6EchoReply):
974 self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
976 def assert_packet_counter_equal(self, counter, expected_value):
977 if counter.startswith("/"):
978 counter_value = self.statistics.get_counter(counter)
979 self.assert_equal(counter_value, expected_value,
980 "packet counter `%s'" % counter)
982 counters = self.vapi.cli("sh errors").split('\n')
984 for i in range(1, len(counters) - 1):
985 results = counters[i].split()
986 if results[1] == counter:
987 counter_value = int(results[0])
991 def sleep(cls, timeout, remark=None):
993 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
994 # * by Guido, only the main thread can be interrupted.
996 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
999 if hasattr(os, 'sched_yield'):
1005 if hasattr(cls, 'logger'):
1006 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1007 before = time.time()
1010 if hasattr(cls, 'logger') and after - before > 2 * timeout:
1011 cls.logger.error("unexpected self.sleep() result - "
1012 "slept for %es instead of ~%es!",
1013 after - before, timeout)
1014 if hasattr(cls, 'logger'):
1016 "Finished sleep (%s) - slept %es (wanted %es)",
1017 remark, after - before, timeout)
1019 def pg_send(self, intf, pkts):
1020 self.vapi.cli("clear trace")
1021 intf.add_stream(pkts)
1022 self.pg_enable_capture(self.pg_interfaces)
1025 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
1026 self.pg_send(intf, pkts)
1029 for i in self.pg_interfaces:
1030 i.get_capture(0, timeout=timeout)
1031 i.assert_nothing_captured(remark=remark)
1034 def send_and_expect(self, intf, pkts, output, n_rx=None):
1037 self.pg_send(intf, pkts)
1038 rx = output.get_capture(n_rx)
1041 def send_and_expect_only(self, intf, pkts, output, timeout=None):
1042 self.pg_send(intf, pkts)
1043 rx = output.get_capture(len(pkts))
1047 for i in self.pg_interfaces:
1048 if i not in outputs:
1049 i.get_capture(0, timeout=timeout)
1050 i.assert_nothing_captured()
1056 """ unittest calls runTest when TestCase is instantiated without a
1057 test case. Use case: Writing unittests against VppTestCase"""
1061 def get_testcase_doc_name(test):
1062 return getdoc(test.__class__).splitlines()[0]
1065 def get_test_description(descriptions, test):
1066 short_description = test.shortDescription()
1067 if descriptions and short_description:
1068 return short_description
1073 class TestCaseInfo(object):
1074 def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
1075 self.logger = logger
1076 self.tempdir = tempdir
1077 self.vpp_pid = vpp_pid
1078 self.vpp_bin_path = vpp_bin_path
1079 self.core_crash_test = None
1082 class VppTestResult(unittest.TestResult):
1084 @property result_string
1085 String variable to store the test case result string.
1087 List variable containing 2-tuples of TestCase instances and strings
1088 holding formatted tracebacks. Each tuple represents a test which
1089 raised an unexpected exception.
1091 List variable containing 2-tuples of TestCase instances and strings
1092 holding formatted tracebacks. Each tuple represents a test where
1093 a failure was explicitly signalled using the TestCase.assert*()
1097 failed_test_cases_info = set()
1098 core_crash_test_cases_info = set()
1099 current_test_case_info = None
1101 def __init__(self, stream=None, descriptions=None, verbosity=None,
1104 :param stream File descriptor to store where to report test results.
1105 Set to the standard error stream by default.
1106 :param descriptions Boolean variable to store information if to use
1107 test case descriptions.
1108 :param verbosity Integer variable to store required verbosity level.
1110 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1111 self.stream = stream
1112 self.descriptions = descriptions
1113 self.verbosity = verbosity
1114 self.result_string = None
1115 self.runner = runner
1117 def addSuccess(self, test):
1119 Record a test succeeded result
1124 if self.current_test_case_info:
1125 self.current_test_case_info.logger.debug(
1126 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1127 test._testMethodName,
1128 test._testMethodDoc))
1129 unittest.TestResult.addSuccess(self, test)
1130 self.result_string = colorize("OK", GREEN)
1132 self.send_result_through_pipe(test, PASS)
1134 def addSkip(self, test, reason):
1136 Record a test skipped.
1142 if self.current_test_case_info:
1143 self.current_test_case_info.logger.debug(
1144 "--- addSkip() %s.%s(%s) called, reason is %s" %
1145 (test.__class__.__name__, test._testMethodName,
1146 test._testMethodDoc, reason))
1147 unittest.TestResult.addSkip(self, test, reason)
1148 self.result_string = colorize("SKIP", YELLOW)
1150 self.send_result_through_pipe(test, SKIP)
1152 def symlink_failed(self):
1153 if self.current_test_case_info:
1155 failed_dir = os.getenv('FAILED_DIR')
1156 link_path = os.path.join(
1159 os.path.basename(self.current_test_case_info.tempdir))
1160 if self.current_test_case_info.logger:
1161 self.current_test_case_info.logger.debug(
1162 "creating a link to the failed test")
1163 self.current_test_case_info.logger.debug(
1164 "os.symlink(%s, %s)" %
1165 (self.current_test_case_info.tempdir, link_path))
1166 if os.path.exists(link_path):
1167 if self.current_test_case_info.logger:
1168 self.current_test_case_info.logger.debug(
1169 'symlink already exists')
1171 os.symlink(self.current_test_case_info.tempdir, link_path)
1173 except Exception as e:
1174 if self.current_test_case_info.logger:
1175 self.current_test_case_info.logger.error(e)
1177 def send_result_through_pipe(self, test, result):
1178 if hasattr(self, 'test_framework_result_pipe'):
1179 pipe = self.test_framework_result_pipe
1181 pipe.send((test.id(), result))
1183 def log_error(self, test, err, fn_name):
1184 if self.current_test_case_info:
1185 if isinstance(test, unittest.suite._ErrorHolder):
1186 test_name = test.description
1188 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1189 test._testMethodName,
1190 test._testMethodDoc)
1191 self.current_test_case_info.logger.debug(
1192 "--- %s() %s called, err is %s" %
1193 (fn_name, test_name, err))
1194 self.current_test_case_info.logger.debug(
1195 "formatted exception is:\n%s" %
1196 "".join(format_exception(*err)))
1198 def add_error(self, test, err, unittest_fn, error_type):
1199 if error_type == FAIL:
1200 self.log_error(test, err, 'addFailure')
1201 error_type_str = colorize("FAIL", RED)
1202 elif error_type == ERROR:
1203 self.log_error(test, err, 'addError')
1204 error_type_str = colorize("ERROR", RED)
1206 raise Exception('Error type %s cannot be used to record an '
1207 'error or a failure' % error_type)
1209 unittest_fn(self, test, err)
1210 if self.current_test_case_info:
1211 self.result_string = "%s [ temp dir used by test case: %s ]" % \
1213 self.current_test_case_info.tempdir)
1214 self.symlink_failed()
1215 self.failed_test_cases_info.add(self.current_test_case_info)
1216 if is_core_present(self.current_test_case_info.tempdir):
1217 if not self.current_test_case_info.core_crash_test:
1218 if isinstance(test, unittest.suite._ErrorHolder):
1219 test_name = str(test)
1221 test_name = "'{!s}' ({!s})".format(
1222 get_testcase_doc_name(test), test.id())
1223 self.current_test_case_info.core_crash_test = test_name
1224 self.core_crash_test_cases_info.add(
1225 self.current_test_case_info)
1227 self.result_string = '%s [no temp dir]' % error_type_str
1229 self.send_result_through_pipe(test, error_type)
1231 def addFailure(self, test, err):
1233 Record a test failed result
1236 :param err: error message
1239 self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1241 def addError(self, test, err):
1243 Record a test error result
1246 :param err: error message
1249 self.add_error(test, err, unittest.TestResult.addError, ERROR)
1251 def getDescription(self, test):
1253 Get test description
1256 :returns: test description
1259 return get_test_description(self.descriptions, test)
1261 def startTest(self, test):
1269 def print_header(test):
1270 if not hasattr(test.__class__, '_header_printed'):
1271 print(double_line_delim)
1272 print(colorize(getdoc(test).splitlines()[0], GREEN))
1273 print(double_line_delim)
1274 test.__class__._header_printed = True
1278 unittest.TestResult.startTest(self, test)
1279 if self.verbosity > 0:
1280 self.stream.writeln(
1281 "Starting " + self.getDescription(test) + " ...")
1282 self.stream.writeln(single_line_delim)
1284 def stopTest(self, test):
1286 Called when the given test has been run
1291 unittest.TestResult.stopTest(self, test)
1292 if self.verbosity > 0:
1293 self.stream.writeln(single_line_delim)
1294 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1295 self.result_string))
1296 self.stream.writeln(single_line_delim)
1298 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1299 self.result_string))
1301 self.send_result_through_pipe(test, TEST_RUN)
1303 def printErrors(self):
1305 Print errors from running the test case
1307 if len(self.errors) > 0 or len(self.failures) > 0:
1308 self.stream.writeln()
1309 self.printErrorList('ERROR', self.errors)
1310 self.printErrorList('FAIL', self.failures)
1312 # ^^ that is the last output from unittest before summary
1313 if not self.runner.print_summary:
1314 devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
1315 self.stream = devnull
1316 self.runner.stream = devnull
1318 def printErrorList(self, flavour, errors):
1320 Print error list to the output stream together with error type
1321 and test case description.
1323 :param flavour: error type
1324 :param errors: iterable errors
1327 for test, err in errors:
1328 self.stream.writeln(double_line_delim)
1329 self.stream.writeln("%s: %s" %
1330 (flavour, self.getDescription(test)))
1331 self.stream.writeln(single_line_delim)
1332 self.stream.writeln("%s" % err)
1335 class VppTestRunner(unittest.TextTestRunner):
1337 A basic test runner implementation which prints results to standard error.
1341 def resultclass(self):
1342 """Class maintaining the results of the tests"""
1343 return VppTestResult
1345 def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1346 result_pipe=None, failfast=False, buffer=False,
1347 resultclass=None, print_summary=True, **kwargs):
1348 # ignore stream setting here, use hard-coded stdout to be in sync
1349 # with prints from VppTestCase methods ...
1350 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1351 verbosity, failfast, buffer,
1352 resultclass, **kwargs)
1353 KeepAliveReporter.pipe = keep_alive_pipe
1355 self.orig_stream = self.stream
1356 self.resultclass.test_framework_result_pipe = result_pipe
1358 self.print_summary = print_summary
1360 def _makeResult(self):
1361 return self.resultclass(self.stream,
1366 def run(self, test):
1373 faulthandler.enable() # emit stack trace to stderr if killed by signal
1375 result = super(VppTestRunner, self).run(test)
1376 if not self.print_summary:
1377 self.stream = self.orig_stream
1378 result.stream = self.orig_stream
1382 class Worker(Thread):
1383 def __init__(self, args, logger, env={}):
1384 self.logger = logger
1387 self.env = copy.deepcopy(env)
1388 super(Worker, self).__init__()
1391 executable = self.args[0]
1392 self.logger.debug("Running executable w/args `%s'" % self.args)
1393 env = os.environ.copy()
1394 env.update(self.env)
1395 env["CK_LOG_FILE_NAME"] = "-"
1396 self.process = subprocess.Popen(
1397 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1398 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1399 out, err = self.process.communicate()
1400 self.logger.debug("Finished running `%s'" % executable)
1401 self.logger.info("Return code is `%s'" % self.process.returncode)
1402 self.logger.info(single_line_delim)
1403 self.logger.info("Executable `%s' wrote to stdout:" % executable)
1404 self.logger.info(single_line_delim)
1405 self.logger.info(out)
1406 self.logger.info(single_line_delim)
1407 self.logger.info("Executable `%s' wrote to stderr:" % executable)
1408 self.logger.info(single_line_delim)
1409 self.logger.info(err)
1410 self.logger.info(single_line_delim)
1411 self.result = self.process.returncode
1413 if __name__ == '__main__':