3 from __future__ import print_function
16 from collections import deque
17 from threading import Thread, Event
18 from inspect import getdoc, isclass
19 from traceback import format_exception
20 from logging import FileHandler, DEBUG, Formatter
23 from scapy.packet import Raw
24 from hook import StepHook, PollHook, VppDiedError
25 from vpp_pg_interface import VppPGInterface
26 from vpp_sub_interface import VppSubInterface
27 from vpp_lo_interface import VppLoInterface
28 from vpp_bvi_interface import VppBviInterface
29 from vpp_papi_provider import VppPapiProvider
30 from vpp_papi.vpp_stats import VPPStats
31 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
33 from vpp_object import VppObjectRegistry
34 from util import ppp, is_core_present
35 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
36 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
37 from scapy.layers.inet6 import ICMPv6EchoReply
39 if os.name == 'posix' and sys.version_info[0] < 3:
40 # using subprocess32 is recommended by python official documentation
41 # @ https://docs.python.org/2/library/subprocess.html
42 import subprocess32 as subprocess
46 # Python2/3 compatible
58 debug_framework = False
59 if os.getenv('TEST_DEBUG', "0") == "1":
60 debug_framework = True
64 Test framework module.
66 The module provides a set of tools for constructing and running tests and
67 representing the results.
71 class _PacketInfo(object):
72 """Private class to create packet info object.
74 Help process information about the next packet.
75 Set variables to default values.
77 #: Store the index of the packet.
79 #: Store the index of the source packet generator interface of the packet.
81 #: Store the index of the destination packet generator interface
84 #: Store expected ip version
86 #: Store expected upper protocol
88 #: Store the copy of the former packet.
91 def __eq__(self, other):
92 index = self.index == other.index
93 src = self.src == other.src
94 dst = self.dst == other.dst
95 data = self.data == other.data
96 return index and src and dst and data
99 def pump_output(testclass):
100 """ pump output from vpp stdout/stderr to proper queues """
103 while not testclass.pump_thread_stop_flag.is_set():
104 readable = select.select([testclass.vpp.stdout.fileno(),
105 testclass.vpp.stderr.fileno(),
106 testclass.pump_thread_wakeup_pipe[0]],
108 if testclass.vpp.stdout.fileno() in readable:
109 read = os.read(testclass.vpp.stdout.fileno(), 102400)
111 split = read.splitlines(True)
112 if len(stdout_fragment) > 0:
113 split[0] = "%s%s" % (stdout_fragment, split[0])
114 if len(split) > 0 and split[-1].endswith("\n"):
118 stdout_fragment = split[-1]
119 testclass.vpp_stdout_deque.extend(split[:limit])
120 if not testclass.cache_vpp_output:
121 for line in split[:limit]:
122 testclass.logger.debug(
123 "VPP STDOUT: %s" % line.rstrip("\n"))
124 if testclass.vpp.stderr.fileno() in readable:
125 read = os.read(testclass.vpp.stderr.fileno(), 102400)
127 split = read.splitlines(True)
128 if len(stderr_fragment) > 0:
129 split[0] = "%s%s" % (stderr_fragment, split[0])
130 if len(split) > 0 and split[-1].endswith(b"\n"):
134 stderr_fragment = split[-1]
135 testclass.vpp_stderr_deque.extend(split[:limit])
136 if not testclass.cache_vpp_output:
137 for line in split[:limit]:
138 testclass.logger.debug(
139 "VPP STDERR: %s" % line.rstrip("\n"))
140 # ignoring the dummy pipe here intentionally - the
141 # flag will take care of properly terminating the loop
144 def _is_skip_aarch64_set():
145 return os.getenv('SKIP_AARCH64', 'n').lower() in ('yes', 'y', '1')
147 is_skip_aarch64_set = _is_skip_aarch64_set()
150 def _is_platform_aarch64():
151 return platform.machine() == 'aarch64'
153 is_platform_aarch64 = _is_platform_aarch64()
156 def _running_extended_tests():
157 s = os.getenv("EXTENDED_TESTS", "n")
158 return True if s.lower() in ("y", "yes", "1") else False
160 running_extended_tests = _running_extended_tests()
163 def _running_on_centos():
164 os_id = os.getenv("OS_ID", "")
165 return True if "centos" in os_id.lower() else False
167 running_on_centos = _running_on_centos
170 class KeepAliveReporter(object):
172 Singleton object which reports test start to parent process
177 self.__dict__ = self._shared_state
185 def pipe(self, pipe):
186 if self._pipe is not None:
187 raise Exception("Internal error - pipe should only be set once.")
190 def send_keep_alive(self, test, desc=None):
192 Write current test tmpdir & desc to keep-alive pipe to signal liveness
194 if self.pipe is None:
195 # if not running forked..
199 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
203 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
206 class VppTestCase(unittest.TestCase):
207 """This subclass is a base class for VPP test cases that are implemented as
208 classes. It provides methods to create and run test case.
211 extra_vpp_punt_config = []
212 extra_vpp_plugin_config = []
215 def packet_infos(self):
216 """List of packet infos"""
217 return self._packet_infos
220 def get_packet_count_for_if_idx(cls, dst_if_index):
221 """Get the number of packet info for specified destination if index"""
222 if dst_if_index in cls._packet_count_for_dst_if_idx:
223 return cls._packet_count_for_dst_if_idx[dst_if_index]
229 """Return the instance of this testcase"""
230 return cls.test_instance
233 def set_debug_flags(cls, d):
234 cls.debug_core = False
235 cls.debug_gdb = False
236 cls.debug_gdbserver = False
241 cls.debug_core = True
244 elif dl == "gdbserver":
245 cls.debug_gdbserver = True
247 raise Exception("Unrecognized DEBUG option: '%s'" % d)
250 def get_least_used_cpu():
251 cpu_usage_list = [set(range(psutil.cpu_count()))]
252 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
253 if 'vpp_main' == p.info['name']]
254 for vpp_process in vpp_processes:
255 for cpu_usage_set in cpu_usage_list:
257 cpu_num = vpp_process.cpu_num()
258 if cpu_num in cpu_usage_set:
259 cpu_usage_set_index = cpu_usage_list.index(
261 if cpu_usage_set_index == len(cpu_usage_list) - 1:
262 cpu_usage_list.append({cpu_num})
264 cpu_usage_list[cpu_usage_set_index + 1].add(
266 cpu_usage_set.remove(cpu_num)
268 except psutil.NoSuchProcess:
271 for cpu_usage_set in cpu_usage_list:
272 if len(cpu_usage_set) > 0:
273 min_usage_set = cpu_usage_set
276 return random.choice(tuple(min_usage_set))
279 def setUpConstants(cls):
280 """ Set-up the test case class based on environment variables """
281 s = os.getenv("STEP", "n")
282 cls.step = True if s.lower() in ("y", "yes", "1") else False
283 d = os.getenv("DEBUG", None)
284 c = os.getenv("CACHE_OUTPUT", "1")
285 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
286 cls.set_debug_flags(d)
287 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
288 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
289 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
291 if cls.plugin_path is not None:
292 if cls.extern_plugin_path is not None:
293 plugin_path = "%s:%s" % (
294 cls.plugin_path, cls.extern_plugin_path)
296 plugin_path = cls.plugin_path
297 elif cls.extern_plugin_path is not None:
298 plugin_path = cls.extern_plugin_path
300 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
301 debug_cli = "cli-listen localhost:5002"
303 size = os.getenv("COREDUMP_SIZE")
305 coredump_size = "coredump-size %s" % size
306 if coredump_size is None:
307 coredump_size = "coredump-size unlimited"
309 cpu_core_number = cls.get_least_used_cpu()
311 cls.vpp_cmdline = [cls.vpp_bin, "unix",
312 "{", "nodaemon", debug_cli, "full-coredump",
313 coredump_size, "runtime-dir", cls.tempdir, "}",
314 "api-trace", "{", "on", "}", "api-segment", "{",
315 "prefix", cls.shm_prefix, "}", "cpu", "{",
316 "main-core", str(cpu_core_number), "}", "statseg",
317 "{", "socket-name", cls.stats_sock, "}", "plugins",
318 "{", "plugin", "dpdk_plugin.so", "{", "disable",
319 "}", "plugin", "rdma_plugin.so", "{", "disable",
320 "}", "plugin", "unittest_plugin.so", "{", "enable",
321 "}"] + cls.extra_vpp_plugin_config + ["}", ]
322 if cls.extra_vpp_punt_config is not None:
323 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
324 if plugin_path is not None:
325 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
326 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
327 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
330 def wait_for_enter(cls):
331 if cls.debug_gdbserver:
332 print(double_line_delim)
333 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
335 print(double_line_delim)
336 print("Spawned VPP with PID: %d" % cls.vpp.pid)
338 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
340 print(single_line_delim)
341 print("You can debug the VPP using e.g.:")
342 if cls.debug_gdbserver:
343 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
344 print("Now is the time to attach a gdb by running the above "
345 "command, set up breakpoints etc. and then resume VPP from "
346 "within gdb by issuing the 'continue' command")
348 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
349 print("Now is the time to attach a gdb by running the above "
350 "command and set up breakpoints etc.")
351 print(single_line_delim)
352 input("Press ENTER to continue running the testcase...")
356 cmdline = cls.vpp_cmdline
358 if cls.debug_gdbserver:
359 gdbserver = '/usr/bin/gdbserver'
360 if not os.path.isfile(gdbserver) or \
361 not os.access(gdbserver, os.X_OK):
362 raise Exception("gdbserver binary '%s' does not exist or is "
363 "not executable" % gdbserver)
365 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
366 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
369 cls.vpp = subprocess.Popen(cmdline,
370 stdout=subprocess.PIPE,
371 stderr=subprocess.PIPE,
373 except subprocess.CalledProcessError as e:
374 cls.logger.critical("Subprocess returned with non-0 return code: ("
378 cls.logger.critical("Subprocess returned with OS error: "
379 "(%s) %s", e.errno, e.strerror)
381 except Exception as e:
382 cls.logger.exception("Subprocess returned unexpected from "
389 def wait_for_stats_socket(cls):
390 deadline = time.time() + 3
392 while time.time() < deadline or \
393 cls.debug_gdb or cls.debug_gdbserver:
394 if os.path.exists(cls.stats_sock):
399 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
404 Perform class setup before running the testcase
405 Remove shared memory files, start vpp and connect the vpp-api
407 super(VppTestCase, cls).setUpClass()
408 gc.collect() # run garbage collection first
410 cls.logger = get_logger(cls.__name__)
411 if hasattr(cls, 'parallel_handler'):
412 cls.logger.addHandler(cls.parallel_handler)
413 cls.logger.propagate = False
414 cls.tempdir = tempfile.mkdtemp(
415 prefix='vpp-unittest-%s-' % cls.__name__)
416 cls.stats_sock = "%s/stats.sock" % cls.tempdir
417 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
418 cls.file_handler.setFormatter(
419 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
421 cls.file_handler.setLevel(DEBUG)
422 cls.logger.addHandler(cls.file_handler)
423 cls.shm_prefix = os.path.basename(cls.tempdir)
424 os.chdir(cls.tempdir)
425 cls.logger.info("Temporary dir is %s, shm prefix is %s",
426 cls.tempdir, cls.shm_prefix)
428 cls.reset_packet_infos()
430 cls._zombie_captures = []
433 cls.registry = VppObjectRegistry()
434 cls.vpp_startup_failed = False
435 cls.reporter = KeepAliveReporter()
436 # need to catch exceptions here because if we raise, then the cleanup
437 # doesn't get called and we might end with a zombie vpp
440 cls.reporter.send_keep_alive(cls, 'setUpClass')
441 VppTestResult.current_test_case_info = TestCaseInfo(
442 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
443 cls.vpp_stdout_deque = deque()
444 cls.vpp_stderr_deque = deque()
445 cls.pump_thread_stop_flag = Event()
446 cls.pump_thread_wakeup_pipe = os.pipe()
447 cls.pump_thread = Thread(target=pump_output, args=(cls,))
448 cls.pump_thread.daemon = True
449 cls.pump_thread.start()
450 if cls.debug_gdb or cls.debug_gdbserver:
454 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
460 cls.vapi.register_hook(hook)
461 cls.wait_for_stats_socket()
462 cls.statistics = VPPStats(socketname=cls.stats_sock)
466 cls.vpp_startup_failed = True
468 "VPP died shortly after startup, check the"
469 " output to standard error for possible cause")
475 cls.vapi.disconnect()
478 if cls.debug_gdbserver:
479 print(colorize("You're running VPP inside gdbserver but "
480 "VPP-API connection failed, did you forget "
481 "to 'continue' VPP from within gdb?", RED))
493 Disconnect vpp-api, kill vpp and cleanup shared memory files
495 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
497 if cls.vpp.returncode is None:
498 print(double_line_delim)
499 print("VPP or GDB server is still running")
500 print(single_line_delim)
501 input("When done debugging, press ENTER to kill the "
502 "process and finish running the testcase...")
504 # first signal that we want to stop the pump thread, then wake it up
505 if hasattr(cls, 'pump_thread_stop_flag'):
506 cls.pump_thread_stop_flag.set()
507 if hasattr(cls, 'pump_thread_wakeup_pipe'):
508 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
509 if hasattr(cls, 'pump_thread'):
510 cls.logger.debug("Waiting for pump thread to stop")
511 cls.pump_thread.join()
512 if hasattr(cls, 'vpp_stderr_reader_thread'):
513 cls.logger.debug("Waiting for stdderr pump to stop")
514 cls.vpp_stderr_reader_thread.join()
516 if hasattr(cls, 'vpp'):
517 if hasattr(cls, 'vapi'):
518 cls.logger.debug("Disconnecting class vapi client on %s",
520 cls.vapi.disconnect()
521 cls.logger.debug("Deleting class vapi attribute on %s",
525 if cls.vpp.returncode is None:
526 cls.logger.debug("Sending TERM to vpp")
528 cls.logger.debug("Waiting for vpp to die")
529 cls.vpp.communicate()
530 cls.logger.debug("Deleting class vpp attribute on %s",
534 if cls.vpp_startup_failed:
535 stdout_log = cls.logger.info
536 stderr_log = cls.logger.critical
538 stdout_log = cls.logger.info
539 stderr_log = cls.logger.info
541 if hasattr(cls, 'vpp_stdout_deque'):
542 stdout_log(single_line_delim)
543 stdout_log('VPP output to stdout while running %s:', cls.__name__)
544 stdout_log(single_line_delim)
545 vpp_output = "".join(cls.vpp_stdout_deque)
546 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
548 stdout_log('\n%s', vpp_output)
549 stdout_log(single_line_delim)
551 if hasattr(cls, 'vpp_stderr_deque'):
552 stderr_log(single_line_delim)
553 stderr_log('VPP output to stderr while running %s:', cls.__name__)
554 stderr_log(single_line_delim)
555 vpp_output = "".join(cls.vpp_stderr_deque)
556 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
558 stderr_log('\n%s', vpp_output)
559 stderr_log(single_line_delim)
562 def tearDownClass(cls):
563 """ Perform final cleanup after running all tests in this test-case """
564 cls.reporter.send_keep_alive(cls, 'tearDownClass')
566 cls.file_handler.close()
567 cls.reset_packet_infos()
569 debug_internal.on_tear_down_class(cls)
572 """ Show various debug prints after each test """
573 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
574 (self.__class__.__name__, self._testMethodName,
575 self._testMethodDoc))
576 if not self.vpp_dead:
577 self.logger.debug(self.vapi.cli("show trace max 1000"))
578 self.logger.info(self.vapi.ppcli("show interface"))
579 self.logger.info(self.vapi.ppcli("show hardware"))
580 self.logger.info(self.statistics.set_errors_str())
581 self.logger.info(self.vapi.ppcli("show run"))
582 self.logger.info(self.vapi.ppcli("show log"))
583 self.registry.remove_vpp_config(self.logger)
584 # Save/Dump VPP api trace log
585 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
586 tmp_api_trace = "/tmp/%s" % api_trace
587 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
588 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
589 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
591 os.rename(tmp_api_trace, vpp_api_trace_log)
592 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
595 self.registry.unregister_all(self.logger)
598 """ Clear trace before running each test"""
599 super(VppTestCase, self).setUp()
600 self.reporter.send_keep_alive(self)
601 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
602 (self.__class__.__name__, self._testMethodName,
603 self._testMethodDoc))
605 raise Exception("VPP is dead when setting up the test")
606 self.sleep(.1, "during setUp")
607 self.vpp_stdout_deque.append(
608 "--- test setUp() for %s.%s(%s) starts here ---\n" %
609 (self.__class__.__name__, self._testMethodName,
610 self._testMethodDoc))
611 self.vpp_stderr_deque.append(
612 "--- test setUp() for %s.%s(%s) starts here ---\n" %
613 (self.__class__.__name__, self._testMethodName,
614 self._testMethodDoc))
615 self.vapi.cli("clear trace")
616 # store the test instance inside the test class - so that objects
617 # holding the class can access instance methods (like assertEqual)
618 type(self).test_instance = self
621 def pg_enable_capture(cls, interfaces=None):
623 Enable capture on packet-generator interfaces
625 :param interfaces: iterable interface indexes (if None,
626 use self.pg_interfaces)
629 if interfaces is None:
630 interfaces = cls.pg_interfaces
635 def register_capture(cls, cap_name):
636 """ Register a capture in the testclass """
637 # add to the list of captures with current timestamp
638 cls._captures.append((time.time(), cap_name))
639 # filter out from zombies
640 cls._zombie_captures = [(stamp, name)
641 for (stamp, name) in cls._zombie_captures
646 """ Remove any zombie captures and enable the packet generator """
647 # how long before capture is allowed to be deleted - otherwise vpp
648 # crashes - 100ms seems enough (this shouldn't be needed at all)
651 for stamp, cap_name in cls._zombie_captures:
652 wait = stamp + capture_ttl - now
654 cls.sleep(wait, "before deleting capture %s" % cap_name)
656 cls.logger.debug("Removing zombie capture %s" % cap_name)
657 cls.vapi.cli('packet-generator delete %s' % cap_name)
659 cls.vapi.cli("trace add pg-input 1000")
660 cls.vapi.cli('packet-generator enable')
661 cls._zombie_captures = cls._captures
665 def create_pg_interfaces(cls, interfaces):
667 Create packet-generator interfaces.
669 :param interfaces: iterable indexes of the interfaces.
670 :returns: List of created interfaces.
675 intf = VppPGInterface(cls, i)
676 setattr(cls, intf.name, intf)
678 cls.pg_interfaces = result
682 def create_loopback_interfaces(cls, count):
684 Create loopback interfaces.
686 :param count: number of interfaces created.
687 :returns: List of created interfaces.
689 result = [VppLoInterface(cls) for i in range(count)]
691 setattr(cls, intf.name, intf)
692 cls.lo_interfaces = result
696 def create_bvi_interfaces(cls, count):
698 Create BVI interfaces.
700 :param count: number of interfaces created.
701 :returns: List of created interfaces.
703 result = [VppBviInterface(cls) for i in range(count)]
705 setattr(cls, intf.name, intf)
706 cls.bvi_interfaces = result
710 def extend_packet(packet, size, padding=' '):
712 Extend packet to given size by padding with spaces or custom padding
713 NOTE: Currently works only when Raw layer is present.
715 :param packet: packet
716 :param size: target size
717 :param padding: padding used to extend the payload
720 packet_len = len(packet) + 4
721 extend = size - packet_len
723 num = (extend / len(padding)) + 1
724 packet[Raw].load += (padding * num)[:extend]
727 def reset_packet_infos(cls):
728 """ Reset the list of packet info objects and packet counts to zero """
729 cls._packet_infos = {}
730 cls._packet_count_for_dst_if_idx = {}
733 def create_packet_info(cls, src_if, dst_if):
735 Create packet info object containing the source and destination indexes
736 and add it to the testcase's packet info list
738 :param VppInterface src_if: source interface
739 :param VppInterface dst_if: destination interface
741 :returns: _PacketInfo object
745 info.index = len(cls._packet_infos)
746 info.src = src_if.sw_if_index
747 info.dst = dst_if.sw_if_index
748 if isinstance(dst_if, VppSubInterface):
749 dst_idx = dst_if.parent.sw_if_index
751 dst_idx = dst_if.sw_if_index
752 if dst_idx in cls._packet_count_for_dst_if_idx:
753 cls._packet_count_for_dst_if_idx[dst_idx] += 1
755 cls._packet_count_for_dst_if_idx[dst_idx] = 1
756 cls._packet_infos[info.index] = info
760 def info_to_payload(info):
762 Convert _PacketInfo object to packet payload
764 :param info: _PacketInfo object
766 :returns: string containing serialized data from packet info
768 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
772 def payload_to_info(payload, payload_field='load'):
774 Convert packet payload to _PacketInfo object
776 :param payload: packet payload
777 :type payload: <class 'scapy.packet.Raw'>
778 :param payload_field: packet fieldname of payload "load" for
779 <class 'scapy.packet.Raw'>
780 :type payload_field: str
781 :returns: _PacketInfo object containing de-serialized data from payload
784 numbers = getattr(payload, payload_field).split()
786 info.index = int(numbers[0])
787 info.src = int(numbers[1])
788 info.dst = int(numbers[2])
789 info.ip = int(numbers[3])
790 info.proto = int(numbers[4])
793 def get_next_packet_info(self, info):
795 Iterate over the packet info list stored in the testcase
796 Start iteration with first element if info is None
797 Continue based on index in info if info is specified
799 :param info: info or None
800 :returns: next info in list or None if no more infos
805 next_index = info.index + 1
806 if next_index == len(self._packet_infos):
809 return self._packet_infos[next_index]
811 def get_next_packet_info_for_interface(self, src_index, info):
813 Search the packet info list for the next packet info with same source
816 :param src_index: source interface index to search for
817 :param info: packet info - where to start the search
818 :returns: packet info or None
822 info = self.get_next_packet_info(info)
825 if info.src == src_index:
828 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
830 Search the packet info list for the next packet info with same source
831 and destination interface indexes
833 :param src_index: source interface index to search for
834 :param dst_index: destination interface index to search for
835 :param info: packet info - where to start the search
836 :returns: packet info or None
840 info = self.get_next_packet_info_for_interface(src_index, info)
843 if info.dst == dst_index:
846 def assert_equal(self, real_value, expected_value, name_or_class=None):
847 if name_or_class is None:
848 self.assertEqual(real_value, expected_value)
851 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
852 msg = msg % (getdoc(name_or_class).strip(),
853 real_value, str(name_or_class(real_value)),
854 expected_value, str(name_or_class(expected_value)))
856 msg = "Invalid %s: %s does not match expected value %s" % (
857 name_or_class, real_value, expected_value)
859 self.assertEqual(real_value, expected_value, msg)
861 def assert_in_range(self,
869 msg = "Invalid %s: %s out of range <%s,%s>" % (
870 name, real_value, expected_min, expected_max)
871 self.assertTrue(expected_min <= real_value <= expected_max, msg)
873 def assert_packet_checksums_valid(self, packet,
874 ignore_zero_udp_checksums=True):
875 received = packet.__class__(scapy.compat.raw(packet))
877 ppp("Verifying packet checksums for packet:", received))
878 udp_layers = ['UDP', 'UDPerror']
879 checksum_fields = ['cksum', 'chksum']
882 temp = received.__class__(scapy.compat.raw(received))
884 layer = temp.getlayer(counter)
886 for cf in checksum_fields:
887 if hasattr(layer, cf):
888 if ignore_zero_udp_checksums and \
889 0 == getattr(layer, cf) and \
890 layer.name in udp_layers:
893 checksums.append((counter, cf))
896 counter = counter + 1
897 if 0 == len(checksums):
899 temp = temp.__class__(scapy.compat.raw(temp))
900 for layer, cf in checksums:
901 calc_sum = getattr(temp[layer], cf)
903 getattr(received[layer], cf), calc_sum,
904 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
906 "Checksum field `%s` on `%s` layer has correct value `%s`" %
907 (cf, temp[layer].name, calc_sum))
909 def assert_checksum_valid(self, received_packet, layer,
911 ignore_zero_checksum=False):
912 """ Check checksum of received packet on given layer """
913 received_packet_checksum = getattr(received_packet[layer], field_name)
914 if ignore_zero_checksum and 0 == received_packet_checksum:
916 recalculated = received_packet.__class__(
917 scapy.compat.raw(received_packet))
918 delattr(recalculated[layer], field_name)
919 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
920 self.assert_equal(received_packet_checksum,
921 getattr(recalculated[layer], field_name),
922 "packet checksum on layer: %s" % layer)
924 def assert_ip_checksum_valid(self, received_packet,
925 ignore_zero_checksum=False):
926 self.assert_checksum_valid(received_packet, 'IP',
927 ignore_zero_checksum=ignore_zero_checksum)
929 def assert_tcp_checksum_valid(self, received_packet,
930 ignore_zero_checksum=False):
931 self.assert_checksum_valid(received_packet, 'TCP',
932 ignore_zero_checksum=ignore_zero_checksum)
934 def assert_udp_checksum_valid(self, received_packet,
935 ignore_zero_checksum=True):
936 self.assert_checksum_valid(received_packet, 'UDP',
937 ignore_zero_checksum=ignore_zero_checksum)
939 def assert_embedded_icmp_checksum_valid(self, received_packet):
940 if received_packet.haslayer(IPerror):
941 self.assert_checksum_valid(received_packet, 'IPerror')
942 if received_packet.haslayer(TCPerror):
943 self.assert_checksum_valid(received_packet, 'TCPerror')
944 if received_packet.haslayer(UDPerror):
945 self.assert_checksum_valid(received_packet, 'UDPerror',
946 ignore_zero_checksum=True)
947 if received_packet.haslayer(ICMPerror):
948 self.assert_checksum_valid(received_packet, 'ICMPerror')
950 def assert_icmp_checksum_valid(self, received_packet):
951 self.assert_checksum_valid(received_packet, 'ICMP')
952 self.assert_embedded_icmp_checksum_valid(received_packet)
954 def assert_icmpv6_checksum_valid(self, pkt):
955 if pkt.haslayer(ICMPv6DestUnreach):
956 self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
957 self.assert_embedded_icmp_checksum_valid(pkt)
958 if pkt.haslayer(ICMPv6EchoRequest):
959 self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
960 if pkt.haslayer(ICMPv6EchoReply):
961 self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
963 def assert_packet_counter_equal(self, counter, expected_value):
964 if counter.startswith("/"):
965 counter_value = self.statistics.get_counter(counter)
966 self.assert_equal(counter_value, expected_value,
967 "packet counter `%s'" % counter)
969 counters = self.vapi.cli("sh errors").split('\n')
971 for i in range(1, len(counters) - 1):
972 results = counters[i].split()
973 if results[1] == counter:
974 counter_value = int(results[0])
978 def sleep(cls, timeout, remark=None):
980 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
981 # * by Guido, only the main thread can be interrupted.
983 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
986 if hasattr(os, 'sched_yield'):
992 if hasattr(cls, 'logger'):
993 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
997 if hasattr(cls, 'logger') and after - before > 2 * timeout:
998 cls.logger.error("unexpected self.sleep() result - "
999 "slept for %es instead of ~%es!",
1000 after - before, timeout)
1001 if hasattr(cls, 'logger'):
1003 "Finished sleep (%s) - slept %es (wanted %es)",
1004 remark, after - before, timeout)
1006 def pg_send(self, intf, pkts):
1007 self.vapi.cli("clear trace")
1008 intf.add_stream(pkts)
1009 self.pg_enable_capture(self.pg_interfaces)
1012 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
1013 self.pg_send(intf, pkts)
1016 for i in self.pg_interfaces:
1017 i.get_capture(0, timeout=timeout)
1018 i.assert_nothing_captured(remark=remark)
1021 def send_and_expect(self, intf, pkts, output, n_rx=None):
1024 self.pg_send(intf, pkts)
1025 rx = output.get_capture(n_rx)
1028 def send_and_expect_only(self, intf, pkts, output, timeout=None):
1029 self.pg_send(intf, pkts)
1030 rx = output.get_capture(len(pkts))
1034 for i in self.pg_interfaces:
1035 if i not in outputs:
1036 i.get_capture(0, timeout=timeout)
1037 i.assert_nothing_captured()
1043 """ unittest calls runTest when TestCase is instantiated without a
1044 test case. Use case: Writing unittests against VppTestCase"""
1048 def get_testcase_doc_name(test):
1049 return getdoc(test.__class__).splitlines()[0]
1052 def get_test_description(descriptions, test):
1053 short_description = test.shortDescription()
1054 if descriptions and short_description:
1055 return short_description
1060 class TestCaseInfo(object):
1061 def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
1062 self.logger = logger
1063 self.tempdir = tempdir
1064 self.vpp_pid = vpp_pid
1065 self.vpp_bin_path = vpp_bin_path
1066 self.core_crash_test = None
1069 class VppTestResult(unittest.TestResult):
1071 @property result_string
1072 String variable to store the test case result string.
1074 List variable containing 2-tuples of TestCase instances and strings
1075 holding formatted tracebacks. Each tuple represents a test which
1076 raised an unexpected exception.
1078 List variable containing 2-tuples of TestCase instances and strings
1079 holding formatted tracebacks. Each tuple represents a test where
1080 a failure was explicitly signalled using the TestCase.assert*()
1084 failed_test_cases_info = set()
1085 core_crash_test_cases_info = set()
1086 current_test_case_info = None
1088 def __init__(self, stream=None, descriptions=None, verbosity=None,
1091 :param stream File descriptor to store where to report test results.
1092 Set to the standard error stream by default.
1093 :param descriptions Boolean variable to store information if to use
1094 test case descriptions.
1095 :param verbosity Integer variable to store required verbosity level.
1097 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1098 self.stream = stream
1099 self.descriptions = descriptions
1100 self.verbosity = verbosity
1101 self.result_string = None
1102 self.runner = runner
1104 def addSuccess(self, test):
1106 Record a test succeeded result
1111 if self.current_test_case_info:
1112 self.current_test_case_info.logger.debug(
1113 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1114 test._testMethodName,
1115 test._testMethodDoc))
1116 unittest.TestResult.addSuccess(self, test)
1117 self.result_string = colorize("OK", GREEN)
1119 self.send_result_through_pipe(test, PASS)
1121 def addSkip(self, test, reason):
1123 Record a test skipped.
1129 if self.current_test_case_info:
1130 self.current_test_case_info.logger.debug(
1131 "--- addSkip() %s.%s(%s) called, reason is %s" %
1132 (test.__class__.__name__, test._testMethodName,
1133 test._testMethodDoc, reason))
1134 unittest.TestResult.addSkip(self, test, reason)
1135 self.result_string = colorize("SKIP", YELLOW)
1137 self.send_result_through_pipe(test, SKIP)
1139 def symlink_failed(self):
1140 if self.current_test_case_info:
1142 failed_dir = os.getenv('FAILED_DIR')
1143 link_path = os.path.join(
1146 os.path.basename(self.current_test_case_info.tempdir))
1147 if self.current_test_case_info.logger:
1148 self.current_test_case_info.logger.debug(
1149 "creating a link to the failed test")
1150 self.current_test_case_info.logger.debug(
1151 "os.symlink(%s, %s)" %
1152 (self.current_test_case_info.tempdir, link_path))
1153 if os.path.exists(link_path):
1154 if self.current_test_case_info.logger:
1155 self.current_test_case_info.logger.debug(
1156 'symlink already exists')
1158 os.symlink(self.current_test_case_info.tempdir, link_path)
1160 except Exception as e:
1161 if self.current_test_case_info.logger:
1162 self.current_test_case_info.logger.error(e)
1164 def send_result_through_pipe(self, test, result):
1165 if hasattr(self, 'test_framework_result_pipe'):
1166 pipe = self.test_framework_result_pipe
1168 pipe.send((test.id(), result))
1170 def log_error(self, test, err, fn_name):
1171 if self.current_test_case_info:
1172 if isinstance(test, unittest.suite._ErrorHolder):
1173 test_name = test.description
1175 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1176 test._testMethodName,
1177 test._testMethodDoc)
1178 self.current_test_case_info.logger.debug(
1179 "--- %s() %s called, err is %s" %
1180 (fn_name, test_name, err))
1181 self.current_test_case_info.logger.debug(
1182 "formatted exception is:\n%s" %
1183 "".join(format_exception(*err)))
1185 def add_error(self, test, err, unittest_fn, error_type):
1186 if error_type == FAIL:
1187 self.log_error(test, err, 'addFailure')
1188 error_type_str = colorize("FAIL", RED)
1189 elif error_type == ERROR:
1190 self.log_error(test, err, 'addError')
1191 error_type_str = colorize("ERROR", RED)
1193 raise Exception('Error type %s cannot be used to record an '
1194 'error or a failure' % error_type)
1196 unittest_fn(self, test, err)
1197 if self.current_test_case_info:
1198 self.result_string = "%s [ temp dir used by test case: %s ]" % \
1200 self.current_test_case_info.tempdir)
1201 self.symlink_failed()
1202 self.failed_test_cases_info.add(self.current_test_case_info)
1203 if is_core_present(self.current_test_case_info.tempdir):
1204 if not self.current_test_case_info.core_crash_test:
1205 if isinstance(test, unittest.suite._ErrorHolder):
1206 test_name = str(test)
1208 test_name = "'{!s}' ({!s})".format(
1209 get_testcase_doc_name(test), test.id())
1210 self.current_test_case_info.core_crash_test = test_name
1211 self.core_crash_test_cases_info.add(
1212 self.current_test_case_info)
1214 self.result_string = '%s [no temp dir]' % error_type_str
1216 self.send_result_through_pipe(test, error_type)
1218 def addFailure(self, test, err):
1220 Record a test failed result
1223 :param err: error message
1226 self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1228 def addError(self, test, err):
1230 Record a test error result
1233 :param err: error message
1236 self.add_error(test, err, unittest.TestResult.addError, ERROR)
1238 def getDescription(self, test):
1240 Get test description
1243 :returns: test description
1246 return get_test_description(self.descriptions, test)
1248 def startTest(self, test):
1256 def print_header(test):
1257 if not hasattr(test.__class__, '_header_printed'):
1258 print(double_line_delim)
1259 print(colorize(getdoc(test).splitlines()[0], GREEN))
1260 print(double_line_delim)
1261 test.__class__._header_printed = True
1265 unittest.TestResult.startTest(self, test)
1266 if self.verbosity > 0:
1267 self.stream.writeln(
1268 "Starting " + self.getDescription(test) + " ...")
1269 self.stream.writeln(single_line_delim)
1271 def stopTest(self, test):
1273 Called when the given test has been run
1278 unittest.TestResult.stopTest(self, test)
1279 if self.verbosity > 0:
1280 self.stream.writeln(single_line_delim)
1281 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1282 self.result_string))
1283 self.stream.writeln(single_line_delim)
1285 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1286 self.result_string))
1288 self.send_result_through_pipe(test, TEST_RUN)
1290 def printErrors(self):
1292 Print errors from running the test case
1294 if len(self.errors) > 0 or len(self.failures) > 0:
1295 self.stream.writeln()
1296 self.printErrorList('ERROR', self.errors)
1297 self.printErrorList('FAIL', self.failures)
1299 # ^^ that is the last output from unittest before summary
1300 if not self.runner.print_summary:
1301 devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
1302 self.stream = devnull
1303 self.runner.stream = devnull
1305 def printErrorList(self, flavour, errors):
1307 Print error list to the output stream together with error type
1308 and test case description.
1310 :param flavour: error type
1311 :param errors: iterable errors
1314 for test, err in errors:
1315 self.stream.writeln(double_line_delim)
1316 self.stream.writeln("%s: %s" %
1317 (flavour, self.getDescription(test)))
1318 self.stream.writeln(single_line_delim)
1319 self.stream.writeln("%s" % err)
1322 class VppTestRunner(unittest.TextTestRunner):
1324 A basic test runner implementation which prints results to standard error.
1328 def resultclass(self):
1329 """Class maintaining the results of the tests"""
1330 return VppTestResult
1332 def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1333 result_pipe=None, failfast=False, buffer=False,
1334 resultclass=None, print_summary=True, **kwargs):
1335 # ignore stream setting here, use hard-coded stdout to be in sync
1336 # with prints from VppTestCase methods ...
1337 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1338 verbosity, failfast, buffer,
1339 resultclass, **kwargs)
1340 KeepAliveReporter.pipe = keep_alive_pipe
1342 self.orig_stream = self.stream
1343 self.resultclass.test_framework_result_pipe = result_pipe
1345 self.print_summary = print_summary
1347 def _makeResult(self):
1348 return self.resultclass(self.stream,
1353 def run(self, test):
1360 faulthandler.enable() # emit stack trace to stderr if killed by signal
1362 result = super(VppTestRunner, self).run(test)
1363 if not self.print_summary:
1364 self.stream = self.orig_stream
1365 result.stream = self.orig_stream
1369 class Worker(Thread):
1370 def __init__(self, args, logger, env={}):
1371 self.logger = logger
1374 self.env = copy.deepcopy(env)
1375 super(Worker, self).__init__()
1378 executable = self.args[0]
1379 self.logger.debug("Running executable w/args `%s'" % self.args)
1380 env = os.environ.copy()
1381 env.update(self.env)
1382 env["CK_LOG_FILE_NAME"] = "-"
1383 self.process = subprocess.Popen(
1384 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1385 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1386 out, err = self.process.communicate()
1387 self.logger.debug("Finished running `%s'" % executable)
1388 self.logger.info("Return code is `%s'" % self.process.returncode)
1389 self.logger.info(single_line_delim)
1390 self.logger.info("Executable `%s' wrote to stdout:" % executable)
1391 self.logger.info(single_line_delim)
1392 self.logger.info(out)
1393 self.logger.info(single_line_delim)
1394 self.logger.info("Executable `%s' wrote to stderr:" % executable)
1395 self.logger.info(single_line_delim)
1396 self.logger.info(err)
1397 self.logger.info(single_line_delim)
1398 self.result = self.process.returncode
1400 if __name__ == '__main__':