3 from __future__ import print_function
16 from collections import deque
17 from threading import Thread, Event
18 from inspect import getdoc, isclass
19 from traceback import format_exception
20 from logging import FileHandler, DEBUG, Formatter
23 from scapy.packet import Raw
24 from hook import StepHook, PollHook, VppDiedError
25 from vpp_pg_interface import VppPGInterface
26 from vpp_sub_interface import VppSubInterface
27 from vpp_lo_interface import VppLoInterface
28 from vpp_bvi_interface import VppBviInterface
29 from vpp_papi_provider import VppPapiProvider
30 from vpp_papi.vpp_stats import VPPStats
31 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
33 from vpp_object import VppObjectRegistry
34 from util import ppp, is_core_present
35 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
36 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
37 from scapy.layers.inet6 import ICMPv6EchoReply
39 if os.name == 'posix' and sys.version_info[0] < 3:
40 # using subprocess32 is recommended by python official documentation
41 # @ https://docs.python.org/2/library/subprocess.html
42 import subprocess32 as subprocess
46 # Python2/3 compatible
58 debug_framework = False
59 if os.getenv('TEST_DEBUG', "0") == "1":
60 debug_framework = True
64 Test framework module.
66 The module provides a set of tools for constructing and running tests and
67 representing the results.
71 class _PacketInfo(object):
72 """Private class to create packet info object.
74 Help process information about the next packet.
75 Set variables to default values.
77 #: Store the index of the packet.
79 #: Store the index of the source packet generator interface of the packet.
81 #: Store the index of the destination packet generator interface
84 #: Store expected ip version
86 #: Store expected upper protocol
88 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare two packet info objects field by field.

    :param other: object exposing ``index``, ``src``, ``dst`` and ``data``
    :returns: the result of and-ing the four field comparisons (the first
              falsy comparison result, otherwise the ``data`` comparison)
    """
    # all four comparisons are evaluated up front, in this order
    index_ok, src_ok, dst_ok, data_ok = (
        self.index == other.index,
        self.src == other.src,
        self.dst == other.dst,
        self.data == other.data,
    )
    if not index_ok:
        return index_ok
    if not src_ok:
        return src_ok
    if not dst_ok:
        return dst_ok
    return data_ok
