3 from __future__ import print_function
17 from collections import deque
18 from threading import Thread, Event
19 from inspect import getdoc, isclass
20 from traceback import format_exception
21 from logging import FileHandler, DEBUG, Formatter
24 from scapy.packet import Raw
25 import hook as hookmodule
26 from vpp_pg_interface import VppPGInterface
27 from vpp_sub_interface import VppSubInterface
28 from vpp_lo_interface import VppLoInterface
29 from vpp_bvi_interface import VppBviInterface
30 from vpp_papi_provider import VppPapiProvider
31 from vpp_papi.vpp_stats import VPPStats
32 from vpp_papi.vpp_transport_shmem import VppTransportShmemIOError
33 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
35 from vpp_object import VppObjectRegistry
36 from util import ppp, is_core_present
37 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
38 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
39 from scapy.layers.inet6 import ICMPv6EchoReply
41 if os.name == 'posix' and sys.version_info[0] < 3:
42 # using subprocess32 is recommended by python official documentation
43 # @ https://docs.python.org/2/library/subprocess.html
44 import subprocess32 as subprocess
48 # Python2/3 compatible
61 class BoolEnvironmentVariable(object):
63 def __init__(self, env_var_name, default='n', true_values=None):
64 self.name = env_var_name
65 self.default = default
66 self.true_values = true_values if true_values is not None else \
70 return os.getenv(self.name, self.default).lower() in self.true_values
72 if sys.version_info[0] == 2:
73 __nonzero__ = __bool__
76 return 'BoolEnvironmentVariable(%r, default=%r, true_values=%r)' % \
77 (self.name, self.default, self.true_values)
80 debug_framework = BoolEnvironmentVariable('TEST_DEBUG')
85 Test framework module.
87 The module provides a set of tools for constructing and running tests and
88 representing the results.
92 class VppDiedError(Exception):
93 """ exception for reporting that the subprocess has died."""
95 signals_by_value = {v: k for k, v in signal.__dict__.items() if
96 k.startswith('SIG') and not k.startswith('SIG_')}
98 def __init__(self, rv=None, testcase=None, method_name=None):
100 self.signal_name = None
101 self.testcase = testcase
102 self.method_name = method_name
105 self.signal_name = VppDiedError.signals_by_value[-rv]
106 except (KeyError, TypeError):
109 if testcase is None and method_name is None:
112 in_msg = 'running %s.%s ' % (testcase, method_name)
114 msg = "VPP subprocess died %sunexpectedly with return code: %d%s." % (
117 ' [%s]' % (self.signal_name if
118 self.signal_name is not None else ''))
119 super(VppDiedError, self).__init__(msg)
122 class _PacketInfo(object):
123 """Private class to create packet info object.
125 Help process information about the next packet.
126 Set variables to default values.
128 #: Store the index of the packet.
130 #: Store the index of the source packet generator interface of the packet.
132 #: Store the index of the destination packet generator interface
135 #: Store expected ip version
137 #: Store expected upper protocol
139 #: Store the copy of the former packet.
142 def __eq__(self, other):
143 index = self.index == other.index
144 src = self.src == other.src
145 dst = self.dst == other.dst
146 data = self.data == other.data
147 return index and src and dst and data
150 def pump_output(testclass):
151 """ pump output from vpp stdout/stderr to proper queues """
154 while not testclass.pump_thread_stop_flag.is_set():
155 readable = select.select([testclass.vpp.stdout.fileno(),
156 testclass.vpp.stderr.fileno(),
157 testclass.pump_thread_wakeup_pipe[0]],
159 if testclass.vpp.stdout.fileno() in readable:
160 read = os.read(testclass.vpp.stdout.fileno(), 102400)
162 split = read.splitlines(True)
163 if len(stdout_fragment) > 0:
164 split[0] = "%s%s" % (stdout_fragment, split[0])
165 if len(split) > 0 and split[-1].endswith("\n"):
169 stdout_fragment = split[-1]
170 testclass.vpp_stdout_deque.extend(split[:limit])
171 if not testclass.cache_vpp_output:
172 for line in split[:limit]:
173 testclass.logger.debug(
174 "VPP STDOUT: %s" % line.rstrip("\n"))
175 if testclass.vpp.stderr.fileno() in readable:
176 read = os.read(testclass.vpp.stderr.fileno(), 102400)
178 split = read.decode('ascii',
179 errors='backslashreplace').splitlines(True)
180 if len(stderr_fragment) > 0:
181 split[0] = "%s%s" % (stderr_fragment, split[0])
182 if len(split) > 0 and split[-1].endswith("\n"):
186 stderr_fragment = split[-1]
188 testclass.vpp_stderr_deque.extend(split[:limit])
189 if not testclass.cache_vpp_output:
190 for line in split[:limit]:
191 testclass.logger.debug(
192 "VPP STDERR: %s" % line.rstrip("\n"))
193 # ignoring the dummy pipe here intentionally - the
194 # flag will take care of properly terminating the loop
197 def _is_skip_aarch64_set():
198 return BoolEnvironmentVariable('SKIP_AARCH64')
201 is_skip_aarch64_set = _is_skip_aarch64_set()
204 def _is_platform_aarch64():
205 return platform.machine() == 'aarch64'
208 is_platform_aarch64 = _is_platform_aarch64()
211 def _running_extended_tests():
212 return BoolEnvironmentVariable("EXTENDED_TESTS")
215 running_extended_tests = _running_extended_tests()
218 def _running_on_centos():
219 os_id = os.getenv("OS_ID", "")
220 return True if "centos" in os_id.lower() else False
223 running_on_centos = _running_on_centos()
226 class KeepAliveReporter(object):
228 Singleton object which reports test start to parent process
233 self.__dict__ = self._shared_state
241 def pipe(self, pipe):
242 if self._pipe is not None:
243 raise Exception("Internal error - pipe should only be set once.")
246 def send_keep_alive(self, test, desc=None):
248 Write current test tmpdir & desc to keep-alive pipe to signal liveness
250 if self.pipe is None:
251 # if not running forked..
255 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
259 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
262 class VppTestCase(unittest.TestCase):
263 """This subclass is a base class for VPP test cases that are implemented as
264 classes. It provides methods to create and run test case.
267 extra_vpp_punt_config = []
268 extra_vpp_plugin_config = []
271 def packet_infos(self):
272 """List of packet infos"""
273 return self._packet_infos
276 def get_packet_count_for_if_idx(cls, dst_if_index):
277 """Get the number of packet info for specified destination if index"""
278 if dst_if_index in cls._packet_count_for_dst_if_idx:
279 return cls._packet_count_for_dst_if_idx[dst_if_index]
285 """Return the instance of this testcase"""
286 return cls.test_instance
289 def set_debug_flags(cls, d):
290 cls.debug_core = False
291 cls.debug_gdb = False
292 cls.debug_gdbserver = False
297 cls.debug_core = True
300 elif dl == "gdbserver":
301 cls.debug_gdbserver = True
303 raise Exception("Unrecognized DEBUG option: '%s'" % d)
306 def get_least_used_cpu():
307 cpu_usage_list = [set(range(psutil.cpu_count()))]
308 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
309 if 'vpp_main' == p.info['name']]
310 for vpp_process in vpp_processes:
311 for cpu_usage_set in cpu_usage_list:
313 cpu_num = vpp_process.cpu_num()
314 if cpu_num in cpu_usage_set:
315 cpu_usage_set_index = cpu_usage_list.index(
317 if cpu_usage_set_index == len(cpu_usage_list) - 1:
318 cpu_usage_list.append({cpu_num})
320 cpu_usage_list[cpu_usage_set_index + 1].add(
322 cpu_usage_set.remove(cpu_num)
324 except psutil.NoSuchProcess:
327 for cpu_usage_set in cpu_usage_list:
328 if len(cpu_usage_set) > 0:
329 min_usage_set = cpu_usage_set
332 return random.choice(tuple(min_usage_set))
335 def setUpConstants(cls):
336 """ Set-up the test case class based on environment variables """
337 cls.step = BoolEnvironmentVariable('STEP')
338 d = os.getenv("DEBUG", None)
339 # inverted case to handle '' == True
340 c = os.getenv("CACHE_OUTPUT", "1")
341 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
342 cls.set_debug_flags(d)
343 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
344 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
345 cls.test_plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
346 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
348 if cls.plugin_path is not None:
349 if cls.extern_plugin_path is not None:
350 plugin_path = "%s:%s" % (
351 cls.plugin_path, cls.extern_plugin_path)
353 plugin_path = cls.plugin_path
354 elif cls.extern_plugin_path is not None:
355 plugin_path = cls.extern_plugin_path
357 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
358 debug_cli = "cli-listen localhost:5002"
360 size = os.getenv("COREDUMP_SIZE")
362 coredump_size = "coredump-size %s" % size
363 if coredump_size is None:
364 coredump_size = "coredump-size unlimited"
366 cpu_core_number = cls.get_least_used_cpu()
367 if not hasattr(cls, "worker_config"):
368 cls.worker_config = ""
370 cls.vpp_cmdline = [cls.vpp_bin, "unix",
371 "{", "nodaemon", debug_cli, "full-coredump",
372 coredump_size, "runtime-dir", cls.tempdir, "}",
373 "api-trace", "{", "on", "}", "api-segment", "{",
374 "prefix", cls.shm_prefix, "}", "cpu", "{",
375 "main-core", str(cpu_core_number),
376 cls.worker_config, "}",
377 "statseg", "{", "socket-name", cls.stats_sock, "}",
378 "socksvr", "{", "socket-name", cls.api_sock, "}",
380 "{", "plugin", "dpdk_plugin.so", "{", "disable",
381 "}", "plugin", "rdma_plugin.so", "{", "disable",
382 "}", "plugin", "unittest_plugin.so", "{", "enable",
383 "}"] + cls.extra_vpp_plugin_config + ["}", ]
384 if cls.extra_vpp_punt_config is not None:
385 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
386 if plugin_path is not None:
387 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
388 if cls.test_plugin_path is not None:
389 cls.vpp_cmdline.extend(["test_plugin_path", cls.test_plugin_path])
391 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
392 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
395 def wait_for_enter(cls):
396 if cls.debug_gdbserver:
397 print(double_line_delim)
398 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
400 print(double_line_delim)
401 print("Spawned VPP with PID: %d" % cls.vpp.pid)
403 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
405 print(single_line_delim)
406 print("You can debug the VPP using e.g.:")
407 if cls.debug_gdbserver:
408 print("sudo gdb " + cls.vpp_bin +
409 " -ex 'target remote localhost:7777'")
410 print("Now is the time to attach a gdb by running the above "
411 "command, set up breakpoints etc. and then resume VPP from "
412 "within gdb by issuing the 'continue' command")
414 print("sudo gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
415 print("Now is the time to attach a gdb by running the above "
416 "command and set up breakpoints etc.")
417 print(single_line_delim)
418 input("Press ENTER to continue running the testcase...")
422 cmdline = cls.vpp_cmdline
424 if cls.debug_gdbserver:
425 gdbserver = '/usr/bin/gdbserver'
426 if not os.path.isfile(gdbserver) or \
427 not os.access(gdbserver, os.X_OK):
428 raise Exception("gdbserver binary '%s' does not exist or is "
429 "not executable" % gdbserver)
431 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
432 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
435 cls.vpp = subprocess.Popen(cmdline,
436 stdout=subprocess.PIPE,
437 stderr=subprocess.PIPE,
439 except subprocess.CalledProcessError as e:
440 cls.logger.critical("Subprocess returned with non-0 return code: ("
444 cls.logger.critical("Subprocess returned with OS error: "
445 "(%s) %s", e.errno, e.strerror)
447 except Exception as e:
448 cls.logger.exception("Subprocess returned unexpected from "
455 def wait_for_stats_socket(cls):
456 deadline = time.time() + 300
458 while time.time() < deadline or \
459 cls.debug_gdb or cls.debug_gdbserver:
460 if os.path.exists(cls.stats_sock):
465 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
468 def wait_for_coredump(cls):
469 corefile = cls.tempdir + "/core"
470 if os.path.isfile(corefile):
471 cls.logger.error("Waiting for coredump to complete: %s", corefile)
472 curr_size = os.path.getsize(corefile)
473 deadline = time.time() + 60
475 while time.time() < deadline:
478 curr_size = os.path.getsize(corefile)
479 if size == curr_size:
483 cls.logger.error("Timed out waiting for coredump to complete:"
486 cls.logger.error("Coredump complete: %s, size %d",
492 Perform class setup before running the testcase
493 Remove shared memory files, start vpp and connect the vpp-api
495 super(VppTestCase, cls).setUpClass()
496 gc.collect() # run garbage collection first
498 cls.logger = get_logger(cls.__name__)
499 if hasattr(cls, 'parallel_handler'):
500 cls.logger.addHandler(cls.parallel_handler)
501 cls.logger.propagate = False
503 cls.tempdir = tempfile.mkdtemp(
504 prefix='vpp-unittest-%s-' % cls.__name__)
505 cls.stats_sock = "%s/stats.sock" % cls.tempdir
506 cls.api_sock = "%s/api.sock" % cls.tempdir
507 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
508 cls.file_handler.setFormatter(
509 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
511 cls.file_handler.setLevel(DEBUG)
512 cls.logger.addHandler(cls.file_handler)
513 cls.logger.debug("--- setUpClass() for %s called ---" %
515 cls.shm_prefix = os.path.basename(cls.tempdir)
516 os.chdir(cls.tempdir)
517 cls.logger.info("Temporary dir is %s, shm prefix is %s",
518 cls.tempdir, cls.shm_prefix)
520 cls.reset_packet_infos()
524 cls.registry = VppObjectRegistry()
525 cls.vpp_startup_failed = False
526 cls.reporter = KeepAliveReporter()
527 # need to catch exceptions here because if we raise, then the cleanup
528 # doesn't get called and we might end with a zombie vpp
531 cls.reporter.send_keep_alive(cls, 'setUpClass')
532 VppTestResult.current_test_case_info = TestCaseInfo(
533 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
534 cls.vpp_stdout_deque = deque()
535 cls.vpp_stderr_deque = deque()
536 cls.pump_thread_stop_flag = Event()
537 cls.pump_thread_wakeup_pipe = os.pipe()
538 cls.pump_thread = Thread(target=pump_output, args=(cls,))
539 cls.pump_thread.daemon = True
540 cls.pump_thread.start()
541 if cls.debug_gdb or cls.debug_gdbserver:
545 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
548 hook = hookmodule.StepHook(cls)
550 hook = hookmodule.PollHook(cls)
551 cls.vapi.register_hook(hook)
552 cls.wait_for_stats_socket()
553 cls.statistics = VPPStats(socketname=cls.stats_sock)
557 cls.vpp_startup_failed = True
559 "VPP died shortly after startup, check the"
560 " output to standard error for possible cause")
566 cls.vapi.disconnect()
569 if cls.debug_gdbserver:
570 print(colorize("You're running VPP inside gdbserver but "
571 "VPP-API connection failed, did you forget "
572 "to 'continue' VPP from within gdb?", RED))
574 except Exception as e:
575 cls.logger.debug("Exception connecting to VPP: %s" % e)
583 Disconnect vpp-api, kill vpp and cleanup shared memory files
585 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
587 if cls.vpp.returncode is None:
588 print(double_line_delim)
589 print("VPP or GDB server is still running")
590 print(single_line_delim)
591 input("When done debugging, press ENTER to kill the "
592 "process and finish running the testcase...")
594 # first signal that we want to stop the pump thread, then wake it up
595 if hasattr(cls, 'pump_thread_stop_flag'):
596 cls.pump_thread_stop_flag.set()
597 if hasattr(cls, 'pump_thread_wakeup_pipe'):
598 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
599 if hasattr(cls, 'pump_thread'):
600 cls.logger.debug("Waiting for pump thread to stop")
601 cls.pump_thread.join()
602 if hasattr(cls, 'vpp_stderr_reader_thread'):
603 cls.logger.debug("Waiting for stdderr pump to stop")
604 cls.vpp_stderr_reader_thread.join()
606 if hasattr(cls, 'vpp'):
607 if hasattr(cls, 'vapi'):
608 cls.logger.debug("Disconnecting class vapi client on %s",
610 cls.vapi.disconnect()
611 cls.logger.debug("Deleting class vapi attribute on %s",
615 if cls.vpp.returncode is None:
616 cls.wait_for_coredump()
617 cls.logger.debug("Sending TERM to vpp")
619 cls.logger.debug("Waiting for vpp to die")
620 cls.vpp.communicate()
621 cls.logger.debug("Deleting class vpp attribute on %s",
625 if cls.vpp_startup_failed:
626 stdout_log = cls.logger.info
627 stderr_log = cls.logger.critical
629 stdout_log = cls.logger.info
630 stderr_log = cls.logger.info
632 if hasattr(cls, 'vpp_stdout_deque'):
633 stdout_log(single_line_delim)
634 stdout_log('VPP output to stdout while running %s:', cls.__name__)
635 stdout_log(single_line_delim)
636 vpp_output = "".join(cls.vpp_stdout_deque)
637 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
639 stdout_log('\n%s', vpp_output)
640 stdout_log(single_line_delim)
642 if hasattr(cls, 'vpp_stderr_deque'):
643 stderr_log(single_line_delim)
644 stderr_log('VPP output to stderr while running %s:', cls.__name__)
645 stderr_log(single_line_delim)
646 vpp_output = "".join(cls.vpp_stderr_deque)
647 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
649 stderr_log('\n%s', vpp_output)
650 stderr_log(single_line_delim)
653 def tearDownClass(cls):
654 """ Perform final cleanup after running all tests in this test-case """
655 cls.logger.debug("--- tearDownClass() for %s called ---" %
657 cls.reporter.send_keep_alive(cls, 'tearDownClass')
659 cls.file_handler.close()
660 cls.reset_packet_infos()
662 debug_internal.on_tear_down_class(cls)
664 def show_commands_at_teardown(self):
665 """ Allow subclass specific teardown logging additions."""
666 self.logger.info("--- No test specific show commands provided. ---")
669 """ Show various debug prints after each test """
670 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
671 (self.__class__.__name__, self._testMethodName,
672 self._testMethodDoc))
675 if not self.vpp_dead:
676 self.logger.debug(self.vapi.cli("show trace max 1000"))
677 self.logger.info(self.vapi.ppcli("show interface"))
678 self.logger.info(self.vapi.ppcli("show hardware"))
679 self.logger.info(self.statistics.set_errors_str())
680 self.logger.info(self.vapi.ppcli("show run"))
681 self.logger.info(self.vapi.ppcli("show log"))
682 self.logger.info(self.vapi.ppcli("show bihash"))
683 self.logger.info("Logging testcase specific show commands.")
684 self.show_commands_at_teardown()
685 self.registry.remove_vpp_config(self.logger)
686 # Save/Dump VPP api trace log
687 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
688 tmp_api_trace = "/tmp/%s" % api_trace
689 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
690 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
691 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
693 os.rename(tmp_api_trace, vpp_api_trace_log)
694 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
696 except VppTransportShmemIOError:
697 self.logger.debug("VppTransportShmemIOError: Vpp dead. "
698 "Cannot log show commands.")
701 self.registry.unregister_all(self.logger)
704 """ Clear trace before running each test"""
705 super(VppTestCase, self).setUp()
706 self.reporter.send_keep_alive(self)
709 raise VppDiedError(rv=None, testcase=self.__class__.__name__,
710 method_name=self._testMethodName)
711 self.sleep(.1, "during setUp")
712 self.vpp_stdout_deque.append(
713 "--- test setUp() for %s.%s(%s) starts here ---\n" %
714 (self.__class__.__name__, self._testMethodName,
715 self._testMethodDoc))
716 self.vpp_stderr_deque.append(
717 "--- test setUp() for %s.%s(%s) starts here ---\n" %
718 (self.__class__.__name__, self._testMethodName,
719 self._testMethodDoc))
720 self.vapi.cli("clear trace")
721 # store the test instance inside the test class - so that objects
722 # holding the class can access instance methods (like assertEqual)
723 type(self).test_instance = self
726 def pg_enable_capture(cls, interfaces=None):
728 Enable capture on packet-generator interfaces
730 :param interfaces: iterable interface indexes (if None,
731 use self.pg_interfaces)
734 if interfaces is None:
735 interfaces = cls.pg_interfaces
740 def register_capture(cls, cap_name):
741 """ Register a capture in the testclass """
742 # add to the list of captures with current timestamp
743 cls._captures.append((time.time(), cap_name))
747 """ Enable the PG, wait till it is done, then clean up """
748 cls.vapi.cli("trace add pg-input 1000")
749 cls.vapi.cli('packet-generator enable')
750 # PG, when starts, runs to completion -
751 # so let's avoid a race condition,
752 # and wait a little till it's done.
753 # Then clean it up - and then be gone.
754 deadline = time.time() + 300
755 while cls.vapi.cli('show packet-generator').find("Yes") != -1:
756 cls.sleep(0.01) # yield
757 if time.time() > deadline:
758 cls.logger.error("Timeout waiting for pg to stop")
760 for stamp, cap_name in cls._captures:
761 cls.vapi.cli('packet-generator delete %s' % cap_name)
765 def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
767 Create packet-generator interfaces.
769 :param interfaces: iterable indexes of the interfaces.
770 :returns: List of created interfaces.
775 intf = VppPGInterface(cls, i, gso, gso_size)
776 setattr(cls, intf.name, intf)
778 cls.pg_interfaces = result
782 def create_loopback_interfaces(cls, count):
784 Create loopback interfaces.
786 :param count: number of interfaces created.
787 :returns: List of created interfaces.
789 result = [VppLoInterface(cls) for i in range(count)]
791 setattr(cls, intf.name, intf)
792 cls.lo_interfaces = result
796 def create_bvi_interfaces(cls, count):
798 Create BVI interfaces.
800 :param count: number of interfaces created.
801 :returns: List of created interfaces.
803 result = [VppBviInterface(cls) for i in range(count)]
805 setattr(cls, intf.name, intf)
806 cls.bvi_interfaces = result
810 def extend_packet(packet, size, padding=' '):
812 Extend packet to given size by padding with spaces or custom padding
813 NOTE: Currently works only when Raw layer is present.
815 :param packet: packet
816 :param size: target size
817 :param padding: padding used to extend the payload
820 packet_len = len(packet) + 4
821 extend = size - packet_len
823 num = (extend // len(padding)) + 1
824 packet[Raw].load += (padding * num)[:extend].encode("ascii")
827 def reset_packet_infos(cls):
828 """ Reset the list of packet info objects and packet counts to zero """
829 cls._packet_infos = {}
830 cls._packet_count_for_dst_if_idx = {}
833 def create_packet_info(cls, src_if, dst_if):
835 Create packet info object containing the source and destination indexes
836 and add it to the testcase's packet info list
838 :param VppInterface src_if: source interface
839 :param VppInterface dst_if: destination interface
841 :returns: _PacketInfo object
845 info.index = len(cls._packet_infos)
846 info.src = src_if.sw_if_index
847 info.dst = dst_if.sw_if_index
848 if isinstance(dst_if, VppSubInterface):
849 dst_idx = dst_if.parent.sw_if_index
851 dst_idx = dst_if.sw_if_index
852 if dst_idx in cls._packet_count_for_dst_if_idx:
853 cls._packet_count_for_dst_if_idx[dst_idx] += 1
855 cls._packet_count_for_dst_if_idx[dst_idx] = 1
856 cls._packet_infos[info.index] = info
860 def info_to_payload(info):
862 Convert _PacketInfo object to packet payload
864 :param info: _PacketInfo object
866 :returns: string containing serialized data from packet info
868 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
872 def payload_to_info(payload, payload_field='load'):
874 Convert packet payload to _PacketInfo object
876 :param payload: packet payload
877 :type payload: <class 'scapy.packet.Raw'>
878 :param payload_field: packet fieldname of payload "load" for
879 <class 'scapy.packet.Raw'>
880 :type payload_field: str
881 :returns: _PacketInfo object containing de-serialized data from payload
884 numbers = getattr(payload, payload_field).split()
886 info.index = int(numbers[0])
887 info.src = int(numbers[1])
888 info.dst = int(numbers[2])
889 info.ip = int(numbers[3])
890 info.proto = int(numbers[4])
893 def get_next_packet_info(self, info):
895 Iterate over the packet info list stored in the testcase
896 Start iteration with first element if info is None
897 Continue based on index in info if info is specified
899 :param info: info or None
900 :returns: next info in list or None if no more infos
905 next_index = info.index + 1
906 if next_index == len(self._packet_infos):
909 return self._packet_infos[next_index]
911 def get_next_packet_info_for_interface(self, src_index, info):
913 Search the packet info list for the next packet info with same source
916 :param src_index: source interface index to search for
917 :param info: packet info - where to start the search
918 :returns: packet info or None
922 info = self.get_next_packet_info(info)
925 if info.src == src_index:
928 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
930 Search the packet info list for the next packet info with same source
931 and destination interface indexes
933 :param src_index: source interface index to search for
934 :param dst_index: destination interface index to search for
935 :param info: packet info - where to start the search
936 :returns: packet info or None
940 info = self.get_next_packet_info_for_interface(src_index, info)
943 if info.dst == dst_index:
946 def assert_equal(self, real_value, expected_value, name_or_class=None):
947 if name_or_class is None:
948 self.assertEqual(real_value, expected_value)
951 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
952 msg = msg % (getdoc(name_or_class).strip(),
953 real_value, str(name_or_class(real_value)),
954 expected_value, str(name_or_class(expected_value)))
956 msg = "Invalid %s: %s does not match expected value %s" % (
957 name_or_class, real_value, expected_value)
959 self.assertEqual(real_value, expected_value, msg)
961 def assert_in_range(self,
969 msg = "Invalid %s: %s out of range <%s,%s>" % (
970 name, real_value, expected_min, expected_max)
971 self.assertTrue(expected_min <= real_value <= expected_max, msg)
973 def assert_packet_checksums_valid(self, packet,
974 ignore_zero_udp_checksums=True):
975 received = packet.__class__(scapy.compat.raw(packet))
976 udp_layers = ['UDP', 'UDPerror']
977 checksum_fields = ['cksum', 'chksum']
980 temp = received.__class__(scapy.compat.raw(received))
982 layer = temp.getlayer(counter)
984 for cf in checksum_fields:
985 if hasattr(layer, cf):
986 if ignore_zero_udp_checksums and \
987 0 == getattr(layer, cf) and \
988 layer.name in udp_layers:
991 checksums.append((counter, cf))
994 counter = counter + 1
995 if 0 == len(checksums):
997 temp = temp.__class__(scapy.compat.raw(temp))
998 for layer, cf in checksums:
999 calc_sum = getattr(temp[layer], cf)
1001 getattr(received[layer], cf), calc_sum,
1002 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
1004 "Checksum field `%s` on `%s` layer has correct value `%s`" %
1005 (cf, temp[layer].name, calc_sum))
1007 def assert_checksum_valid(self, received_packet, layer,
1008 field_name='chksum',
1009 ignore_zero_checksum=False):
1010 """ Check checksum of received packet on given layer """
1011 received_packet_checksum = getattr(received_packet[layer], field_name)
1012 if ignore_zero_checksum and 0 == received_packet_checksum:
1014 recalculated = received_packet.__class__(
1015 scapy.compat.raw(received_packet))
1016 delattr(recalculated[layer], field_name)
1017 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1018 self.assert_equal(received_packet_checksum,
1019 getattr(recalculated[layer], field_name),
1020 "packet checksum on layer: %s" % layer)
1022 def assert_ip_checksum_valid(self, received_packet,
1023 ignore_zero_checksum=False):
1024 self.assert_checksum_valid(received_packet, 'IP',
1025 ignore_zero_checksum=ignore_zero_checksum)
1027 def assert_tcp_checksum_valid(self, received_packet,
1028 ignore_zero_checksum=False):
1029 self.assert_checksum_valid(received_packet, 'TCP',
1030 ignore_zero_checksum=ignore_zero_checksum)
1032 def assert_udp_checksum_valid(self, received_packet,
1033 ignore_zero_checksum=True):
1034 self.assert_checksum_valid(received_packet, 'UDP',
1035 ignore_zero_checksum=ignore_zero_checksum)
1037 def assert_embedded_icmp_checksum_valid(self, received_packet):
1038 if received_packet.haslayer(IPerror):
1039 self.assert_checksum_valid(received_packet, 'IPerror')
1040 if received_packet.haslayer(TCPerror):
1041 self.assert_checksum_valid(received_packet, 'TCPerror')
1042 if received_packet.haslayer(UDPerror):
1043 self.assert_checksum_valid(received_packet, 'UDPerror',
1044 ignore_zero_checksum=True)
1045 if received_packet.haslayer(ICMPerror):
1046 self.assert_checksum_valid(received_packet, 'ICMPerror')
1048 def assert_icmp_checksum_valid(self, received_packet):
1049 self.assert_checksum_valid(received_packet, 'ICMP')
1050 self.assert_embedded_icmp_checksum_valid(received_packet)
1052 def assert_icmpv6_checksum_valid(self, pkt):
1053 if pkt.haslayer(ICMPv6DestUnreach):
1054 self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
1055 self.assert_embedded_icmp_checksum_valid(pkt)
1056 if pkt.haslayer(ICMPv6EchoRequest):
1057 self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
1058 if pkt.haslayer(ICMPv6EchoReply):
1059 self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
1061 def get_packet_counter(self, counter):
1062 if counter.startswith("/"):
1063 counter_value = self.statistics.get_counter(counter)
1065 counters = self.vapi.cli("sh errors").split('\n')
1067 for i in range(1, len(counters) - 1):
1068 results = counters[i].split()
1069 if results[1] == counter:
1070 counter_value = int(results[0])
1072 return counter_value
1074 def assert_packet_counter_equal(self, counter, expected_value):
1075 counter_value = self.get_packet_counter(counter)
1076 self.assert_equal(counter_value, expected_value,
1077 "packet counter `%s'" % counter)
1079 def assert_error_counter_equal(self, counter, expected_value):
1080 counter_value = self.statistics.get_err_counter(counter)
1081 self.assert_equal(counter_value, expected_value,
1082 "error counter `%s'" % counter)
1085 def sleep(cls, timeout, remark=None):
1087 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1088 # * by Guido, only the main thread can be interrupted.
1090 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
1093 if hasattr(os, 'sched_yield'):
1099 if hasattr(cls, 'logger'):
1100 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1101 before = time.time()
1104 if hasattr(cls, 'logger') and after - before > 2 * timeout:
1105 cls.logger.error("unexpected self.sleep() result - "
1106 "slept for %es instead of ~%es!",
1107 after - before, timeout)
1108 if hasattr(cls, 'logger'):
1110 "Finished sleep (%s) - slept %es (wanted %es)",
1111 remark, after - before, timeout)
1113 def pg_send(self, intf, pkts):
1114 self.vapi.cli("clear trace")
1115 intf.add_stream(pkts)
1116 self.pg_enable_capture(self.pg_interfaces)
1119 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
1120 self.pg_send(intf, pkts)
1123 for i in self.pg_interfaces:
1124 i.get_capture(0, timeout=timeout)
1125 i.assert_nothing_captured(remark=remark)
1128 def send_and_expect(self, intf, pkts, output, n_rx=None):
1131 self.pg_send(intf, pkts)
1132 rx = output.get_capture(n_rx)
1135 def send_and_expect_only(self, intf, pkts, output, timeout=None):
1136 self.pg_send(intf, pkts)
1137 rx = output.get_capture(len(pkts))
1141 for i in self.pg_interfaces:
1142 if i not in outputs:
1143 i.get_capture(0, timeout=timeout)
1144 i.assert_nothing_captured()
1150 """ unittest calls runTest when TestCase is instantiated without a
1151 test case. Use case: Writing unittests against VppTestCase"""
1155 def get_testcase_doc_name(test):
1156 return getdoc(test.__class__).splitlines()[0]
1159 def get_test_description(descriptions, test):
1160 short_description = test.shortDescription()
1161 if descriptions and short_description:
1162 return short_description
1167 class TestCaseInfo(object):
1168 def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
1169 self.logger = logger
1170 self.tempdir = tempdir
1171 self.vpp_pid = vpp_pid
1172 self.vpp_bin_path = vpp_bin_path
1173 self.core_crash_test = None
1176 class VppTestResult(unittest.TestResult):
1178 @property result_string
1179 String variable to store the test case result string.
1181 List variable containing 2-tuples of TestCase instances and strings
1182 holding formatted tracebacks. Each tuple represents a test which
1183 raised an unexpected exception.
1185 List variable containing 2-tuples of TestCase instances and strings
1186 holding formatted tracebacks. Each tuple represents a test where
1187 a failure was explicitly signalled using the TestCase.assert*()
1191 failed_test_cases_info = set()
1192 core_crash_test_cases_info = set()
1193 current_test_case_info = None
1195 def __init__(self, stream=None, descriptions=None, verbosity=None,
1198 :param stream File descriptor to store where to report test results.
1199 Set to the standard error stream by default.
1200 :param descriptions Boolean variable to store information if to use
1201 test case descriptions.
1202 :param verbosity Integer variable to store required verbosity level.
1204 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1205 self.stream = stream
1206 self.descriptions = descriptions
1207 self.verbosity = verbosity
1208 self.result_string = None
1209 self.runner = runner
1211 def addSuccess(self, test):
1213 Record a test succeeded result
1218 if self.current_test_case_info:
1219 self.current_test_case_info.logger.debug(
1220 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1221 test._testMethodName,
1222 test._testMethodDoc))
1223 unittest.TestResult.addSuccess(self, test)
1224 self.result_string = colorize("OK", GREEN)
1226 self.send_result_through_pipe(test, PASS)
1228 def addSkip(self, test, reason):
1230 Record a test skipped.
1236 if self.current_test_case_info:
1237 self.current_test_case_info.logger.debug(
1238 "--- addSkip() %s.%s(%s) called, reason is %s" %
1239 (test.__class__.__name__, test._testMethodName,
1240 test._testMethodDoc, reason))
1241 unittest.TestResult.addSkip(self, test, reason)
1242 self.result_string = colorize("SKIP", YELLOW)
1244 self.send_result_through_pipe(test, SKIP)
1246 def symlink_failed(self):
1247 if self.current_test_case_info:
1249 failed_dir = os.getenv('FAILED_DIR')
1250 link_path = os.path.join(
1253 os.path.basename(self.current_test_case_info.tempdir))
1254 if self.current_test_case_info.logger:
1255 self.current_test_case_info.logger.debug(
1256 "creating a link to the failed test")
1257 self.current_test_case_info.logger.debug(
1258 "os.symlink(%s, %s)" %
1259 (self.current_test_case_info.tempdir, link_path))
1260 if os.path.exists(link_path):
1261 if self.current_test_case_info.logger:
1262 self.current_test_case_info.logger.debug(
1263 'symlink already exists')
1265 os.symlink(self.current_test_case_info.tempdir, link_path)
1267 except Exception as e:
1268 if self.current_test_case_info.logger:
1269 self.current_test_case_info.logger.error(e)
1271 def send_result_through_pipe(self, test, result):
1272 if hasattr(self, 'test_framework_result_pipe'):
1273 pipe = self.test_framework_result_pipe
1275 pipe.send((test.id(), result))
1277 def log_error(self, test, err, fn_name):
1278 if self.current_test_case_info:
1279 if isinstance(test, unittest.suite._ErrorHolder):
1280 test_name = test.description
1282 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1283 test._testMethodName,
1284 test._testMethodDoc)
1285 self.current_test_case_info.logger.debug(
1286 "--- %s() %s called, err is %s" %
1287 (fn_name, test_name, err))
1288 self.current_test_case_info.logger.debug(
1289 "formatted exception is:\n%s" %
1290 "".join(format_exception(*err)))
1292 def add_error(self, test, err, unittest_fn, error_type):
1293 if error_type == FAIL:
1294 self.log_error(test, err, 'addFailure')
1295 error_type_str = colorize("FAIL", RED)
1296 elif error_type == ERROR:
1297 self.log_error(test, err, 'addError')
1298 error_type_str = colorize("ERROR", RED)
1300 raise Exception('Error type %s cannot be used to record an '
1301 'error or a failure' % error_type)
1303 unittest_fn(self, test, err)
1304 if self.current_test_case_info:
1305 self.result_string = "%s [ temp dir used by test case: %s ]" % \
1307 self.current_test_case_info.tempdir)
1308 self.symlink_failed()
1309 self.failed_test_cases_info.add(self.current_test_case_info)
1310 if is_core_present(self.current_test_case_info.tempdir):
1311 if not self.current_test_case_info.core_crash_test:
1312 if isinstance(test, unittest.suite._ErrorHolder):
1313 test_name = str(test)
1315 test_name = "'{!s}' ({!s})".format(
1316 get_testcase_doc_name(test), test.id())
1317 self.current_test_case_info.core_crash_test = test_name
1318 self.core_crash_test_cases_info.add(
1319 self.current_test_case_info)
1321 self.result_string = '%s [no temp dir]' % error_type_str
1323 self.send_result_through_pipe(test, error_type)
1325 def addFailure(self, test, err):
1327 Record a test failed result
1330 :param err: error message
1333 self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1335 def addError(self, test, err):
1337 Record a test error result
1340 :param err: error message
1343 self.add_error(test, err, unittest.TestResult.addError, ERROR)
1345 def getDescription(self, test):
1347 Get test description
1350 :returns: test description
1353 return get_test_description(self.descriptions, test)
1355 def startTest(self, test):
1363 def print_header(test):
1364 if not hasattr(test.__class__, '_header_printed'):
1365 print(double_line_delim)
1366 print(colorize(getdoc(test).splitlines()[0], GREEN))
1367 print(double_line_delim)
1368 test.__class__._header_printed = True
1372 unittest.TestResult.startTest(self, test)
1373 if self.verbosity > 0:
1374 self.stream.writeln(
1375 "Starting " + self.getDescription(test) + " ...")
1376 self.stream.writeln(single_line_delim)
1378 def stopTest(self, test):
1380 Called when the given test has been run
1385 unittest.TestResult.stopTest(self, test)
1386 if self.verbosity > 0:
1387 self.stream.writeln(single_line_delim)
1388 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1389 self.result_string))
1390 self.stream.writeln(single_line_delim)
1392 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1393 self.result_string))
1395 self.send_result_through_pipe(test, TEST_RUN)
1397 def printErrors(self):
1399 Print errors from running the test case
1401 if len(self.errors) > 0 or len(self.failures) > 0:
1402 self.stream.writeln()
1403 self.printErrorList('ERROR', self.errors)
1404 self.printErrorList('FAIL', self.failures)
1406 # ^^ that is the last output from unittest before summary
1407 if not self.runner.print_summary:
1408 devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
1409 self.stream = devnull
1410 self.runner.stream = devnull
1412 def printErrorList(self, flavour, errors):
1414 Print error list to the output stream together with error type
1415 and test case description.
1417 :param flavour: error type
1418 :param errors: iterable errors
1421 for test, err in errors:
1422 self.stream.writeln(double_line_delim)
1423 self.stream.writeln("%s: %s" %
1424 (flavour, self.getDescription(test)))
1425 self.stream.writeln(single_line_delim)
1426 self.stream.writeln("%s" % err)
1429 class VppTestRunner(unittest.TextTestRunner):
1431 A basic test runner implementation which prints results to standard error.
1435 def resultclass(self):
1436 """Class maintaining the results of the tests"""
1437 return VppTestResult
1439 def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1440 result_pipe=None, failfast=False, buffer=False,
1441 resultclass=None, print_summary=True, **kwargs):
1442 # ignore stream setting here, use hard-coded stdout to be in sync
1443 # with prints from VppTestCase methods ...
1444 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1445 verbosity, failfast, buffer,
1446 resultclass, **kwargs)
1447 KeepAliveReporter.pipe = keep_alive_pipe
1449 self.orig_stream = self.stream
1450 self.resultclass.test_framework_result_pipe = result_pipe
1452 self.print_summary = print_summary
1454 def _makeResult(self):
1455 return self.resultclass(self.stream,
1460 def run(self, test):
1467 faulthandler.enable() # emit stack trace to stderr if killed by signal
1469 result = super(VppTestRunner, self).run(test)
1470 if not self.print_summary:
1471 self.stream = self.orig_stream
1472 result.stream = self.orig_stream
1476 class Worker(Thread):
1477 def __init__(self, args, logger, env=None):
1478 self.logger = logger
1482 env = {} if env is None else env
1483 self.env = copy.deepcopy(env)
1484 super(Worker, self).__init__()
1487 executable = self.args[0]
1488 if not os.path.exists(executable) or not os.access(
1489 executable, os.F_OK | os.X_OK):
1490 # Exit code that means some system file did not exist,
1491 # could not be opened, or had some other kind of error.
1492 self.result = os.EX_OSFILE
1493 raise EnvironmentError(
1494 "executable '%s' is not found or executable." % executable)
1495 self.logger.debug("Running executable w/args `%s'" % self.args)
1496 env = os.environ.copy()
1497 env.update(self.env)
1498 env["CK_LOG_FILE_NAME"] = "-"
1499 self.process = subprocess.Popen(
1500 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1501 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1502 out, err = self.process.communicate()
1503 self.logger.debug("Finished running `%s'" % executable)
1504 self.logger.info("Return code is `%s'" % self.process.returncode)
1505 self.logger.info(single_line_delim)
1506 self.logger.info("Executable `%s' wrote to stdout:" % executable)
1507 self.logger.info(single_line_delim)
1508 self.logger.info(out)
1509 self.logger.info(single_line_delim)
1510 self.logger.info("Executable `%s' wrote to stderr:" % executable)
1511 self.logger.info(single_line_delim)
1512 self.logger.info(err)
1513 self.logger.info(single_line_delim)
1514 self.result = self.process.returncode
1517 if __name__ == '__main__':