3 from __future__ import print_function
16 from collections import deque
17 from threading import Thread, Event
18 from inspect import getdoc, isclass
19 from traceback import format_exception
20 from logging import FileHandler, DEBUG, Formatter
23 from scapy.packet import Raw
24 from hook import StepHook, PollHook, VppDiedError
25 from vpp_pg_interface import VppPGInterface
26 from vpp_sub_interface import VppSubInterface
27 from vpp_lo_interface import VppLoInterface
28 from vpp_bvi_interface import VppBviInterface
29 from vpp_papi_provider import VppPapiProvider
30 from vpp_papi.vpp_stats import VPPStats
31 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
33 from vpp_object import VppObjectRegistry
34 from util import ppp, is_core_present
35 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
36 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
37 from scapy.layers.inet6 import ICMPv6EchoReply
39 if os.name == 'posix' and sys.version_info[0] < 3:
40 # using subprocess32 is recommended by python official documentation
41 # @ https://docs.python.org/2/library/subprocess.html
42 import subprocess32 as subprocess
46 # Python2/3 compatible
# Framework self-debugging is switched on only when the TEST_DEBUG
# environment variable is exactly "1"; it defaults to off.
debug_framework = os.getenv('TEST_DEBUG', "0") == "1"
64 Test framework module.
66 The module provides a set of tools for constructing and running tests and
67 representing the results.
71 class _PacketInfo(object):
72 """Private class to create packet info object.
74 Help process information about the next packet.
75 Set variables to default values.
77 #: Store the index of the packet.
79 #: Store the index of the source packet generator interface of the packet.
81 #: Store the index of the destination packet generator interface
84 #: Store expected ip version
86 #: Store expected upper protocol
88 #: Store the copy of the former packet.
91 def __eq__(self, other):
92 index = self.index == other.index
93 src = self.src == other.src
94 dst = self.dst == other.dst
95 data = self.data == other.data
96 return index and src and dst and data
99 def pump_output(testclass):
100 """ pump output from vpp stdout/stderr to proper queues """
103 while not testclass.pump_thread_stop_flag.is_set():
104 readable = select.select([testclass.vpp.stdout.fileno(),
105 testclass.vpp.stderr.fileno(),
106 testclass.pump_thread_wakeup_pipe[0]],
108 if testclass.vpp.stdout.fileno() in readable:
109 read = os.read(testclass.vpp.stdout.fileno(), 102400)
111 split = read.splitlines(True)
112 if len(stdout_fragment) > 0:
113 split[0] = "%s%s" % (stdout_fragment, split[0])
114 if len(split) > 0 and split[-1].endswith("\n"):
118 stdout_fragment = split[-1]
119 testclass.vpp_stdout_deque.extend(split[:limit])
120 if not testclass.cache_vpp_output:
121 for line in split[:limit]:
122 testclass.logger.debug(
123 "VPP STDOUT: %s" % line.rstrip("\n"))
124 if testclass.vpp.stderr.fileno() in readable:
125 read = os.read(testclass.vpp.stderr.fileno(), 102400)
127 split = read.splitlines(True)
128 if len(stderr_fragment) > 0:
129 split[0] = "%s%s" % (stderr_fragment, split[0])
130 if len(split) > 0 and split[-1].endswith(b"\n"):
134 stderr_fragment = split[-1]
135 testclass.vpp_stderr_deque.extend(split[:limit])
136 if not testclass.cache_vpp_output:
137 for line in split[:limit]:
138 testclass.logger.debug(
139 "VPP STDERR: %s" % line.rstrip("\n"))
140 # ignoring the dummy pipe here intentionally - the
141 # flag will take care of properly terminating the loop
def _is_skip_aarch64_set():
    """ Tell whether SKIP_AARCH64 is set to an affirmative value """
    affirmative = ('yes', 'y', '1')
    setting = os.getenv('SKIP_AARCH64', 'n')
    return setting.lower() in affirmative


# evaluated once, when the module is imported
is_skip_aarch64_set = _is_skip_aarch64_set()
def _is_platform_aarch64():
    """ Tell whether platform.machine() reports 'aarch64' """
    machine = platform.machine()
    return machine == 'aarch64'


# evaluated once, when the module is imported
is_platform_aarch64 = _is_platform_aarch64()
def _running_extended_tests():
    """ Tell whether EXTENDED_TESTS is set to an affirmative value """
    flag = os.getenv("EXTENDED_TESTS", "n").lower()
    return flag in ("y", "yes", "1")


# evaluated once, when the module is imported
running_extended_tests = _running_extended_tests()
def _running_on_centos():
    """ Tell whether the OS_ID environment variable mentions centos

    :returns: True if "centos" occurs (case-insensitively) in OS_ID,
              False otherwise (including when OS_ID is unset)
    """
    os_id = os.getenv("OS_ID", "")
    return "centos" in os_id.lower()


# Call the helper so the flag is a real boolean. Binding the function object
# itself would make the flag truthy on every platform. The other module-level
# flags (is_skip_aarch64_set, is_platform_aarch64, running_extended_tests)
# are computed the same way.
running_on_centos = _running_on_centos()
170 class KeepAliveReporter(object):
172 Singleton object which reports test start to parent process
177 self.__dict__ = self._shared_state
185 def pipe(self, pipe):
186 if self._pipe is not None:
187 raise Exception("Internal error - pipe should only be set once.")
190 def send_keep_alive(self, test, desc=None):
192 Write current test tmpdir & desc to keep-alive pipe to signal liveness
194 if self.pipe is None:
195 # if not running forked..
199 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
203 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
206 class VppTestCase(unittest.TestCase):
207 """This subclass is a base class for VPP test cases that are implemented as
208 classes. It provides methods to create and run test case.
211 extra_vpp_punt_config = []
212 extra_vpp_plugin_config = []
def packet_infos(self):
    """ Packet infos currently stored in self._packet_infos """
    infos = self._packet_infos
    return infos