99 def pump_output(testclass):
100 """ pump output from vpp stdout/stderr to proper queues """
103 while not testclass.pump_thread_stop_flag.is_set():
104 readable = select.select([testclass.vpp.stdout.fileno(),
105 testclass.vpp.stderr.fileno(),
106 testclass.pump_thread_wakeup_pipe[0]],
108 if testclass.vpp.stdout.fileno() in readable:
109 read = os.read(testclass.vpp.stdout.fileno(), 102400)
111 split = read.splitlines(True)
112 if len(stdout_fragment) > 0:
113 split[0] = "%s%s" % (stdout_fragment, split[0])
114 if len(split) > 0 and split[-1].endswith("\n"):
118 stdout_fragment = split[-1]
119 testclass.vpp_stdout_deque.extend(split[:limit])
120 if not testclass.cache_vpp_output:
121 for line in split[:limit]:
122 testclass.logger.debug(
123 "VPP STDOUT: %s" % line.rstrip("\n"))
124 if testclass.vpp.stderr.fileno() in readable:
125 read = os.read(testclass.vpp.stderr.fileno(), 102400)
127 split = read.splitlines(True)
128 if len(stderr_fragment) > 0:
129 split[0] = "%s%s" % (stderr_fragment, split[0])
130 if len(split) > 0 and split[-1].endswith(b"\n"):
134 stderr_fragment = split[-1]
135 testclass.vpp_stderr_deque.extend(split[:limit])
136 if not testclass.cache_vpp_output:
137 for line in split[:limit]:
138 testclass.logger.debug(
139 "VPP STDERR: %s" % line.rstrip("\n"))
140 # ignoring the dummy pipe here intentionally - the
141 # flag will take care of properly terminating the loop
def _is_skip_aarch64_set():
    """Tell whether the SKIP_AARCH64 environment variable is switched on.

    :returns: True if the lower-cased value is 'yes', 'y' or '1'
              (the variable defaults to 'n' when unset)
    """
    requested = os.getenv('SKIP_AARCH64', 'n')
    return requested.lower() in ('yes', 'y', '1')


# evaluated once, when the module is imported
is_skip_aarch64_set = _is_skip_aarch64_set()
def _is_platform_aarch64():
    """Tell whether ``platform.machine()`` reports 'aarch64'.

    :returns: True on aarch64, False otherwise
    """
    machine = platform.machine()
    return machine == 'aarch64'


# evaluated once, when the module is imported
is_platform_aarch64 = _is_platform_aarch64()
def _running_extended_tests():
    """Tell whether the EXTENDED_TESTS environment variable is switched on.

    :returns: True if the lower-cased value is 'y', 'yes' or '1'
              (the variable defaults to 'n' when unset)
    """
    return os.getenv("EXTENDED_TESTS", "n").lower() in ("y", "yes", "1")


# evaluated once, when the module is imported
running_extended_tests = _running_extended_tests()
def _running_on_centos():
    """Tell whether the OS_ID environment variable names a CentOS host.

    :returns: True if 'centos' occurs (case-insensitively) in OS_ID,
              False otherwise or when OS_ID is unset
    """
    os_id = os.getenv("OS_ID", "")
    return "centos" in os_id.lower()


# Call the helper so the flag holds a bool, exactly like the sibling
# is_skip_aarch64_set / is_platform_aarch64 / running_extended_tests flags.
# Binding the function object itself would make the flag always truthy.
running_on_centos = _running_on_centos()
170 class KeepAliveReporter(object):
172 Singleton object which reports test start to parent process
177 self.__dict__ = self._shared_state
185 def pipe(self, pipe):
186 if self._pipe is not None:
187 raise Exception("Internal error - pipe should only be set once.")
190 def send_keep_alive(self, test, desc=None):
192 Write current test tmpdir & desc to keep-alive pipe to signal liveness
194 if self.pipe is None:
195 # if not running forked..
199 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
203 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
206 class VppTestCase(unittest.TestCase):
207 """This subclass is a base class for VPP test cases that are implemented as
208 classes. It provides methods to create and run test case.
211 extra_vpp_punt_config = []
212 extra_vpp_plugin_config = []
def packet_infos(self):
    """Expose the packet info collection kept in ``_packet_infos``.

    :returns: the ``_packet_infos`` attribute itself (not a copy)
    """
    infos = self._packet_infos
    return infos
220 def get_packet_count_for_if_idx(cls, dst_if_index):
221 """Get the number of packet info for specified destination if index"""
222 if dst_if_index in cls._packet_count_for_dst_if_idx:
223 return cls._packet_count_for_dst_if_idx[dst_if_index]
229 """Return the instance of this testcase"""
230 return cls.test_instance
233 def set_debug_flags(cls, d):
234 cls.debug_core = False
235 cls.debug_gdb = False
236 cls.debug_gdbserver = False
241 cls.debug_core = True
244 elif dl == "gdbserver":
245 cls.debug_gdbserver = True
247 raise Exception("Unrecognized DEBUG option: '%s'" % d)
250 def get_least_used_cpu():
251 cpu_usage_list = [set(range(psutil.cpu_count()))]
252 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
253 if 'vpp_main' == p.info['name']]
254 for vpp_process in vpp_processes:
255 for cpu_usage_set in cpu_usage_list:
257 cpu_num = vpp_process.cpu_num()
258 if cpu_num in cpu_usage_set:
259 cpu_usage_set_index = cpu_usage_list.index(
261 if cpu_usage_set_index == len(cpu_usage_list) - 1:
262 cpu_usage_list.append({cpu_num})
264 cpu_usage_list[cpu_usage_set_index + 1].add(
266 cpu_usage_set.remove(cpu_num)
268 except psutil.NoSuchProcess:
271 for cpu_usage_set in cpu_usage_list:
272 if len(cpu_usage_set) > 0:
273 min_usage_set = cpu_usage_set
276 return random.choice(tuple(min_usage_set))
279 def setUpConstants(cls):
280 """ Set-up the test case class based on environment variables """
281 s = os.getenv("STEP", "n")
282 cls.step = True if s.lower() in ("y", "yes", "1") else False
283 d = os.getenv("DEBUG", None)
284 c = os.getenv("CACHE_OUTPUT", "1")
285 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
286 cls.set_debug_flags(d)
287 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
288 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
289 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
291 if cls.plugin_path is not None:
292 if cls.extern_plugin_path is not None:
293 plugin_path = "%s:%s" % (
294 cls.plugin_path, cls.extern_plugin_path)
296 plugin_path = cls.plugin_path
297 elif cls.extern_plugin_path is not None:
298 plugin_path = cls.extern_plugin_path
300 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
301 debug_cli = "cli-listen localhost:5002"
303 size = os.getenv("COREDUMP_SIZE")
305 coredump_size = "coredump-size %s" % size
306 if coredump_size is None:
307 coredump_size = "coredump-size unlimited"
309 cpu_core_number = cls.get_least_used_cpu()
311 cls.vpp_cmdline = [cls.vpp_bin, "unix",
312 "{", "nodaemon", debug_cli, "full-coredump",
313 coredump_size, "runtime-dir", cls.tempdir, "}",
314 "api-trace", "{", "on", "}", "api-segment", "{",
315 "prefix", cls.shm_prefix, "}", "cpu", "{",
316 "main-core", str(cpu_core_number), "}", "statseg",
317 "{", "socket-name", cls.stats_sock, "}", "plugins",
318 "{", "plugin", "dpdk_plugin.so", "{", "disable",
319 "}", "plugin", "unittest_plugin.so", "{", "enable",
320 "}"] + cls.extra_vpp_plugin_config + ["}", ]
321 if cls.extra_vpp_punt_config is not None:
322 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
323 if plugin_path is not None:
324 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
325 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
326 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
329 def wait_for_enter(cls):
330 if cls.debug_gdbserver:
331 print(double_line_delim)
332 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
334 print(double_line_delim)
335 print("Spawned VPP with PID: %d" % cls.vpp.pid)
337 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
339 print(single_line_delim)
340 print("You can debug the VPP using e.g.:")
341 if cls.debug_gdbserver:
342 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
343 print("Now is the time to attach a gdb by running the above "
344 "command, set up breakpoints etc. and then resume VPP from "
345 "within gdb by issuing the 'continue' command")
347 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
348 print("Now is the time to attach a gdb by running the above "
349 "command and set up breakpoints etc.")
350 print(single_line_delim)
351 input("Press ENTER to continue running the testcase...")
355 cmdline = cls.vpp_cmdline
357 if cls.debug_gdbserver:
358 gdbserver = '/usr/bin/gdbserver'
359 if not os.path.isfile(gdbserver) or \
360 not os.access(gdbserver, os.X_OK):
361 raise Exception("gdbserver binary '%s' does not exist or is "
362 "not executable" % gdbserver)
364 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
365 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
368 cls.vpp = subprocess.Popen(cmdline,
369 stdout=subprocess.PIPE,
370 stderr=subprocess.PIPE,
372 except subprocess.CalledProcessError as e:
373 cls.logger.critical("Subprocess returned with non-0 return code: ("
377 cls.logger.critical("Subprocess returned with OS error: "
378 "(%s) %s", e.errno, e.strerror)
380 except Exception as e:
381 cls.logger.exception("Subprocess returned unexpected from "
388 def wait_for_stats_socket(cls):
389 deadline = time.time() + 3
391 while time.time() < deadline or \
392 cls.debug_gdb or cls.debug_gdbserver:
393 if os.path.exists(cls.stats_sock):
398 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
403 Perform class setup before running the testcase
404 Remove shared memory files, start vpp and connect the vpp-api
406 super(VppTestCase, cls).setUpClass()
407 gc.collect() # run garbage collection first
409 cls.logger = get_logger(cls.__name__)
410 if hasattr(cls, 'parallel_handler'):
411 cls.logger.addHandler(cls.parallel_handler)
412 cls.logger.propagate = False
413 cls.tempdir = tempfile.mkdtemp(
414 prefix='vpp-unittest-%s-' % cls.__name__)
415 cls.stats_sock = "%s/stats.sock" % cls.tempdir
416 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
417 cls.file_handler.setFormatter(
418 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
420 cls.file_handler.setLevel(DEBUG)
421 cls.logger.addHandler(cls.file_handler)
422 cls.shm_prefix = os.path.basename(cls.tempdir)
423 os.chdir(cls.tempdir)
424 cls.logger.info("Temporary dir is %s, shm prefix is %s",
425 cls.tempdir, cls.shm_prefix)
427 cls.reset_packet_infos()
429 cls._zombie_captures = []
432 cls.registry = VppObjectRegistry()
433 cls.vpp_startup_failed = False
434 cls.reporter = KeepAliveReporter()
435 # need to catch exceptions here because if we raise, then the cleanup
436 # doesn't get called and we might end with a zombie vpp
439 cls.reporter.send_keep_alive(cls, 'setUpClass')
440 VppTestResult.current_test_case_info = TestCaseInfo(
441 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
442 cls.vpp_stdout_deque = deque()
443 cls.vpp_stderr_deque = deque()
444 cls.pump_thread_stop_flag = Event()
445 cls.pump_thread_wakeup_pipe = os.pipe()
446 cls.pump_thread = Thread(target=pump_output, args=(cls,))
447 cls.pump_thread.daemon = True
448 cls.pump_thread.start()
449 if cls.debug_gdb or cls.debug_gdbserver:
453 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
459 cls.vapi.register_hook(hook)
460 cls.wait_for_stats_socket()
461 cls.statistics = VPPStats(socketname=cls.stats_sock)
465 cls.vpp_startup_failed = True
467 "VPP died shortly after startup, check the"
468 " output to standard error for possible cause")
474 cls.vapi.disconnect()
477 if cls.debug_gdbserver:
478 print(colorize("You're running VPP inside gdbserver but "
479 "VPP-API connection failed, did you forget "
480 "to 'continue' VPP from within gdb?", RED))
492 Disconnect vpp-api, kill vpp and cleanup shared memory files
494 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
496 if cls.vpp.returncode is None:
497 print(double_line_delim)
498 print("VPP or GDB server is still running")
499 print(single_line_delim)
500 input("When done debugging, press ENTER to kill the "
501 "process and finish running the testcase...")
503 # first signal that we want to stop the pump thread, then wake it up
504 if hasattr(cls, 'pump_thread_stop_flag'):
505 cls.pump_thread_stop_flag.set()
506 if hasattr(cls, 'pump_thread_wakeup_pipe'):
507 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
508 if hasattr(cls, 'pump_thread'):
509 cls.logger.debug("Waiting for pump thread to stop")
510 cls.pump_thread.join()
511 if hasattr(cls, 'vpp_stderr_reader_thread'):
512 cls.logger.debug("Waiting for stdderr pump to stop")
513 cls.vpp_stderr_reader_thread.join()
515 if hasattr(cls, 'vpp'):
516 if hasattr(cls, 'vapi'):
517 cls.logger.debug("Disconnecting class vapi client on %s",
519 cls.vapi.disconnect()
520 cls.logger.debug("Deleting class vapi attribute on %s",
524 if cls.vpp.returncode is None:
525 cls.logger.debug("Sending TERM to vpp")
527 cls.logger.debug("Waiting for vpp to die")
528 cls.vpp.communicate()
529 cls.logger.debug("Deleting class vpp attribute on %s",
533 if cls.vpp_startup_failed:
534 stdout_log = cls.logger.info
535 stderr_log = cls.logger.critical
537 stdout_log = cls.logger.info
538 stderr_log = cls.logger.info
540 if hasattr(cls, 'vpp_stdout_deque'):
541 stdout_log(single_line_delim)
542 stdout_log('VPP output to stdout while running %s:', cls.__name__)
543 stdout_log(single_line_delim)
544 vpp_output = "".join(cls.vpp_stdout_deque)
545 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
547 stdout_log('\n%s', vpp_output)
548 stdout_log(single_line_delim)
550 if hasattr(cls, 'vpp_stderr_deque'):
551 stderr_log(single_line_delim)
552 stderr_log('VPP output to stderr while running %s:', cls.__name__)
553 stderr_log(single_line_delim)
554 vpp_output = "".join(cls.vpp_stderr_deque)
555 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
557 stderr_log('\n%s', vpp_output)
558 stderr_log(single_line_delim)
561 def tearDownClass(cls):
562 """ Perform final cleanup after running all tests in this test-case """
563 cls.reporter.send_keep_alive(cls, 'tearDownClass')
565 cls.file_handler.close()
566 cls.reset_packet_infos()
568 debug_internal.on_tear_down_class(cls)
571 """ Show various debug prints after each test """
572 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
573 (self.__class__.__name__, self._testMethodName,
574 self._testMethodDoc))
575 if not self.vpp_dead:
576 self.logger.debug(self.vapi.cli("show trace max 1000"))
577 self.logger.info(self.vapi.ppcli("show interface"))
578 self.logger.info(self.vapi.ppcli("show hardware"))
579 self.logger.info(self.statistics.set_errors_str())
580 self.logger.info(self.vapi.ppcli("show run"))
581 self.logger.info(self.vapi.ppcli("show log"))
582 self.registry.remove_vpp_config(self.logger)
583 # Save/Dump VPP api trace log
584 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
585 tmp_api_trace = "/tmp/%s" % api_trace
586 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
587 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
588 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
590 os.rename(tmp_api_trace, vpp_api_trace_log)
591 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
594 self.registry.unregister_all(self.logger)
597 """ Clear trace before running each test"""
598 super(VppTestCase, self).setUp()
599 self.reporter.send_keep_alive(self)
600 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
601 (self.__class__.__name__, self._testMethodName,
602 self._testMethodDoc))
604 raise Exception("VPP is dead when setting up the test")
605 self.sleep(.1, "during setUp")
606 self.vpp_stdout_deque.append(
607 "--- test setUp() for %s.%s(%s) starts here ---\n" %
608 (self.__class__.__name__, self._testMethodName,
609 self._testMethodDoc))
610 self.vpp_stderr_deque.append(
611 "--- test setUp() for %s.%s(%s) starts here ---\n" %
612 (self.__class__.__name__, self._testMethodName,
613 self._testMethodDoc))
614 self.vapi.cli("clear trace")
615 # store the test instance inside the test class - so that objects
616 # holding the class can access instance methods (like assertEqual)
617 type(self).test_instance = self
620 def pg_enable_capture(cls, interfaces=None):
622 Enable capture on packet-generator interfaces
624 :param interfaces: iterable interface indexes (if None,
625 use self.pg_interfaces)
628 if interfaces is None:
629 interfaces = cls.pg_interfaces
634 def register_capture(cls, cap_name):
635 """ Register a capture in the testclass """
636 # add to the list of captures with current timestamp
637 cls._captures.append((time.time(), cap_name))
638 # filter out from zombies
639 cls._zombie_captures = [(stamp, name)
640 for (stamp, name) in cls._zombie_captures
645 """ Remove any zombie captures and enable the packet generator """
646 # how long before capture is allowed to be deleted - otherwise vpp
647 # crashes - 100ms seems enough (this shouldn't be needed at all)
650 for stamp, cap_name in cls._zombie_captures:
651 wait = stamp + capture_ttl - now
653 cls.sleep(wait, "before deleting capture %s" % cap_name)
655 cls.logger.debug("Removing zombie capture %s" % cap_name)
656 cls.vapi.cli('packet-generator delete %s' % cap_name)
658 cls.vapi.cli("trace add pg-input 1000")
659 cls.vapi.cli('packet-generator enable')
660 cls._zombie_captures = cls._captures
664 def create_pg_interfaces(cls, interfaces):
666 Create packet-generator interfaces.
668 :param interfaces: iterable indexes of the interfaces.
669 :returns: List of created interfaces.
674 intf = VppPGInterface(cls, i)
675 setattr(cls, intf.name, intf)
677 cls.pg_interfaces = result
681 def create_loopback_interfaces(cls, count):
683 Create loopback interfaces.
685 :param count: number of interfaces created.
686 :returns: List of created interfaces.
688 result = [VppLoInterface(cls) for i in range(count)]
690 setattr(cls, intf.name, intf)
691 cls.lo_interfaces = result
695 def create_bvi_interfaces(cls, count):
697 Create BVI interfaces.
699 :param count: number of interfaces created.
700 :returns: List of created interfaces.
702 result = [VppBviInterface(cls) for i in range(count)]
704 setattr(cls, intf.name, intf)
705 cls.bvi_interfaces = result
709 def extend_packet(packet, size, padding=' '):
711 Extend packet to given size by padding with spaces or custom padding
712 NOTE: Currently works only when Raw layer is present.
714 :param packet: packet
715 :param size: target size
716 :param padding: padding used to extend the payload
719 packet_len = len(packet) + 4
720 extend = size - packet_len
722 num = (extend / len(padding)) + 1
723 packet[Raw].load += (padding * num)[:extend]
def reset_packet_infos(cls):
    """Start over with empty packet info and per-destination count maps.

    Rebinds ``_packet_infos`` and ``_packet_count_for_dst_if_idx`` to two
    fresh, independent dictionaries.
    """
    cls._packet_infos, cls._packet_count_for_dst_if_idx = {}, {}
732 def create_packet_info(cls, src_if, dst_if):
734 Create packet info object containing the source and destination indexes
735 and add it to the testcase's packet info list
737 :param VppInterface src_if: source interface
738 :param VppInterface dst_if: destination interface
740 :returns: _PacketInfo object
744 info.index = len(cls._packet_infos)
745 info.src = src_if.sw_if_index
746 info.dst = dst_if.sw_if_index
747 if isinstance(dst_if, VppSubInterface):
748 dst_idx = dst_if.parent.sw_if_index
750 dst_idx = dst_if.sw_if_index
751 if dst_idx in cls._packet_count_for_dst_if_idx:
752 cls._packet_count_for_dst_if_idx[dst_idx] += 1
754 cls._packet_count_for_dst_if_idx[dst_idx] = 1
755 cls._packet_infos[info.index] = info
759 def info_to_payload(info):
761 Convert _PacketInfo object to packet payload
763 :param info: _PacketInfo object
765 :returns: string containing serialized data from packet info
767 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
771 def payload_to_info(payload, payload_field='load'):
773 Convert packet payload to _PacketInfo object
775 :param payload: packet payload
776 :type payload: <class 'scapy.packet.Raw'>
777 :param payload_field: packet fieldname of payload "load" for
778 <class 'scapy.packet.Raw'>
779 :type payload_field: str
780 :returns: _PacketInfo object containing de-serialized data from payload
783 numbers = getattr(payload, payload_field).split()
785 info.index = int(numbers[0])
786 info.src = int(numbers[1])
787 info.dst = int(numbers[2])
788 info.ip = int(numbers[3])
789 info.proto = int(numbers[4])
792 def get_next_packet_info(self, info):
794 Iterate over the packet info list stored in the testcase
795 Start iteration with first element if info is None
796 Continue based on index in info if info is specified
798 :param info: info or None
799 :returns: next info in list or None if no more infos
804 next_index = info.index + 1
805 if next_index == len(self._packet_infos):
808 return self._packet_infos[next_index]
810 def get_next_packet_info_for_interface(self, src_index, info):
812 Search the packet info list for the next packet info with same source
815 :param src_index: source interface index to search for
816 :param info: packet info - where to start the search
817 :returns: packet info or None
821 info = self.get_next_packet_info(info)
824 if info.src == src_index:
827 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
829 Search the packet info list for the next packet info with same source
830 and destination interface indexes
832 :param src_index: source interface index to search for
833 :param dst_index: destination interface index to search for
834 :param info: packet info - where to start the search
835 :returns: packet info or None
839 info = self.get_next_packet_info_for_interface(src_index, info)
842 if info.dst == dst_index:
845 def assert_equal(self, real_value, expected_value, name_or_class=None):
846 if name_or_class is None:
847 self.assertEqual(real_value, expected_value)
850 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
851 msg = msg % (getdoc(name_or_class).strip(),
852 real_value, str(name_or_class(real_value)),
853 expected_value, str(name_or_class(expected_value)))
855 msg = "Invalid %s: %s does not match expected value %s" % (
856 name_or_class, real_value, expected_value)
858 self.assertEqual(real_value, expected_value, msg)
860 def assert_in_range(self,
868 msg = "Invalid %s: %s out of range <%s,%s>" % (
869 name, real_value, expected_min, expected_max)
870 self.assertTrue(expected_min <= real_value <= expected_max, msg)
872 def assert_packet_checksums_valid(self, packet,
873 ignore_zero_udp_checksums=True):
874 received = packet.__class__(scapy.compat.raw(packet))
876 ppp("Verifying packet checksums for packet:", received))
877 udp_layers = ['UDP', 'UDPerror']
878 checksum_fields = ['cksum', 'chksum']
881 temp = received.__class__(scapy.compat.raw(received))
883 layer = temp.getlayer(counter)
885 for cf in checksum_fields:
886 if hasattr(layer, cf):
887 if ignore_zero_udp_checksums and \
888 0 == getattr(layer, cf) and \
889 layer.name in udp_layers:
892 checksums.append((counter, cf))
895 counter = counter + 1
896 if 0 == len(checksums):
898 temp = temp.__class__(scapy.compat.raw(temp))
899 for layer, cf in checksums:
900 calc_sum = getattr(temp[layer], cf)
902 getattr(received[layer], cf), calc_sum,
903 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
905 "Checksum field `%s` on `%s` layer has correct value `%s`" %
906 (cf, temp[layer].name, calc_sum))
908 def assert_checksum_valid(self, received_packet, layer,
910 ignore_zero_checksum=False):
911 """ Check checksum of received packet on given layer """
912 received_packet_checksum = getattr(received_packet[layer], field_name)
913 if ignore_zero_checksum and 0 == received_packet_checksum:
915 recalculated = received_packet.__class__(
916 scapy.compat.raw(received_packet))
917 delattr(recalculated[layer], field_name)
918 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
919 self.assert_equal(received_packet_checksum,
920 getattr(recalculated[layer], field_name),
921 "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """Verify the checksum of the 'IP' layer of a received packet.

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded to assert_checksum_valid
    """
    verify = self.assert_checksum_valid
    verify(received_packet, 'IP',
           ignore_zero_checksum=ignore_zero_checksum)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """Verify the checksum of the 'TCP' layer of a received packet.

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded to assert_checksum_valid
    """
    verify = self.assert_checksum_valid
    verify(received_packet, 'TCP',
           ignore_zero_checksum=ignore_zero_checksum)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """Verify the checksum of the 'UDP' layer of a received packet.

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded to assert_checksum_valid;
                                 unlike the IP/TCP variants it defaults
                                 to True
    """
    verify = self.assert_checksum_valid
    verify(received_packet, 'UDP',
           ignore_zero_checksum=ignore_zero_checksum)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """Verify checksums of the *error layers found in a received packet.

    Each of IPerror, TCPerror, UDPerror and ICMPerror is looked up with
    ``haslayer`` in that order; every layer present is handed to
    assert_checksum_valid. Only the UDPerror check passes
    ``ignore_zero_checksum=True``.

    :param received_packet: packet to verify
    """
    # (layer class to probe, layer name to verify, extra keyword arguments)
    plan = (
        (IPerror, 'IPerror', {}),
        (TCPerror, 'TCPerror', {}),
        (UDPerror, 'UDPerror', {'ignore_zero_checksum': True}),
        (ICMPerror, 'ICMPerror', {}),
    )
    for layer_cls, layer_name, extra in plan:
        if received_packet.haslayer(layer_cls):
            self.assert_checksum_valid(received_packet, layer_name, **extra)
def assert_icmp_checksum_valid(self, received_packet):
    """Verify the 'ICMP' layer checksum, then any embedded error layers.

    :param received_packet: packet to verify
    """
    outer_check = self.assert_checksum_valid
    outer_check(received_packet, 'ICMP')
    # the embedded-layer check runs only after the outer one has passed
    inner_check = self.assert_embedded_icmp_checksum_valid
    inner_check(received_packet)
def assert_icmpv6_checksum_valid(self, pkt):
    """Verify the 'cksum' field of the ICMPv6 layers found in a packet.

    A present ICMPv6DestUnreach layer is verified first and is followed by
    the embedded error-layer checks; ICMPv6EchoRequest and ICMPv6EchoReply
    are then verified when present.

    :param pkt: packet to verify
    """
    if pkt.haslayer(ICMPv6DestUnreach):
        self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
        self.assert_embedded_icmp_checksum_valid(pkt)
    echo_layers = (
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest'),
        (ICMPv6EchoReply, 'ICMPv6EchoReply'),
    )
    for layer_cls, layer_name in echo_layers:
        if pkt.haslayer(layer_cls):
            self.assert_checksum_valid(pkt, layer_name, 'cksum')
962 def assert_packet_counter_equal(self, counter, expected_value):
963 if counter.startswith("/"):
964 counter_value = self.statistics.get_counter(counter)
965 self.assert_equal(counter_value, expected_value,
966 "packet counter `%s'" % counter)
968 counters = self.vapi.cli("sh errors").split('\n')
970 for i in range(1, len(counters) - 1):
971 results = counters[i].split()
972 if results[1] == counter:
973 counter_value = int(results[0])
977 def sleep(cls, timeout, remark=None):
979 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
980 # * by Guido, only the main thread can be interrupted.
982 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
985 if hasattr(os, 'sched_yield'):
991 if hasattr(cls, 'logger'):
992 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
996 if hasattr(cls, 'logger') and after - before > 2 * timeout:
997 cls.logger.error("unexpected self.sleep() result - "
998 "slept for %es instead of ~%es!",
999 after - before, timeout)
1000 if hasattr(cls, 'logger'):
1002 "Finished sleep (%s) - slept %es (wanted %es)",
1003 remark, after - before, timeout)
1005 def pg_send(self, intf, pkts):
1006 self.vapi.cli("clear trace")
1007 intf.add_stream(pkts)
1008 self.pg_enable_capture(self.pg_interfaces)
1011 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
1012 self.pg_send(intf, pkts)
1015 for i in self.pg_interfaces:
1016 i.get_capture(0, timeout=timeout)
1017 i.assert_nothing_captured(remark=remark)
1020 def send_and_expect(self, intf, pkts, output, n_rx=None):
1023 self.pg_send(intf, pkts)
1024 rx = output.get_capture(n_rx)
1027 def send_and_expect_only(self, intf, pkts, output, timeout=None):
1028 self.pg_send(intf, pkts)
1029 rx = output.get_capture(len(pkts))
1033 for i in self.pg_interfaces:
1034 if i not in outputs:
1035 i.get_capture(0, timeout=timeout)
1036 i.assert_nothing_captured()
1042 """ unittest calls runTest when TestCase is instantiated without a
1043 test case. Use case: Writing unittests against VppTestCase"""
def get_testcase_doc_name(test):
    """Return the first line of the docstring of the test's class.

    Falls back to the class name when the class has no docstring or an
    empty one, so that result reporting does not itself raise
    AttributeError/IndexError while describing a test.

    :param test: test case instance
    :returns: first docstring line of ``test.__class__``, or the class
              name if no docstring text is available
    """
    test_class = test.__class__
    doc = getdoc(test_class)
    doc_lines = doc.splitlines() if doc else []
    if doc_lines:
        return doc_lines[0]
    return test_class.__name__
1051 def get_test_description(descriptions, test):
1052 short_description = test.shortDescription()
1053 if descriptions and short_description:
1054 return short_description
class TestCaseInfo(object):
    """Plain record describing the test case class currently being run.

    Attributes set by the constructor:
      logger          - logger passed by the creator
      tempdir         - temporary directory passed by the creator
      vpp_pid         - pid passed by the creator
      vpp_bin_path    - vpp binary path passed by the creator
      core_crash_test - starts as None
    """

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        """Store the given values and clear the core crash marker.

        :param logger: logger to remember
        :param tempdir: temporary directory to remember
        :param vpp_pid: vpp process id to remember
        :param vpp_bin_path: vpp binary path to remember
        """
        # nothing is known about a core crash at creation time
        self.core_crash_test = None
        self.vpp_bin_path = vpp_bin_path
        self.vpp_pid = vpp_pid
        self.tempdir = tempdir
        self.logger = logger
1068 class VppTestResult(unittest.TestResult):
1070 @property result_string
1071 String variable to store the test case result string.
1073 List variable containing 2-tuples of TestCase instances and strings
1074 holding formatted tracebacks. Each tuple represents a test which
1075 raised an unexpected exception.
1077 List variable containing 2-tuples of TestCase instances and strings
1078 holding formatted tracebacks. Each tuple represents a test where
1079 a failure was explicitly signalled using the TestCase.assert*()
1083 failed_test_cases_info = set()
1084 core_crash_test_cases_info = set()
1085 current_test_case_info = None
1087 def __init__(self, stream=None, descriptions=None, verbosity=None,
1090 :param stream File descriptor to store where to report test results.
1091 Set to the standard error stream by default.
1092 :param descriptions Boolean variable to store information if to use
1093 test case descriptions.
1094 :param verbosity Integer variable to store required verbosity level.
1096 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1097 self.stream = stream
1098 self.descriptions = descriptions
1099 self.verbosity = verbosity
1100 self.result_string = None
1101 self.runner = runner
1103 def addSuccess(self, test):
1105 Record a test succeeded result
1110 if self.current_test_case_info:
1111 self.current_test_case_info.logger.debug(
1112 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1113 test._testMethodName,
1114 test._testMethodDoc))
1115 unittest.TestResult.addSuccess(self, test)
1116 self.result_string = colorize("OK", GREEN)
1118 self.send_result_through_pipe(test, PASS)
1120 def addSkip(self, test, reason):
1122 Record a test skipped.
1128 if self.current_test_case_info:
1129 self.current_test_case_info.logger.debug(
1130 "--- addSkip() %s.%s(%s) called, reason is %s" %
1131 (test.__class__.__name__, test._testMethodName,
1132 test._testMethodDoc, reason))
1133 unittest.TestResult.addSkip(self, test, reason)
1134 self.result_string = colorize("SKIP", YELLOW)
1136 self.send_result_through_pipe(test, SKIP)
def symlink_failed(self):
    """
    Link the temp dir of the current (failed) test case into FAILED_DIR.

    The link is created as "<FAILED_DIR>/<basename of temp dir>-FAILED".
    This is best-effort: nothing is raised to the caller, any problem is
    only reported on the test case logger (when there is one). Nothing is
    done when no current test case info is set.
    """
    case_info = self.current_test_case_info
    if not case_info:
        return

    logger = case_info.logger
    try:
        failed_dir = os.getenv('FAILED_DIR')
        if failed_dir is None:
            # without this guard os.path.join() fails on None and only a
            # cryptic exception text ends up in the log
            if logger:
                logger.error("FAILED_DIR is not set, "
                             "cannot link the failed test")
            return

        link_path = os.path.join(
            failed_dir,
            '%s-FAILED' % os.path.basename(case_info.tempdir))
        if logger:
            logger.debug("creating a link to the failed test")
            logger.debug("os.symlink(%s, %s)" %
                         (case_info.tempdir, link_path))

        # lexists() does not follow the link, so a dangling link left from
        # an earlier run is detected too (exists() would report it missing
        # and os.symlink() would then fail)
        if os.path.lexists(link_path):
            if logger:
                logger.debug('symlink already exists')
        else:
            os.symlink(case_info.tempdir, link_path)

    except Exception as e:
        # deliberate catch-all: linking must never break result recording
        if logger:
            logger.error(e)
def send_result_through_pipe(self, test, result):
    """
    Send a (test id, result) tuple through the framework result pipe.

    Nothing is sent when the 'test_framework_result_pipe' attribute is
    missing or falsy.

    :param test: the test whose id() is reported
    :param result: the result value to report for the test
    """
    result_pipe = getattr(self, 'test_framework_result_pipe', None)
    if not result_pipe:
        return
    result_pipe.send((test.id(), result))
def log_error(self, test, err, fn_name):
    """
    Trace a recorded error or failure on the current test case logger.

    Two debug messages are written: one naming the calling function, the
    test and the raw error, and one with the formatted exception. Nothing
    is logged when no current test case info is set.

    :param test: the test (or unittest _ErrorHolder) the error belongs to
    :param err: exception info tuple, unpacked into format_exception()
    :param fn_name: name of the result method being traced
    """
    case_info = self.current_test_case_info
    if not case_info:
        return

    if isinstance(test, unittest.suite._ErrorHolder):
        # an _ErrorHolder has no test method attributes, only a
        # description
        test_name = test.description
    else:
        name_parts = (test.__class__.__name__,
                      test._testMethodName,
                      test._testMethodDoc)
        test_name = '%s.%s(%s)' % name_parts

    case_info.logger.debug(
        "--- %s() %s called, err is %s" % (fn_name, test_name, err))
    formatted = "".join(format_exception(*err))
    case_info.logger.debug("formatted exception is:\n%s" % formatted)
def add_error(self, test, err, unittest_fn, error_type):
    """
    Common implementation behind addFailure() and addError().

    Traces the error, records it through the given unittest method, builds
    the result string, links the temp dir of the failed test case, keeps
    track of failed / core-producing test cases and finally reports the
    error type through the result pipe.

    :param test: the test (or unittest _ErrorHolder) which went wrong
    :param err: exception info tuple
    :param unittest_fn: unbound unittest.TestResult method doing the
        actual recording, called as unittest_fn(self, test, err)
    :param error_type: FAIL or ERROR
    :raises Exception: when error_type is neither FAIL nor ERROR
    """
    if error_type == FAIL:
        traced_as, label = 'addFailure', "FAIL"
    elif error_type == ERROR:
        traced_as, label = 'addError', "ERROR"
    else:
        raise Exception('Error type %s cannot be used to record an '
                        'error or a failure' % error_type)
    self.log_error(test, err, traced_as)
    error_type_str = colorize(label, RED)

    unittest_fn(self, test, err)

    case_info = self.current_test_case_info
    if not case_info:
        self.result_string = '%s [no temp dir]' % error_type_str
    else:
        self.result_string = "%s [ temp dir used by test case: %s ]" % (
            error_type_str, case_info.tempdir)
        self.symlink_failed()
        self.failed_test_cases_info.add(case_info)
        if is_core_present(case_info.tempdir):
            # only the first test seen with a core present is remembered
            # as the crashing one
            if not case_info.core_crash_test:
                if isinstance(test, unittest.suite._ErrorHolder):
                    crashed_test = str(test)
                else:
                    crashed_test = "'{!s}' ({!s})".format(
                        get_testcase_doc_name(test), test.id())
                case_info.core_crash_test = crashed_test
            self.core_crash_test_cases_info.add(case_info)

    self.send_result_through_pipe(test, error_type)
def addFailure(self, test, err):
    """
    Record a test failed result

    Delegates to add_error() with the unittest addFailure recorder and
    the FAIL error type.

    :param test: the test which failed
    :param err: error message

    """
    record_failure = unittest.TestResult.addFailure
    self.add_error(test, err, record_failure, FAIL)
def addError(self, test, err):
    """
    Record a test error result

    Delegates to add_error() with the unittest addError recorder and the
    ERROR error type.

    :param test: the test which raised an error
    :param err: error message

    """
    record_error = unittest.TestResult.addError
    self.add_error(test, err, record_error, ERROR)
def getDescription(self, test):
    """
    Get test description

    :param test: the test to describe
    :returns: test description, as produced by get_test_description() for
        this result's descriptions setting

    """
    use_descriptions = self.descriptions
    return get_test_description(use_descriptions, test)
def startTest(self, test):
    """
    Start a test

    Prints a header for the test class the first time one of its tests is
    started, registers the start with unittest.TestResult and, when
    verbosity is above zero, announces the test on the output stream.

    :param test: the test about to be run

    """

    def print_header(test):
        """Print the test class header once per test class."""
        test_class = test.__class__
        if not hasattr(test_class, '_header_printed'):
            # getdoc() returns None when there is no docstring and an
            # empty string has no first line - fall back to the class
            # name instead of crashing in the middle of result reporting
            doc = getdoc(test)
            title = doc.splitlines()[0] if doc else test_class.__name__
            print(double_line_delim)
            print(colorize(title, GREEN))
            print(double_line_delim)
            test_class._header_printed = True

    print_header(test)

    unittest.TestResult.startTest(self, test)
    if self.verbosity > 0:
        self.stream.writeln(
            "Starting " + self.getDescription(test) + " ...")
        self.stream.writeln(single_line_delim)
def stopTest(self, test):
    """
    Called when the given test has been run

    Writes the test description together with the stored result string to
    the output stream (framed by delimiter lines when verbosity is above
    zero) and reports TEST_RUN through the result pipe.

    :param test: the test which has just finished

    """
    unittest.TestResult.stopTest(self, test)

    framed = self.verbosity > 0
    if framed:
        self.stream.writeln(single_line_delim)
    self.stream.writeln(
        "%-73s%s" % (self.getDescription(test), self.result_string))
    if framed:
        self.stream.writeln(single_line_delim)

    self.send_result_through_pipe(test, TEST_RUN)
def printErrors(self):
    """
    Print errors from running the test case

    When the runner was asked not to print a summary, the output stream of
    both this result and the runner is afterwards redirected to os.devnull.
    """
    if self.errors or self.failures:
        self.stream.writeln()
        self.printErrorList('ERROR', self.errors)
        self.printErrorList('FAIL', self.failures)

    # ^^ that is the last output from unittest before summary
    if self.runner.print_summary:
        return

    # NOTE(review): the devnull file object opened here is never
    # explicitly closed
    null_stream = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
    self.stream = null_stream
    self.runner.stream = null_stream
def printErrorList(self, flavour, errors):
    """
    Print error list to the output stream together with error type
    and test case description.

    :param flavour: error type
    :param errors: iterable of (test, error text) pairs

    """
    for failed_test, error_text in errors:
        self.stream.writeln(double_line_delim)
        heading = "%s: %s" % (flavour, self.getDescription(failed_test))
        self.stream.writeln(heading)
        self.stream.writeln(single_line_delim)
        self.stream.writeln("%s" % error_text)
class VppTestRunner(unittest.TextTestRunner):
    """
    A basic test runner implementation which prints results to standard
    output.
    """

    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
                 result_pipe=None, failfast=False, buffer=False,
                 resultclass=None, print_summary=True, **kwargs):
        """
        :param keep_alive_pipe: pipe stored on KeepAliveReporter
        :param descriptions: passed on to unittest.TextTestRunner
        :param verbosity: passed on to unittest.TextTestRunner
        :param result_pipe: pipe stored on the result class as
            test_framework_result_pipe
        :param failfast: passed on to unittest.TextTestRunner
        :param buffer: passed on to unittest.TextTestRunner
        :param resultclass: passed on to unittest.TextTestRunner
        :param print_summary: when False, the stream is swapped back to the
            original one after run() (see run())
        """
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        # NOTE(review): resultclass is a read-only property on this class,
        # so forwarding a non-None resultclass looks like it cannot be
        # stored by the base class - confirm before relying on it
        super(VppTestRunner, self).__init__(
            stream=sys.stdout,
            descriptions=descriptions,
            verbosity=verbosity,
            failfast=failfast,
            buffer=buffer,
            resultclass=resultclass,
            **kwargs)
        KeepAliveReporter.pipe = keep_alive_pipe

        # remembered so that run() can restore it
        self.orig_stream = self.stream
        self.resultclass.test_framework_result_pipe = result_pipe

        self.print_summary = print_summary

    def _makeResult(self):
        """Create the result object, handing it a reference to this runner."""
        result_cls = self.resultclass
        return result_cls(self.stream, self.descriptions, self.verbosity,
                          self)

    def run(self, test):
        """
        Run the tests

        :param test: the test case or suite to run
        :returns: the result object produced by unittest.TextTestRunner

        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        result = super(VppTestRunner, self).run(test)
        if self.print_summary:
            return result

        # summary printing is disabled - put the original stream back on
        # both the runner and the result
        self.stream = self.orig_stream
        result.stream = self.orig_stream
        return result
class Worker(Thread):
    """
    Thread which runs an external executable, logs its return code and
    captured output, and keeps the return code in ``result``.
    """

    def __init__(self, args, logger, env=None):
        """
        :param args: argument list of the process; args[0] is the
            executable
        :param logger: logger used to report progress and process output
        :param env: optional dict of extra environment variables for the
            process; it is deep-copied, so later changes made by the
            caller do not affect this worker
        """
        super(Worker, self).__init__()
        self.logger = logger
        self.args = args
        self.result = None
        # None instead of a mutable {} default argument
        self.env = {} if env is None else copy.deepcopy(env)

    def run(self):
        """
        Run the executable, wait for it to finish and log what it wrote.

        The process environment is a copy of os.environ updated with the
        extra variables given to the constructor, with CK_LOG_FILE_NAME
        forced to "-". The return code is stored in ``self.result``.
        """
        executable = self.args[0]
        self.logger.debug("Running executable w/args `%s'" % self.args)
        env = os.environ.copy()
        env.update(self.env)
        env["CK_LOG_FILE_NAME"] = "-"
        # os.setpgrp puts the child into a process group of its own
        self.process = subprocess.Popen(
            self.args, shell=False, env=env, preexec_fn=os.setpgrp,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = self.process.communicate()
        self.logger.debug("Finished running `%s'" % executable)
        self.logger.info("Return code is `%s'" % self.process.returncode)
        for stream_name, captured in (("stdout", out), ("stderr", err)):
            self.logger.info(single_line_delim)
            self.logger.info(
                "Executable `%s' wrote to %s:" % (executable, stream_name))
            self.logger.info(single_line_delim)
            self.logger.info(captured)
        self.logger.info(single_line_delim)
        self.result = self.process.returncode
1399 if __name__ == '__main__':