220 def get_packet_count_for_if_idx(cls, dst_if_index):
221 """Get the number of packet info for specified destination if index"""
222 if dst_if_index in cls._packet_count_for_dst_if_idx:
223 return cls._packet_count_for_dst_if_idx[dst_if_index]
229 """Return the instance of this testcase"""
230 return cls.test_instance
233 def set_debug_flags(cls, d):
234 cls.debug_core = False
235 cls.debug_gdb = False
236 cls.debug_gdbserver = False
241 cls.debug_core = True
244 elif dl == "gdbserver":
245 cls.debug_gdbserver = True
247 raise Exception("Unrecognized DEBUG option: '%s'" % d)
250 def get_least_used_cpu():
251 cpu_usage_list = [set(range(psutil.cpu_count()))]
252 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
253 if 'vpp_main' == p.info['name']]
254 for vpp_process in vpp_processes:
255 for cpu_usage_set in cpu_usage_list:
257 cpu_num = vpp_process.cpu_num()
258 if cpu_num in cpu_usage_set:
259 cpu_usage_set_index = cpu_usage_list.index(
261 if cpu_usage_set_index == len(cpu_usage_list) - 1:
262 cpu_usage_list.append({cpu_num})
264 cpu_usage_list[cpu_usage_set_index + 1].add(
266 cpu_usage_set.remove(cpu_num)
268 except psutil.NoSuchProcess:
271 for cpu_usage_set in cpu_usage_list:
272 if len(cpu_usage_set) > 0:
273 min_usage_set = cpu_usage_set
276 return random.choice(tuple(min_usage_set))
279 def setUpConstants(cls):
280 """ Set-up the test case class based on environment variables """
281 s = os.getenv("STEP", "n")
282 cls.step = True if s.lower() in ("y", "yes", "1") else False
283 d = os.getenv("DEBUG", None)
284 c = os.getenv("CACHE_OUTPUT", "1")
285 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
286 cls.set_debug_flags(d)
287 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
288 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
289 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
291 if cls.plugin_path is not None:
292 if cls.extern_plugin_path is not None:
293 plugin_path = "%s:%s" % (
294 cls.plugin_path, cls.extern_plugin_path)
296 plugin_path = cls.plugin_path
297 elif cls.extern_plugin_path is not None:
298 plugin_path = cls.extern_plugin_path
300 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
301 debug_cli = "cli-listen localhost:5002"
303 size = os.getenv("COREDUMP_SIZE")
305 coredump_size = "coredump-size %s" % size
306 if coredump_size is None:
307 coredump_size = "coredump-size unlimited"
309 cpu_core_number = cls.get_least_used_cpu()
311 cls.vpp_cmdline = [cls.vpp_bin, "unix",
312 "{", "nodaemon", debug_cli, "full-coredump",
313 coredump_size, "runtime-dir", cls.tempdir, "}",
314 "api-trace", "{", "on", "}", "api-segment", "{",
315 "prefix", cls.shm_prefix, "}", "cpu", "{",
316 "main-core", str(cpu_core_number), "}", "statseg",
317 "{", "socket-name", cls.stats_sock, "}", "plugins",
318 "{", "plugin", "dpdk_plugin.so", "{", "disable",
319 "}", "plugin", "rdma_plugin.so", "{", "disable",
320 "}", "plugin", "unittest_plugin.so", "{", "enable",
321 "}"] + cls.extra_vpp_plugin_config + ["}", ]
322 if cls.extra_vpp_punt_config is not None:
323 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
324 if plugin_path is not None:
325 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
326 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
327 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
330 def wait_for_enter(cls):
331 if cls.debug_gdbserver:
332 print(double_line_delim)
333 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
335 print(double_line_delim)
336 print("Spawned VPP with PID: %d" % cls.vpp.pid)
338 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
340 print(single_line_delim)
341 print("You can debug the VPP using e.g.:")
342 if cls.debug_gdbserver:
343 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
344 print("Now is the time to attach a gdb by running the above "
345 "command, set up breakpoints etc. and then resume VPP from "
346 "within gdb by issuing the 'continue' command")
348 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
349 print("Now is the time to attach a gdb by running the above "
350 "command and set up breakpoints etc.")
351 print(single_line_delim)
352 input("Press ENTER to continue running the testcase...")
356 cmdline = cls.vpp_cmdline
358 if cls.debug_gdbserver:
359 gdbserver = '/usr/bin/gdbserver'
360 if not os.path.isfile(gdbserver) or \
361 not os.access(gdbserver, os.X_OK):
362 raise Exception("gdbserver binary '%s' does not exist or is "
363 "not executable" % gdbserver)
365 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
366 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
369 cls.vpp = subprocess.Popen(cmdline,
370 stdout=subprocess.PIPE,
371 stderr=subprocess.PIPE,
373 except subprocess.CalledProcessError as e:
374 cls.logger.critical("Subprocess returned with non-0 return code: ("
378 cls.logger.critical("Subprocess returned with OS error: "
379 "(%s) %s", e.errno, e.strerror)
381 except Exception as e:
382 cls.logger.exception("Subprocess returned unexpected from "
389 def wait_for_stats_socket(cls):
390 deadline = time.time() + 3
392 while time.time() < deadline or \
393 cls.debug_gdb or cls.debug_gdbserver:
394 if os.path.exists(cls.stats_sock):
399 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
404 Perform class setup before running the testcase
405 Remove shared memory files, start vpp and connect the vpp-api
407 super(VppTestCase, cls).setUpClass()
408 gc.collect() # run garbage collection first
410 cls.logger = get_logger(cls.__name__)
411 if hasattr(cls, 'parallel_handler'):
412 cls.logger.addHandler(cls.parallel_handler)
413 cls.logger.propagate = False
415 cls.tempdir = tempfile.mkdtemp(
416 prefix='vpp-unittest-%s-' % cls.__name__)
417 cls.stats_sock = "%s/stats.sock" % cls.tempdir
418 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
419 cls.file_handler.setFormatter(
420 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
422 cls.file_handler.setLevel(DEBUG)
423 cls.logger.addHandler(cls.file_handler)
424 cls.logger.debug("--- setUpClass() for %s called ---" %
426 cls.shm_prefix = os.path.basename(cls.tempdir)
427 os.chdir(cls.tempdir)
428 cls.logger.info("Temporary dir is %s, shm prefix is %s",
429 cls.tempdir, cls.shm_prefix)
431 cls.reset_packet_infos()
433 cls._zombie_captures = []
436 cls.registry = VppObjectRegistry()
437 cls.vpp_startup_failed = False
438 cls.reporter = KeepAliveReporter()
439 # need to catch exceptions here because if we raise, then the cleanup
440 # doesn't get called and we might end with a zombie vpp
443 cls.reporter.send_keep_alive(cls, 'setUpClass')
444 VppTestResult.current_test_case_info = TestCaseInfo(
445 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
446 cls.vpp_stdout_deque = deque()
447 cls.vpp_stderr_deque = deque()
448 cls.pump_thread_stop_flag = Event()
449 cls.pump_thread_wakeup_pipe = os.pipe()
450 cls.pump_thread = Thread(target=pump_output, args=(cls,))
451 cls.pump_thread.daemon = True
452 cls.pump_thread.start()
453 if cls.debug_gdb or cls.debug_gdbserver:
457 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
463 cls.vapi.register_hook(hook)
464 cls.wait_for_stats_socket()
465 cls.statistics = VPPStats(socketname=cls.stats_sock)
469 cls.vpp_startup_failed = True
471 "VPP died shortly after startup, check the"
472 " output to standard error for possible cause")
478 cls.vapi.disconnect()
481 if cls.debug_gdbserver:
482 print(colorize("You're running VPP inside gdbserver but "
483 "VPP-API connection failed, did you forget "
484 "to 'continue' VPP from within gdb?", RED))
496 Disconnect vpp-api, kill vpp and cleanup shared memory files
498 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
500 if cls.vpp.returncode is None:
501 print(double_line_delim)
502 print("VPP or GDB server is still running")
503 print(single_line_delim)
504 input("When done debugging, press ENTER to kill the "
505 "process and finish running the testcase...")
507 # first signal that we want to stop the pump thread, then wake it up
508 if hasattr(cls, 'pump_thread_stop_flag'):
509 cls.pump_thread_stop_flag.set()
510 if hasattr(cls, 'pump_thread_wakeup_pipe'):
511 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
512 if hasattr(cls, 'pump_thread'):
513 cls.logger.debug("Waiting for pump thread to stop")
514 cls.pump_thread.join()
515 if hasattr(cls, 'vpp_stderr_reader_thread'):
516 cls.logger.debug("Waiting for stdderr pump to stop")
517 cls.vpp_stderr_reader_thread.join()
519 if hasattr(cls, 'vpp'):
520 if hasattr(cls, 'vapi'):
521 cls.logger.debug("Disconnecting class vapi client on %s",
523 cls.vapi.disconnect()
524 cls.logger.debug("Deleting class vapi attribute on %s",
528 if cls.vpp.returncode is None:
529 cls.logger.debug("Sending TERM to vpp")
531 cls.logger.debug("Waiting for vpp to die")
532 cls.vpp.communicate()
533 cls.logger.debug("Deleting class vpp attribute on %s",
537 if cls.vpp_startup_failed:
538 stdout_log = cls.logger.info
539 stderr_log = cls.logger.critical
541 stdout_log = cls.logger.info
542 stderr_log = cls.logger.info
544 if hasattr(cls, 'vpp_stdout_deque'):
545 stdout_log(single_line_delim)
546 stdout_log('VPP output to stdout while running %s:', cls.__name__)
547 stdout_log(single_line_delim)
548 vpp_output = "".join(cls.vpp_stdout_deque)
549 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
551 stdout_log('\n%s', vpp_output)
552 stdout_log(single_line_delim)
554 if hasattr(cls, 'vpp_stderr_deque'):
555 stderr_log(single_line_delim)
556 stderr_log('VPP output to stderr while running %s:', cls.__name__)
557 stderr_log(single_line_delim)
558 vpp_output = "".join(cls.vpp_stderr_deque)
559 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
561 stderr_log('\n%s', vpp_output)
562 stderr_log(single_line_delim)
565 def tearDownClass(cls):
566 """ Perform final cleanup after running all tests in this test-case """
567 cls.logger.debug("--- tearDownClass() for %s called ---" %
569 cls.reporter.send_keep_alive(cls, 'tearDownClass')
571 cls.file_handler.close()
572 cls.reset_packet_infos()
574 debug_internal.on_tear_down_class(cls)
def show_commands_at_teardown(self):
    """ Hook for subclasses to add their own teardown logging.

    This default implementation only logs that nothing test specific
    is provided.
    """
    notice = "--- No test specific show commands provided. ---"
    self.logger.info(notice)
581 """ Show various debug prints after each test """
582 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
583 (self.__class__.__name__, self._testMethodName,
584 self._testMethodDoc))
585 if not self.vpp_dead:
587 "--- Logging show commands common to all testcases. ---")
588 self.logger.debug(self.vapi.cli("show trace max 1000"))
589 self.logger.info(self.vapi.ppcli("show interface"))
590 self.logger.info(self.vapi.ppcli("show hardware"))
591 self.logger.info(self.statistics.set_errors_str())
592 self.logger.info(self.vapi.ppcli("show run"))
593 self.logger.info(self.vapi.ppcli("show log"))
594 self.logger.info("Logging testcase specific show commands.")
595 self.show_commands_at_teardown()
596 self.registry.remove_vpp_config(self.logger)
597 # Save/Dump VPP api trace log
598 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
599 tmp_api_trace = "/tmp/%s" % api_trace
600 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
601 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
602 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
604 os.rename(tmp_api_trace, vpp_api_trace_log)
605 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
608 self.registry.unregister_all(self.logger)
611 """ Clear trace before running each test"""
612 super(VppTestCase, self).setUp()
613 self.reporter.send_keep_alive(self)
615 raise Exception("VPP is dead when setting up the test")
616 self.sleep(.1, "during setUp")
617 self.vpp_stdout_deque.append(
618 "--- test setUp() for %s.%s(%s) starts here ---\n" %
619 (self.__class__.__name__, self._testMethodName,
620 self._testMethodDoc))
621 self.vpp_stderr_deque.append(
622 "--- test setUp() for %s.%s(%s) starts here ---\n" %
623 (self.__class__.__name__, self._testMethodName,
624 self._testMethodDoc))
625 self.vapi.cli("clear trace")
626 # store the test instance inside the test class - so that objects
627 # holding the class can access instance methods (like assertEqual)
628 type(self).test_instance = self
631 def pg_enable_capture(cls, interfaces=None):
633 Enable capture on packet-generator interfaces
635 :param interfaces: iterable interface indexes (if None,
636 use self.pg_interfaces)
639 if interfaces is None:
640 interfaces = cls.pg_interfaces
645 def register_capture(cls, cap_name):
646 """ Register a capture in the testclass """
647 # add to the list of captures with current timestamp
648 cls._captures.append((time.time(), cap_name))
649 # filter out from zombies
650 cls._zombie_captures = [(stamp, name)
651 for (stamp, name) in cls._zombie_captures
656 """ Remove any zombie captures and enable the packet generator """
657 # how long before capture is allowed to be deleted - otherwise vpp
658 # crashes - 100ms seems enough (this shouldn't be needed at all)
661 for stamp, cap_name in cls._zombie_captures:
662 wait = stamp + capture_ttl - now
664 cls.sleep(wait, "before deleting capture %s" % cap_name)
666 cls.logger.debug("Removing zombie capture %s" % cap_name)
667 cls.vapi.cli('packet-generator delete %s' % cap_name)
669 cls.vapi.cli("trace add pg-input 1000")
670 cls.vapi.cli('packet-generator enable')
671 cls._zombie_captures = cls._captures
675 def create_pg_interfaces(cls, interfaces):
677 Create packet-generator interfaces.
679 :param interfaces: iterable indexes of the interfaces.
680 :returns: List of created interfaces.
685 intf = VppPGInterface(cls, i)
686 setattr(cls, intf.name, intf)
688 cls.pg_interfaces = result
692 def create_loopback_interfaces(cls, count):
694 Create loopback interfaces.
696 :param count: number of interfaces created.
697 :returns: List of created interfaces.
699 result = [VppLoInterface(cls) for i in range(count)]
701 setattr(cls, intf.name, intf)
702 cls.lo_interfaces = result
706 def create_bvi_interfaces(cls, count):
708 Create BVI interfaces.
710 :param count: number of interfaces created.
711 :returns: List of created interfaces.
713 result = [VppBviInterface(cls) for i in range(count)]
715 setattr(cls, intf.name, intf)
716 cls.bvi_interfaces = result
def extend_packet(packet, size, padding=' '):
    """
    Extend packet to given size by padding with spaces or custom padding
    NOTE: Currently works only when Raw layer is present.

    The current length is taken as len(packet) + 4; the payload of the
    Raw layer is grown by (size - current length) units of padding. Nothing
    is changed when the packet already has the target size or is larger.

    :param packet: packet
    :param size: target size
    :param padding: padding used to extend the payload

    """
    packet_len = len(packet) + 4
    extend = size - packet_len
    if extend <= 0:
        # nothing to add - the filler would be empty anyway
        return
    # Floor division keeps the repeat count an integer; true division
    # yields a float on Python 3 and a sequence cannot be repeated by it.
    num = (extend // len(padding)) + 1
    filler = (padding * num)[:extend]
    raw = packet[Raw]
    load = raw.load
    if isinstance(load, bytes) and not isinstance(filler, bytes):
        # A bytes payload cannot be concatenated with text padding;
        # latin-1 maps each character to exactly one byte, so the
        # length computed above is preserved.
        filler = filler.encode('latin-1')
    raw.load = load + filler
def reset_packet_infos(cls):
    """ Start over with empty packet info and per-destination count maps """
    for attr in ("_packet_infos", "_packet_count_for_dst_if_idx"):
        # a fresh dict for each attribute - they are never shared
        setattr(cls, attr, {})
743 def create_packet_info(cls, src_if, dst_if):
745 Create packet info object containing the source and destination indexes
746 and add it to the testcase's packet info list
748 :param VppInterface src_if: source interface
749 :param VppInterface dst_if: destination interface
751 :returns: _PacketInfo object
755 info.index = len(cls._packet_infos)
756 info.src = src_if.sw_if_index
757 info.dst = dst_if.sw_if_index
758 if isinstance(dst_if, VppSubInterface):
759 dst_idx = dst_if.parent.sw_if_index
761 dst_idx = dst_if.sw_if_index
762 if dst_idx in cls._packet_count_for_dst_if_idx:
763 cls._packet_count_for_dst_if_idx[dst_idx] += 1
765 cls._packet_count_for_dst_if_idx[dst_idx] = 1
766 cls._packet_infos[info.index] = info
770 def info_to_payload(info):
772 Convert _PacketInfo object to packet payload
774 :param info: _PacketInfo object
776 :returns: string containing serialized data from packet info
778 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
782 def payload_to_info(payload, payload_field='load'):
784 Convert packet payload to _PacketInfo object
786 :param payload: packet payload
787 :type payload: <class 'scapy.packet.Raw'>
788 :param payload_field: packet fieldname of payload "load" for
789 <class 'scapy.packet.Raw'>
790 :type payload_field: str
791 :returns: _PacketInfo object containing de-serialized data from payload
794 numbers = getattr(payload, payload_field).split()
796 info.index = int(numbers[0])
797 info.src = int(numbers[1])
798 info.dst = int(numbers[2])
799 info.ip = int(numbers[3])
800 info.proto = int(numbers[4])
803 def get_next_packet_info(self, info):
805 Iterate over the packet info list stored in the testcase
806 Start iteration with first element if info is None
807 Continue based on index in info if info is specified
809 :param info: info or None
810 :returns: next info in list or None if no more infos
815 next_index = info.index + 1
816 if next_index == len(self._packet_infos):
819 return self._packet_infos[next_index]
821 def get_next_packet_info_for_interface(self, src_index, info):
823 Search the packet info list for the next packet info with same source
826 :param src_index: source interface index to search for
827 :param info: packet info - where to start the search
828 :returns: packet info or None
832 info = self.get_next_packet_info(info)
835 if info.src == src_index:
838 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
840 Search the packet info list for the next packet info with same source
841 and destination interface indexes
843 :param src_index: source interface index to search for
844 :param dst_index: destination interface index to search for
845 :param info: packet info - where to start the search
846 :returns: packet info or None
850 info = self.get_next_packet_info_for_interface(src_index, info)
853 if info.dst == dst_index:
856 def assert_equal(self, real_value, expected_value, name_or_class=None):
857 if name_or_class is None:
858 self.assertEqual(real_value, expected_value)
861 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
862 msg = msg % (getdoc(name_or_class).strip(),
863 real_value, str(name_or_class(real_value)),
864 expected_value, str(name_or_class(expected_value)))
866 msg = "Invalid %s: %s does not match expected value %s" % (
867 name_or_class, real_value, expected_value)
869 self.assertEqual(real_value, expected_value, msg)
871 def assert_in_range(self,
879 msg = "Invalid %s: %s out of range <%s,%s>" % (
880 name, real_value, expected_min, expected_max)
881 self.assertTrue(expected_min <= real_value <= expected_max, msg)
883 def assert_packet_checksums_valid(self, packet,
884 ignore_zero_udp_checksums=True):
885 received = packet.__class__(scapy.compat.raw(packet))
887 ppp("Verifying packet checksums for packet:", received))
888 udp_layers = ['UDP', 'UDPerror']
889 checksum_fields = ['cksum', 'chksum']
892 temp = received.__class__(scapy.compat.raw(received))
894 layer = temp.getlayer(counter)
896 for cf in checksum_fields:
897 if hasattr(layer, cf):
898 if ignore_zero_udp_checksums and \
899 0 == getattr(layer, cf) and \
900 layer.name in udp_layers:
903 checksums.append((counter, cf))
906 counter = counter + 1
907 if 0 == len(checksums):
909 temp = temp.__class__(scapy.compat.raw(temp))
910 for layer, cf in checksums:
911 calc_sum = getattr(temp[layer], cf)
913 getattr(received[layer], cf), calc_sum,
914 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
916 "Checksum field `%s` on `%s` layer has correct value `%s`" %
917 (cf, temp[layer].name, calc_sum))
919 def assert_checksum_valid(self, received_packet, layer,
921 ignore_zero_checksum=False):
922 """ Check checksum of received packet on given layer """
923 received_packet_checksum = getattr(received_packet[layer], field_name)
924 if ignore_zero_checksum and 0 == received_packet_checksum:
926 recalculated = received_packet.__class__(
927 scapy.compat.raw(received_packet))
928 delattr(recalculated[layer], field_name)
929 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
930 self.assert_equal(received_packet_checksum,
931 getattr(recalculated[layer], field_name),
932 "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """ Check the checksum of the 'IP' layer of the received packet """
    layer = 'IP'
    self.assert_checksum_valid(
        received_packet, layer, ignore_zero_checksum=ignore_zero_checksum)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """ Check the checksum of the 'TCP' layer of the received packet """
    layer = 'TCP'
    self.assert_checksum_valid(
        received_packet, layer, ignore_zero_checksum=ignore_zero_checksum)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """ Check the checksum of the 'UDP' layer of the received packet.

    Zero checksums are ignored by default.
    """
    layer = 'UDP'
    self.assert_checksum_valid(
        received_packet, layer, ignore_zero_checksum=ignore_zero_checksum)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """ Check checksums of the *error layers present in the packet.

    Each of IPerror, TCPerror, UDPerror and ICMPerror is verified only
    when the packet has that layer; a zero checksum is ignored for
    UDPerror only.
    """
    checks = (
        (IPerror, 'IPerror', {}),
        (TCPerror, 'TCPerror', {}),
        (UDPerror, 'UDPerror', {'ignore_zero_checksum': True}),
        (ICMPerror, 'ICMPerror', {}),
    )
    for layer_class, layer_name, options in checks:
        if received_packet.haslayer(layer_class):
            self.assert_checksum_valid(received_packet, layer_name,
                                       **options)
def assert_icmp_checksum_valid(self, received_packet):
    """ Check the 'ICMP' layer checksum, then any embedded error layers """
    pkt = received_packet
    self.assert_checksum_valid(pkt, 'ICMP')
    self.assert_embedded_icmp_checksum_valid(pkt)
def assert_icmpv6_checksum_valid(self, pkt):
    """ Check the 'cksum' field of the ICMPv6 layers present in pkt.

    Handles ICMPv6DestUnreach (followed by a check of the embedded error
    layers), ICMPv6EchoRequest and ICMPv6EchoReply, in that order.
    """
    checks = (
        (ICMPv6DestUnreach, 'ICMPv6DestUnreach', True),
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest', False),
        (ICMPv6EchoReply, 'ICMPv6EchoReply', False),
    )
    for layer_class, layer_name, has_embedded in checks:
        if not pkt.haslayer(layer_class):
            continue
        self.assert_checksum_valid(pkt, layer_name, 'cksum')
        if has_embedded:
            self.assert_embedded_icmp_checksum_valid(pkt)
973 def assert_packet_counter_equal(self, counter, expected_value):
974 if counter.startswith("/"):
975 counter_value = self.statistics.get_counter(counter)
976 self.assert_equal(counter_value, expected_value,
977 "packet counter `%s'" % counter)
979 counters = self.vapi.cli("sh errors").split('\n')
981 for i in range(1, len(counters) - 1):
982 results = counters[i].split()
983 if results[1] == counter:
984 counter_value = int(results[0])
988 def sleep(cls, timeout, remark=None):
990 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
991 # * by Guido, only the main thread can be interrupted.
993 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
996 if hasattr(os, 'sched_yield'):
1002 if hasattr(cls, 'logger'):
1003 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1004 before = time.time()
1007 if hasattr(cls, 'logger') and after - before > 2 * timeout:
1008 cls.logger.error("unexpected self.sleep() result - "
1009 "slept for %es instead of ~%es!",
1010 after - before, timeout)
1011 if hasattr(cls, 'logger'):
1013 "Finished sleep (%s) - slept %es (wanted %es)",
1014 remark, after - before, timeout)
1016 def pg_send(self, intf, pkts):
1017 self.vapi.cli("clear trace")
1018 intf.add_stream(pkts)
1019 self.pg_enable_capture(self.pg_interfaces)
1022 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
1023 self.pg_send(intf, pkts)
1026 for i in self.pg_interfaces:
1027 i.get_capture(0, timeout=timeout)
1028 i.assert_nothing_captured(remark=remark)
1031 def send_and_expect(self, intf, pkts, output, n_rx=None):
1034 self.pg_send(intf, pkts)
1035 rx = output.get_capture(n_rx)
1038 def send_and_expect_only(self, intf, pkts, output, timeout=None):
1039 self.pg_send(intf, pkts)
1040 rx = output.get_capture(len(pkts))
1044 for i in self.pg_interfaces:
1045 if i not in outputs:
1046 i.get_capture(0, timeout=timeout)
1047 i.assert_nothing_captured()
1053 """ unittest calls runTest when TestCase is instantiated without a
1054 test case. Use case: Writing unittests against VppTestCase"""
def get_testcase_doc_name(test):
    """ Return the first line of the docstring of the test's class """
    class_doc = getdoc(test.__class__)
    first_line = class_doc.splitlines()[0]
    return first_line
1062 def get_test_description(descriptions, test):
1063 short_description = test.shortDescription()
1064 if descriptions and short_description:
1065 return short_description
class TestCaseInfo(object):
    """ Record of the logger, temp dir, VPP pid and VPP binary path of a
    test case, plus a core_crash_test slot which starts out as None """

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        (self.logger, self.tempdir,
         self.vpp_pid, self.vpp_bin_path) = (logger, tempdir,
                                             vpp_pid, vpp_bin_path)
        # not set at construction time
        self.core_crash_test = None
1079 class VppTestResult(unittest.TestResult):
1081 @property result_string
1082 String variable to store the test case result string.
1084 List variable containing 2-tuples of TestCase instances and strings
1085 holding formatted tracebacks. Each tuple represents a test which
1086 raised an unexpected exception.
1088 List variable containing 2-tuples of TestCase instances and strings
1089 holding formatted tracebacks. Each tuple represents a test where
1090 a failure was explicitly signalled using the TestCase.assert*()
1094 failed_test_cases_info = set()
1095 core_crash_test_cases_info = set()
1096 current_test_case_info = None
1098 def __init__(self, stream=None, descriptions=None, verbosity=None,
1101 :param stream File descriptor to store where to report test results.
1102 Set to the standard error stream by default.
1103 :param descriptions Boolean variable to store information if to use
1104 test case descriptions.
1105 :param verbosity Integer variable to store required verbosity level.
1107 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1108 self.stream = stream
1109 self.descriptions = descriptions
1110 self.verbosity = verbosity
1111 self.result_string = None
1112 self.runner = runner
1114 def addSuccess(self, test):
1116 Record a test succeeded result
1121 if self.current_test_case_info:
1122 self.current_test_case_info.logger.debug(
1123 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1124 test._testMethodName,
1125 test._testMethodDoc))
1126 unittest.TestResult.addSuccess(self, test)
1127 self.result_string = colorize("OK", GREEN)
1129 self.send_result_through_pipe(test, PASS)
1131 def addSkip(self, test, reason):
1133 Record a test skipped.
1139 if self.current_test_case_info:
1140 self.current_test_case_info.logger.debug(
1141 "--- addSkip() %s.%s(%s) called, reason is %s" %
1142 (test.__class__.__name__, test._testMethodName,
1143 test._testMethodDoc, reason))
1144 unittest.TestResult.addSkip(self, test, reason)
1145 self.result_string = colorize("SKIP", YELLOW)
1147 self.send_result_through_pipe(test, SKIP)
def symlink_failed(self):
    """
    Link the temp dir of the current (failed) test case into the directory
    named by the FAILED_DIR environment variable.

    This is best effort: nothing happens without a current test case info,
    and every problem is reported through the test case logger (when there
    is one) instead of being raised.
    """
    info = self.current_test_case_info
    if not info:
        return
    logger = info.logger

    failed_dir = os.getenv('FAILED_DIR')
    if failed_dir is None:
        # os.path.join(None, ...) would only surface as an opaque TypeError
        if logger:
            logger.error("FAILED_DIR is not set, "
                         "not creating a link to the failed test")
        return

    try:
        link_path = os.path.join(
            failed_dir,
            '%s-FAILED' % os.path.basename(info.tempdir))
        if logger:
            logger.debug("creating a link to the failed test")
            logger.debug("os.symlink(%s, %s)" % (info.tempdir, link_path))
        # lexists() is also true for a dangling link (target already
        # removed); exists() is not, and os.symlink() would then fail
        # because the name is taken.
        if os.path.lexists(link_path):
            if logger:
                logger.debug('symlink already exists')
        else:
            os.symlink(info.tempdir, link_path)
    except Exception as e:
        # deliberately broad: a failed link must never fail the test run
        if logger:
            logger.error(e)
def send_result_through_pipe(self, test, result):
    """
    Send the (test id, result) pair through the result pipe.

    Nothing is sent when the test_framework_result_pipe attribute is
    missing or holds a falsy value.

    :param test: test whose id() is reported
    :param result: result value to report for the test
    """
    pipe = getattr(self, 'test_framework_result_pipe', None)
    if not pipe:
        return
    pipe.send((test.id(), result))
def log_error(self, test, err, fn_name):
    """
    Write a failure/error to the debug log of the current test case.

    Does nothing when there is no current test case info.

    :param test: test (or unittest _ErrorHolder) the error belongs to
    :param err: tuple which is unpacked into format_exception()
    :param fn_name: name of the reporting function, used in the log text
    """
    info = self.current_test_case_info
    if not info:
        return

    if isinstance(test, unittest.suite._ErrorHolder):
        # not a real test case, it only carries a description
        test_name = test.description
    else:
        test_name = '%s.%s(%s)' % (test.__class__.__name__,
                                   test._testMethodName,
                                   test._testMethodDoc)
    info.logger.debug(
        "--- %s() %s called, err is %s" % (fn_name, test_name, err))
    info.logger.debug(
        "formatted exception is:\n%s" % "".join(format_exception(*err)))
def add_error(self, test, err, unittest_fn, error_type):
    """
    Common implementation behind addFailure() and addError().

    Logs the problem, lets the given unittest function record it, builds
    the result string, registers the current test case info as failed
    (and as core-crashed when a core is present in its temp dir) and
    finally reports the error type through the result pipe.

    :param test: test (or unittest _ErrorHolder) the error belongs to
    :param err: error tuple handed on to log_error() and unittest_fn
    :param unittest_fn: unittest.TestResult function doing the recording
    :param error_type: FAIL or ERROR
    :raises Exception: when error_type is neither FAIL nor ERROR
    """
    if error_type == FAIL:
        fn_name, label = 'addFailure', "FAIL"
    elif error_type == ERROR:
        fn_name, label = 'addError', "ERROR"
    else:
        raise Exception('Error type %s cannot be used to record an '
                        'error or a failure' % error_type)
    self.log_error(test, err, fn_name)
    error_type_str = colorize(label, RED)

    unittest_fn(self, test, err)
    info = self.current_test_case_info
    if not info:
        self.result_string = '%s [no temp dir]' % error_type_str
    else:
        self.result_string = "%s [ temp dir used by test case: %s ]" % \
            (error_type_str, info.tempdir)
        self.symlink_failed()
        self.failed_test_cases_info.add(info)
        if is_core_present(info.tempdir):
            # only the first test seen with a core gets blamed for it
            if not info.core_crash_test:
                if isinstance(test, unittest.suite._ErrorHolder):
                    test_name = str(test)
                else:
                    test_name = "'{!s}' ({!s})".format(
                        get_testcase_doc_name(test), test.id())
                info.core_crash_test = test_name
            self.core_crash_test_cases_info.add(info)

    self.send_result_through_pipe(test, error_type)
def addFailure(self, test, err):
    """
    Record a failed test by delegating to add_error() with FAIL.

    :param test: test which failed
    :param err: error information; log_error() unpacks it into
        format_exception()

    """
    record_failure = unittest.TestResult.addFailure
    self.add_error(test, err, record_failure, FAIL)
def addError(self, test, err):
    """
    Record a test error by delegating to add_error() with ERROR.

    :param test: test which ended with an error
    :param err: error information; log_error() unpacks it into
        format_exception()

    """
    record_error = unittest.TestResult.addError
    self.add_error(test, err, record_error, ERROR)
def getDescription(self, test):
    """
    Build the description of a test via get_test_description().

    :param test: test to describe
    :returns: test description

    """
    use_descriptions = self.descriptions
    return get_test_description(use_descriptions, test)
def startTest(self, test):
    """
    Start a test

    Prints a header for the class of the test (only while the class does
    not carry the _header_printed marker yet), lets unittest do its own
    bookkeeping and, with verbosity above zero, announces the test on
    the stream.

    :param test: test which is about to run

    """

    def print_header(test):
        if not hasattr(test.__class__, '_header_printed'):
            # getdoc() gives None without a docstring and '' for a blank
            # one; use the class name then instead of crashing on
            # splitlines()[0].
            doc_lines = (getdoc(test) or '').splitlines()
            if doc_lines:
                title = doc_lines[0]
            else:
                title = test.__class__.__name__
            print(double_line_delim)
            print(colorize(title, GREEN))
            print(double_line_delim)
        test.__class__._header_printed = True

    print_header(test)

    unittest.TestResult.startTest(self, test)
    if self.verbosity > 0:
        self.stream.writeln(
            "Starting " + self.getDescription(test) + " ...")
        self.stream.writeln(single_line_delim)
def stopTest(self, test):
    """
    Called when the given test has been run

    Writes the test description followed by the result string to the
    stream (framed by delimiter lines when verbosity is above zero) and
    reports TEST_RUN through the result pipe.

    :param test: test which has just finished

    """
    unittest.TestResult.stopTest(self, test)
    framed = self.verbosity > 0
    if framed:
        self.stream.writeln(single_line_delim)
    self.stream.writeln("%-73s%s" % (self.getDescription(test),
                                     self.result_string))
    if framed:
        self.stream.writeln(single_line_delim)

    self.send_result_through_pipe(test, TEST_RUN)
def printErrors(self):
    """
    Print errors from running the test case

    Afterwards, when the runner asks for no summary (print_summary is
    false), the stream of this result and of the runner are both replaced
    by a writer to os.devnull so that the remaining output is discarded.
    """
    if self.errors or self.failures:
        self.stream.writeln()
        self.printErrorList('ERROR', self.errors)
        self.printErrorList('FAIL', self.failures)

    # ^^ that is the last output from unittest before summary
    runner = self.runner
    # self.runner may be None when the result was built without a runner;
    # there is no summary to suppress then
    if runner is not None and not runner.print_summary:
        # the file stays open on purpose: it serves as the output stream
        # from here on
        devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
        self.stream = devnull
        runner.stream = devnull
def printErrorList(self, flavour, errors):
    """
    Write every entry of an error list to the output stream, each one
    headed by the error type and the description of the test.

    :param flavour: error type
    :param errors: iterable of (test, error text) pairs

    """
    for failed_test, trace in errors:
        heading = (flavour, self.getDescription(failed_test))
        self.stream.writeln(double_line_delim)
        self.stream.writeln("%s: %s" % heading)
        self.stream.writeln(single_line_delim)
        self.stream.writeln("%s" % trace)
class VppTestRunner(unittest.TextTestRunner):
    """
    Test runner which produces VppTestResult objects and always reports
    through sys.stdout.
    """

    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
                 result_pipe=None, failfast=False, buffer=False,
                 resultclass=None, print_summary=True, **kwargs):
        """
        :param keep_alive_pipe: pipe stored on KeepAliveReporter
        :param descriptions: passed on to unittest.TextTestRunner
        :param verbosity: passed on to unittest.TextTestRunner
        :param result_pipe: pipe stored on the result class as
            test_framework_result_pipe
        :param failfast: passed on to unittest.TextTestRunner
        :param buffer: passed on to unittest.TextTestRunner
        :param resultclass: passed on to unittest.TextTestRunner
        :param print_summary: when false, run() puts the original stream
            back on the runner and the result once the tests are done
        """
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        super(VppTestRunner, self).__init__(
            sys.stdout, descriptions, verbosity, failfast, buffer,
            resultclass, **kwargs)
        KeepAliveReporter.pipe = keep_alive_pipe

        # remembered so that run() can put it back afterwards
        self.orig_stream = self.stream
        self.resultclass.test_framework_result_pipe = result_pipe

        self.print_summary = print_summary

    def _makeResult(self):
        """Create the result object; it also receives this runner."""
        make_result = self.resultclass
        return make_result(self.stream, self.descriptions, self.verbosity,
                           self)

    def run(self, test):
        """
        Run the tests

        :param test: test (suite) to run
        :returns: result object produced by the run

        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        result = super(VppTestRunner, self).run(test)
        if self.print_summary:
            return result
        # without summary, restore the stream saved in __init__
        self.stream = self.orig_stream
        result.stream = self.orig_stream
        return result
class Worker(Thread):
    """
    Thread which runs an external executable and logs what it produced.

    Once run() has finished, ``result`` holds the return code of the
    process.
    """

    def __init__(self, args, logger, env=None):
        """
        :param args: argument list handed to subprocess.Popen; args[0]
            is the executable
        :param logger: logger which receives the progress and the output
            of the executable
        :param env: optional mapping of environment variables applied on
            top of a copy of os.environ
        """
        self.logger = logger
        self.args = args
        self.process = None
        self.result = None
        # None replaces the former mutable ``{}`` default; callers passing
        # a mapping still get their own deep copy
        self.env = copy.deepcopy(env) if env is not None else {}
        super(Worker, self).__init__()

    def run(self):
        """Run the executable, wait for it and log its output."""
        executable = self.args[0]
        self.logger.debug("Running executable w/args `%s'" % self.args)
        env = os.environ.copy()
        env.update(self.env)
        env["CK_LOG_FILE_NAME"] = "-"
        self.process = subprocess.Popen(
            self.args, shell=False, env=env, preexec_fn=os.setpgrp,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # communicate() reads both pipes to the end and waits for exit
        out, err = self.process.communicate()
        self.logger.debug("Finished running `%s'" % executable)
        self.logger.info("Return code is `%s'" % self.process.returncode)
        self.logger.info(single_line_delim)
        self.logger.info("Executable `%s' wrote to stdout:" % executable)
        self.logger.info(single_line_delim)
        self.logger.info(out)
        self.logger.info(single_line_delim)
        self.logger.info("Executable `%s' wrote to stderr:" % executable)
        self.logger.info(single_line_delim)
        self.logger.info(err)
        self.logger.info(single_line_delim)
        self.result = self.process.returncode
1410 if __name__ == '__main__':