3 from __future__ import print_function
17 from collections import deque
18 from threading import Thread, Event
19 from inspect import getdoc, isclass
20 from traceback import format_exception
21 from logging import FileHandler, DEBUG, Formatter
24 from scapy.packet import Raw
25 import hook as hookmodule
26 from vpp_pg_interface import VppPGInterface
27 from vpp_sub_interface import VppSubInterface
28 from vpp_lo_interface import VppLoInterface
29 from vpp_bvi_interface import VppBviInterface
30 from vpp_papi_provider import VppPapiProvider
31 from vpp_papi.vpp_stats import VPPStats
32 from vpp_papi.vpp_transport_shmem import VppTransportShmemIOError
33 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
35 from vpp_object import VppObjectRegistry
36 from util import ppp, is_core_present
37 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
38 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
39 from scapy.layers.inet6 import ICMPv6EchoReply
41 if os.name == 'posix' and sys.version_info[0] < 3:
42 # using subprocess32 is recommended by python official documentation
43 # @ https://docs.python.org/2/library/subprocess.html
44 import subprocess32 as subprocess
48 # Python2/3 compatible
61 class BoolEnvironmentVariable(object):
63 def __init__(self, env_var_name, default='n', true_values=None):
64 self.name = env_var_name
65 self.default = default
66 self.true_values = true_values if true_values is not None else \
70 return os.getenv(self.name, self.default).lower() in self.true_values
72 if sys.version_info[0] == 2:
73 __nonzero__ = __bool__
76 return 'BoolEnvironmentVariable(%r, default=%r, true_values=%r)' % \
77 (self.name, self.default, self.true_values)
80 debug_framework = BoolEnvironmentVariable('TEST_DEBUG')
85 Test framework module.
87 The module provides a set of tools for constructing and running tests and
88 representing the results.
92 class VppDiedError(Exception):
93 """ exception for reporting that the subprocess has died."""
95 signals_by_value = {v: k for k, v in signal.__dict__.items() if
96 k.startswith('SIG') and not k.startswith('SIG_')}
98 def __init__(self, rv=None, testcase=None, method_name=None):
100 self.signal_name = None
101 self.testcase = testcase
102 self.method_name = method_name
105 self.signal_name = VppDiedError.signals_by_value[-rv]
106 except (KeyError, TypeError):
109 if testcase is None and method_name is None:
112 in_msg = 'running %s.%s ' % (testcase, method_name)
114 msg = "VPP subprocess died %sunexpectedly with return code: %d%s." % (
117 ' [%s]' % (self.signal_name if
118 self.signal_name is not None else ''))
119 super(VppDiedError, self).__init__(msg)
122 class _PacketInfo(object):
123 """Private class to create packet info object.
125 Help process information about the next packet.
126 Set variables to default values.
128 #: Store the index of the packet.
130 #: Store the index of the source packet generator interface of the packet.
132 #: Store the index of the destination packet generator interface
135 #: Store expected ip version
137 #: Store expected upper protocol
139 #: Store the copy of the former packet.
142 def __eq__(self, other):
143 index = self.index == other.index
144 src = self.src == other.src
145 dst = self.dst == other.dst
146 data = self.data == other.data
147 return index and src and dst and data
150 def pump_output(testclass):
151 """ pump output from vpp stdout/stderr to proper queues """
154 while not testclass.pump_thread_stop_flag.is_set():
155 readable = select.select([testclass.vpp.stdout.fileno(),
156 testclass.vpp.stderr.fileno(),
157 testclass.pump_thread_wakeup_pipe[0]],
159 if testclass.vpp.stdout.fileno() in readable:
160 read = os.read(testclass.vpp.stdout.fileno(), 102400)
162 split = read.splitlines(True)
163 if len(stdout_fragment) > 0:
164 split[0] = "%s%s" % (stdout_fragment, split[0])
165 if len(split) > 0 and split[-1].endswith("\n"):
169 stdout_fragment = split[-1]
170 testclass.vpp_stdout_deque.extend(split[:limit])
171 if not testclass.cache_vpp_output:
172 for line in split[:limit]:
173 testclass.logger.debug(
174 "VPP STDOUT: %s" % line.rstrip("\n"))
175 if testclass.vpp.stderr.fileno() in readable:
176 read = os.read(testclass.vpp.stderr.fileno(), 102400)
178 split = read.splitlines(True)
179 if len(stderr_fragment) > 0:
180 split[0] = "%s%s" % (stderr_fragment, split[0])
181 if len(split) > 0 and split[-1].endswith(b"\n"):
185 stderr_fragment = split[-1]
186 testclass.vpp_stderr_deque.extend(split[:limit])
187 if not testclass.cache_vpp_output:
188 for line in split[:limit]:
189 testclass.logger.debug(
190 "VPP STDERR: %s" % line.rstrip("\n"))
191 # ignoring the dummy pipe here intentionally - the
192 # flag will take care of properly terminating the loop
195 def _is_skip_aarch64_set():
196 return BoolEnvironmentVariable('SKIP_AARCH64')
199 is_skip_aarch64_set = _is_skip_aarch64_set()
202 def _is_platform_aarch64():
203 return platform.machine() == 'aarch64'
206 is_platform_aarch64 = _is_platform_aarch64()
209 def _running_extended_tests():
210 return BoolEnvironmentVariable("EXTENDED_TESTS")
213 running_extended_tests = _running_extended_tests()
216 def _running_on_centos():
217 os_id = os.getenv("OS_ID", "")
218 return True if "centos" in os_id.lower() else False
221 running_on_centos = _running_on_centos
224 class KeepAliveReporter(object):
226 Singleton object which reports test start to parent process
231 self.__dict__ = self._shared_state
239 def pipe(self, pipe):
240 if self._pipe is not None:
241 raise Exception("Internal error - pipe should only be set once.")
244 def send_keep_alive(self, test, desc=None):
246 Write current test tmpdir & desc to keep-alive pipe to signal liveness
248 if self.pipe is None:
249 # if not running forked..
253 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
257 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
260 class VppTestCase(unittest.TestCase):
261 """This subclass is a base class for VPP test cases that are implemented as
262 classes. It provides methods to create and run test case.
265 extra_vpp_punt_config = []
266 extra_vpp_plugin_config = []
269 def packet_infos(self):
270 """List of packet infos"""
271 return self._packet_infos
274 def get_packet_count_for_if_idx(cls, dst_if_index):
275 """Get the number of packet info for specified destination if index"""
276 if dst_if_index in cls._packet_count_for_dst_if_idx:
277 return cls._packet_count_for_dst_if_idx[dst_if_index]
283 """Return the instance of this testcase"""
284 return cls.test_instance
287 def set_debug_flags(cls, d):
288 cls.debug_core = False
289 cls.debug_gdb = False
290 cls.debug_gdbserver = False
295 cls.debug_core = True
298 elif dl == "gdbserver":
299 cls.debug_gdbserver = True
301 raise Exception("Unrecognized DEBUG option: '%s'" % d)
304 def get_least_used_cpu():
305 cpu_usage_list = [set(range(psutil.cpu_count()))]
306 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
307 if 'vpp_main' == p.info['name']]
308 for vpp_process in vpp_processes:
309 for cpu_usage_set in cpu_usage_list:
311 cpu_num = vpp_process.cpu_num()
312 if cpu_num in cpu_usage_set:
313 cpu_usage_set_index = cpu_usage_list.index(
315 if cpu_usage_set_index == len(cpu_usage_list) - 1:
316 cpu_usage_list.append({cpu_num})
318 cpu_usage_list[cpu_usage_set_index + 1].add(
320 cpu_usage_set.remove(cpu_num)
322 except psutil.NoSuchProcess:
325 for cpu_usage_set in cpu_usage_list:
326 if len(cpu_usage_set) > 0:
327 min_usage_set = cpu_usage_set
330 return random.choice(tuple(min_usage_set))
333 def setUpConstants(cls):
334 """ Set-up the test case class based on environment variables """
335 cls.step = BoolEnvironmentVariable('STEP')
336 d = os.getenv("DEBUG", None)
337 # inverted case to handle '' == True
338 c = os.getenv("CACHE_OUTPUT", "1")
339 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
340 cls.set_debug_flags(d)
341 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
342 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
343 cls.test_plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
344 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
346 if cls.plugin_path is not None:
347 if cls.extern_plugin_path is not None:
348 plugin_path = "%s:%s" % (
349 cls.plugin_path, cls.extern_plugin_path)
351 plugin_path = cls.plugin_path
352 elif cls.extern_plugin_path is not None:
353 plugin_path = cls.extern_plugin_path
355 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
356 debug_cli = "cli-listen localhost:5002"
358 size = os.getenv("COREDUMP_SIZE")
360 coredump_size = "coredump-size %s" % size
361 if coredump_size is None:
362 coredump_size = "coredump-size unlimited"
364 cpu_core_number = cls.get_least_used_cpu()
366 cls.vpp_cmdline = [cls.vpp_bin, "unix",
367 "{", "nodaemon", debug_cli, "full-coredump",
368 coredump_size, "runtime-dir", cls.tempdir, "}",
369 "api-trace", "{", "on", "}", "api-segment", "{",
370 "prefix", cls.shm_prefix, "}", "cpu", "{",
371 "main-core", str(cpu_core_number), "}",
372 "statseg", "{", "socket-name", cls.stats_sock, "}",
373 "socksvr", "{", "socket-name", cls.api_sock, "}",
375 "{", "plugin", "dpdk_plugin.so", "{", "disable",
376 "}", "plugin", "rdma_plugin.so", "{", "disable",
377 "}", "plugin", "unittest_plugin.so", "{", "enable",
378 "}"] + cls.extra_vpp_plugin_config + ["}", ]
379 if cls.extra_vpp_punt_config is not None:
380 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
381 if plugin_path is not None:
382 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
383 if cls.test_plugin_path is not None:
384 cls.vpp_cmdline.extend(["test_plugin_path", cls.test_plugin_path])
386 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
387 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
390 def wait_for_enter(cls):
391 if cls.debug_gdbserver:
392 print(double_line_delim)
393 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
395 print(double_line_delim)
396 print("Spawned VPP with PID: %d" % cls.vpp.pid)
398 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
400 print(single_line_delim)
401 print("You can debug the VPP using e.g.:")
402 if cls.debug_gdbserver:
403 print("sudo gdb " + cls.vpp_bin +
404 " -ex 'target remote localhost:7777'")
405 print("Now is the time to attach a gdb by running the above "
406 "command, set up breakpoints etc. and then resume VPP from "
407 "within gdb by issuing the 'continue' command")
409 print("sudo gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
410 print("Now is the time to attach a gdb by running the above "
411 "command and set up breakpoints etc.")
412 print(single_line_delim)
413 input("Press ENTER to continue running the testcase...")
417 cmdline = cls.vpp_cmdline
419 if cls.debug_gdbserver:
420 gdbserver = '/usr/bin/gdbserver'
421 if not os.path.isfile(gdbserver) or \
422 not os.access(gdbserver, os.X_OK):
423 raise Exception("gdbserver binary '%s' does not exist or is "
424 "not executable" % gdbserver)
426 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
427 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
430 cls.vpp = subprocess.Popen(cmdline,
431 stdout=subprocess.PIPE,
432 stderr=subprocess.PIPE,
434 except subprocess.CalledProcessError as e:
435 cls.logger.critical("Subprocess returned with non-0 return code: ("
439 cls.logger.critical("Subprocess returned with OS error: "
440 "(%s) %s", e.errno, e.strerror)
442 except Exception as e:
443 cls.logger.exception("Subprocess returned unexpected from "
450 def wait_for_stats_socket(cls):
451 deadline = time.time() + 3
453 while time.time() < deadline or \
454 cls.debug_gdb or cls.debug_gdbserver:
455 if os.path.exists(cls.stats_sock):
460 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
463 def wait_for_coredump(cls):
464 corefile = cls.tempdir + "/core"
465 if os.path.isfile(corefile):
466 cls.logger.error("Waiting for coredump to complete: %s", corefile)
467 curr_size = os.path.getsize(corefile)
468 deadline = time.time() + 60
470 while time.time() < deadline:
473 curr_size = os.path.getsize(corefile)
474 if size == curr_size:
478 cls.logger.error("Timed out waiting for coredump to complete:"
481 cls.logger.error("Coredump complete: %s, size %d",
487 Perform class setup before running the testcase
488 Remove shared memory files, start vpp and connect the vpp-api
490 super(VppTestCase, cls).setUpClass()
491 gc.collect() # run garbage collection first
493 cls.logger = get_logger(cls.__name__)
494 if hasattr(cls, 'parallel_handler'):
495 cls.logger.addHandler(cls.parallel_handler)
496 cls.logger.propagate = False
498 cls.tempdir = tempfile.mkdtemp(
499 prefix='vpp-unittest-%s-' % cls.__name__)
500 cls.stats_sock = "%s/stats.sock" % cls.tempdir
501 cls.api_sock = "%s/api.sock" % cls.tempdir
502 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
503 cls.file_handler.setFormatter(
504 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
506 cls.file_handler.setLevel(DEBUG)
507 cls.logger.addHandler(cls.file_handler)
508 cls.logger.debug("--- setUpClass() for %s called ---" %
510 cls.shm_prefix = os.path.basename(cls.tempdir)
511 os.chdir(cls.tempdir)
512 cls.logger.info("Temporary dir is %s, shm prefix is %s",
513 cls.tempdir, cls.shm_prefix)
515 cls.reset_packet_infos()
517 cls._zombie_captures = []
520 cls.registry = VppObjectRegistry()
521 cls.vpp_startup_failed = False
522 cls.reporter = KeepAliveReporter()
523 # need to catch exceptions here because if we raise, then the cleanup
524 # doesn't get called and we might end with a zombie vpp
527 cls.reporter.send_keep_alive(cls, 'setUpClass')
528 VppTestResult.current_test_case_info = TestCaseInfo(
529 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
530 cls.vpp_stdout_deque = deque()
531 cls.vpp_stderr_deque = deque()
532 cls.pump_thread_stop_flag = Event()
533 cls.pump_thread_wakeup_pipe = os.pipe()
534 cls.pump_thread = Thread(target=pump_output, args=(cls,))
535 cls.pump_thread.daemon = True
536 cls.pump_thread.start()
537 if cls.debug_gdb or cls.debug_gdbserver:
541 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
544 hook = hookmodule.StepHook(cls)
546 hook = hookmodule.PollHook(cls)
547 cls.vapi.register_hook(hook)
548 cls.wait_for_stats_socket()
549 cls.statistics = VPPStats(socketname=cls.stats_sock)
553 cls.vpp_startup_failed = True
555 "VPP died shortly after startup, check the"
556 " output to standard error for possible cause")
562 cls.vapi.disconnect()
565 if cls.debug_gdbserver:
566 print(colorize("You're running VPP inside gdbserver but "
567 "VPP-API connection failed, did you forget "
568 "to 'continue' VPP from within gdb?", RED))
578 Disconnect vpp-api, kill vpp and cleanup shared memory files
580 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
582 if cls.vpp.returncode is None:
583 print(double_line_delim)
584 print("VPP or GDB server is still running")
585 print(single_line_delim)
586 input("When done debugging, press ENTER to kill the "
587 "process and finish running the testcase...")
589 # first signal that we want to stop the pump thread, then wake it up
590 if hasattr(cls, 'pump_thread_stop_flag'):
591 cls.pump_thread_stop_flag.set()
592 if hasattr(cls, 'pump_thread_wakeup_pipe'):
593 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
594 if hasattr(cls, 'pump_thread'):
595 cls.logger.debug("Waiting for pump thread to stop")
596 cls.pump_thread.join()
597 if hasattr(cls, 'vpp_stderr_reader_thread'):
598 cls.logger.debug("Waiting for stdderr pump to stop")
599 cls.vpp_stderr_reader_thread.join()
601 if hasattr(cls, 'vpp'):
602 if hasattr(cls, 'vapi'):
603 cls.logger.debug("Disconnecting class vapi client on %s",
605 cls.vapi.disconnect()
606 cls.logger.debug("Deleting class vapi attribute on %s",
610 if cls.vpp.returncode is None:
611 cls.wait_for_coredump()
612 cls.logger.debug("Sending TERM to vpp")
614 cls.logger.debug("Waiting for vpp to die")
615 cls.vpp.communicate()
616 cls.logger.debug("Deleting class vpp attribute on %s",
620 if cls.vpp_startup_failed:
621 stdout_log = cls.logger.info
622 stderr_log = cls.logger.critical
624 stdout_log = cls.logger.info
625 stderr_log = cls.logger.info
627 if hasattr(cls, 'vpp_stdout_deque'):
628 stdout_log(single_line_delim)
629 stdout_log('VPP output to stdout while running %s:', cls.__name__)
630 stdout_log(single_line_delim)
631 vpp_output = "".join(cls.vpp_stdout_deque)
632 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
634 stdout_log('\n%s', vpp_output)
635 stdout_log(single_line_delim)
637 if hasattr(cls, 'vpp_stderr_deque'):
638 stderr_log(single_line_delim)
639 stderr_log('VPP output to stderr while running %s:', cls.__name__)
640 stderr_log(single_line_delim)
641 vpp_output = "".join(cls.vpp_stderr_deque)
642 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
644 stderr_log('\n%s', vpp_output)
645 stderr_log(single_line_delim)
648 def tearDownClass(cls):
649 """ Perform final cleanup after running all tests in this test-case """
650 cls.logger.debug("--- tearDownClass() for %s called ---" %
652 cls.reporter.send_keep_alive(cls, 'tearDownClass')
654 cls.file_handler.close()
655 cls.reset_packet_infos()
657 debug_internal.on_tear_down_class(cls)
659 def show_commands_at_teardown(self):
660 """ Allow subclass specific teardown logging additions."""
661 self.logger.info("--- No test specific show commands provided. ---")
664 """ Show various debug prints after each test """
665 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
666 (self.__class__.__name__, self._testMethodName,
667 self._testMethodDoc))
670 if not self.vpp_dead:
671 self.logger.debug(self.vapi.cli("show trace max 1000"))
672 self.logger.info(self.vapi.ppcli("show interface"))
673 self.logger.info(self.vapi.ppcli("show hardware"))
674 self.logger.info(self.statistics.set_errors_str())
675 self.logger.info(self.vapi.ppcli("show run"))
676 self.logger.info(self.vapi.ppcli("show log"))
677 self.logger.info(self.vapi.ppcli("show bihash"))
678 self.logger.info("Logging testcase specific show commands.")
679 self.show_commands_at_teardown()
680 self.registry.remove_vpp_config(self.logger)
681 # Save/Dump VPP api trace log
682 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
683 tmp_api_trace = "/tmp/%s" % api_trace
684 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
685 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
686 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
688 os.rename(tmp_api_trace, vpp_api_trace_log)
689 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
691 except VppTransportShmemIOError:
692 self.logger.debug("VppTransportShmemIOError: Vpp dead. "
693 "Cannot log show commands.")
696 self.registry.unregister_all(self.logger)
699 """ Clear trace before running each test"""
700 super(VppTestCase, self).setUp()
701 self.reporter.send_keep_alive(self)
704 raise VppDiedError(rv=None, testcase=self.__class__.__name__,
705 method_name=self._testMethodName)
706 self.sleep(.1, "during setUp")
707 self.vpp_stdout_deque.append(
708 "--- test setUp() for %s.%s(%s) starts here ---\n" %
709 (self.__class__.__name__, self._testMethodName,
710 self._testMethodDoc))
711 self.vpp_stderr_deque.append(
712 "--- test setUp() for %s.%s(%s) starts here ---\n" %
713 (self.__class__.__name__, self._testMethodName,
714 self._testMethodDoc))
715 self.vapi.cli("clear trace")
716 # store the test instance inside the test class - so that objects
717 # holding the class can access instance methods (like assertEqual)
718 type(self).test_instance = self
721 def pg_enable_capture(cls, interfaces=None):
723 Enable capture on packet-generator interfaces
725 :param interfaces: iterable interface indexes (if None,
726 use self.pg_interfaces)
729 if interfaces is None:
730 interfaces = cls.pg_interfaces
735 def register_capture(cls, cap_name):
736 """ Register a capture in the testclass """
737 # add to the list of captures with current timestamp
738 cls._captures.append((time.time(), cap_name))
739 # filter out from zombies
740 cls._zombie_captures = [(stamp, name)
741 for (stamp, name) in cls._zombie_captures
746 """ Remove any zombie captures and enable the packet generator """
747 # how long before capture is allowed to be deleted - otherwise vpp
748 # crashes - 100ms seems enough (this shouldn't be needed at all)
751 for stamp, cap_name in cls._zombie_captures:
752 wait = stamp + capture_ttl - now
754 cls.sleep(wait, "before deleting capture %s" % cap_name)
756 cls.logger.debug("Removing zombie capture %s" % cap_name)
757 cls.vapi.cli('packet-generator delete %s' % cap_name)
759 cls.vapi.cli("trace add pg-input 1000")
760 cls.vapi.cli('packet-generator enable')
761 cls._zombie_captures = cls._captures
765 def create_pg_interfaces(cls, interfaces):
767 Create packet-generator interfaces.
769 :param interfaces: iterable indexes of the interfaces.
770 :returns: List of created interfaces.
775 intf = VppPGInterface(cls, i)
776 setattr(cls, intf.name, intf)
778 cls.pg_interfaces = result
782 def create_loopback_interfaces(cls, count):
784 Create loopback interfaces.
786 :param count: number of interfaces created.
787 :returns: List of created interfaces.
789 result = [VppLoInterface(cls) for i in range(count)]
791 setattr(cls, intf.name, intf)
792 cls.lo_interfaces = result
796 def create_bvi_interfaces(cls, count):
798 Create BVI interfaces.
800 :param count: number of interfaces created.
801 :returns: List of created interfaces.
803 result = [VppBviInterface(cls) for i in range(count)]
805 setattr(cls, intf.name, intf)
806 cls.bvi_interfaces = result
810 def extend_packet(packet, size, padding=' '):
812 Extend packet to given size by padding with spaces or custom padding
813 NOTE: Currently works only when Raw layer is present.
815 :param packet: packet
816 :param size: target size
817 :param padding: padding used to extend the payload
820 packet_len = len(packet) + 4
821 extend = size - packet_len
823 num = (extend // len(padding)) + 1
824 packet[Raw].load += (padding * num)[:extend].encode("ascii")
827 def reset_packet_infos(cls):
828 """ Reset the list of packet info objects and packet counts to zero """
829 cls._packet_infos = {}
830 cls._packet_count_for_dst_if_idx = {}
833 def create_packet_info(cls, src_if, dst_if):
835 Create packet info object containing the source and destination indexes
836 and add it to the testcase's packet info list
838 :param VppInterface src_if: source interface
839 :param VppInterface dst_if: destination interface
841 :returns: _PacketInfo object
845 info.index = len(cls._packet_infos)
846 info.src = src_if.sw_if_index
847 info.dst = dst_if.sw_if_index
848 if isinstance(dst_if, VppSubInterface):
849 dst_idx = dst_if.parent.sw_if_index
851 dst_idx = dst_if.sw_if_index
852 if dst_idx in cls._packet_count_for_dst_if_idx:
853 cls._packet_count_for_dst_if_idx[dst_idx] += 1
855 cls._packet_count_for_dst_if_idx[dst_idx] = 1
856 cls._packet_infos[info.index] = info
860 def info_to_payload(info):
862 Convert _PacketInfo object to packet payload
864 :param info: _PacketInfo object
866 :returns: string containing serialized data from packet info
868 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
872 def payload_to_info(payload, payload_field='load'):
874 Convert packet payload to _PacketInfo object
876 :param payload: packet payload
877 :type payload: <class 'scapy.packet.Raw'>
878 :param payload_field: packet fieldname of payload "load" for
879 <class 'scapy.packet.Raw'>
880 :type payload_field: str
881 :returns: _PacketInfo object containing de-serialized data from payload
884 numbers = getattr(payload, payload_field).split()
886 info.index = int(numbers[0])
887 info.src = int(numbers[1])
888 info.dst = int(numbers[2])
889 info.ip = int(numbers[3])
890 info.proto = int(numbers[4])
893 def get_next_packet_info(self, info):
895 Iterate over the packet info list stored in the testcase
896 Start iteration with first element if info is None
897 Continue based on index in info if info is specified
899 :param info: info or None
900 :returns: next info in list or None if no more infos
905 next_index = info.index + 1
906 if next_index == len(self._packet_infos):
909 return self._packet_infos[next_index]
911 def get_next_packet_info_for_interface(self, src_index, info):
913 Search the packet info list for the next packet info with same source
916 :param src_index: source interface index to search for
917 :param info: packet info - where to start the search
918 :returns: packet info or None
922 info = self.get_next_packet_info(info)
925 if info.src == src_index:
928 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
930 Search the packet info list for the next packet info with same source
931 and destination interface indexes
933 :param src_index: source interface index to search for
934 :param dst_index: destination interface index to search for
935 :param info: packet info - where to start the search
936 :returns: packet info or None
940 info = self.get_next_packet_info_for_interface(src_index, info)
943 if info.dst == dst_index:
946 def assert_equal(self, real_value, expected_value, name_or_class=None):
947 if name_or_class is None:
948 self.assertEqual(real_value, expected_value)
951 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
952 msg = msg % (getdoc(name_or_class).strip(),
953 real_value, str(name_or_class(real_value)),
954 expected_value, str(name_or_class(expected_value)))
956 msg = "Invalid %s: %s does not match expected value %s" % (
957 name_or_class, real_value, expected_value)
959 self.assertEqual(real_value, expected_value, msg)
961 def assert_in_range(self,
969 msg = "Invalid %s: %s out of range <%s,%s>" % (
970 name, real_value, expected_min, expected_max)
971 self.assertTrue(expected_min <= real_value <= expected_max, msg)
973 def assert_packet_checksums_valid(self, packet,
974 ignore_zero_udp_checksums=True):
975 received = packet.__class__(scapy.compat.raw(packet))
977 ppp("Verifying packet checksums for packet:", received))
978 udp_layers = ['UDP', 'UDPerror']
979 checksum_fields = ['cksum', 'chksum']
982 temp = received.__class__(scapy.compat.raw(received))
984 layer = temp.getlayer(counter)
986 for cf in checksum_fields:
987 if hasattr(layer, cf):
988 if ignore_zero_udp_checksums and \
989 0 == getattr(layer, cf) and \
990 layer.name in udp_layers:
993 checksums.append((counter, cf))
996 counter = counter + 1
997 if 0 == len(checksums):
999 temp = temp.__class__(scapy.compat.raw(temp))
1000 for layer, cf in checksums:
1001 calc_sum = getattr(temp[layer], cf)
1003 getattr(received[layer], cf), calc_sum,
1004 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
1006 "Checksum field `%s` on `%s` layer has correct value `%s`" %
1007 (cf, temp[layer].name, calc_sum))
1009 def assert_checksum_valid(self, received_packet, layer,
1010 field_name='chksum',
1011 ignore_zero_checksum=False):
1012 """ Check checksum of received packet on given layer """
1013 received_packet_checksum = getattr(received_packet[layer], field_name)
1014 if ignore_zero_checksum and 0 == received_packet_checksum:
1016 recalculated = received_packet.__class__(
1017 scapy.compat.raw(received_packet))
1018 delattr(recalculated[layer], field_name)
1019 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1020 self.assert_equal(received_packet_checksum,
1021 getattr(recalculated[layer], field_name),
1022 "packet checksum on layer: %s" % layer)
1024 def assert_ip_checksum_valid(self, received_packet,
1025 ignore_zero_checksum=False):
1026 self.assert_checksum_valid(received_packet, 'IP',
1027 ignore_zero_checksum=ignore_zero_checksum)
1029 def assert_tcp_checksum_valid(self, received_packet,
1030 ignore_zero_checksum=False):
1031 self.assert_checksum_valid(received_packet, 'TCP',
1032 ignore_zero_checksum=ignore_zero_checksum)
1034 def assert_udp_checksum_valid(self, received_packet,
1035 ignore_zero_checksum=True):
1036 self.assert_checksum_valid(received_packet, 'UDP',
1037 ignore_zero_checksum=ignore_zero_checksum)
1039 def assert_embedded_icmp_checksum_valid(self, received_packet):
1040 if received_packet.haslayer(IPerror):
1041 self.assert_checksum_valid(received_packet, 'IPerror')
1042 if received_packet.haslayer(TCPerror):
1043 self.assert_checksum_valid(received_packet, 'TCPerror')
1044 if received_packet.haslayer(UDPerror):
1045 self.assert_checksum_valid(received_packet, 'UDPerror',
1046 ignore_zero_checksum=True)
1047 if received_packet.haslayer(ICMPerror):
1048 self.assert_checksum_valid(received_packet, 'ICMPerror')
1050 def assert_icmp_checksum_valid(self, received_packet):
1051 self.assert_checksum_valid(received_packet, 'ICMP')
1052 self.assert_embedded_icmp_checksum_valid(received_packet)
1054 def assert_icmpv6_checksum_valid(self, pkt):
1055 if pkt.haslayer(ICMPv6DestUnreach):
1056 self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
1057 self.assert_embedded_icmp_checksum_valid(pkt)
1058 if pkt.haslayer(ICMPv6EchoRequest):
1059 self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
1060 if pkt.haslayer(ICMPv6EchoReply):
1061 self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
1063 def get_packet_counter(self, counter):
1064 if counter.startswith("/"):
1065 counter_value = self.statistics.get_counter(counter)
1067 counters = self.vapi.cli("sh errors").split('\n')
1069 for i in range(1, len(counters) - 1):
1070 results = counters[i].split()
1071 if results[1] == counter:
1072 counter_value = int(results[0])
1074 return counter_value
1076 def assert_packet_counter_equal(self, counter, expected_value):
1077 counter_value = self.get_packet_counter(counter)
1078 self.assert_equal(counter_value, expected_value,
1079 "packet counter `%s'" % counter)
1081 def assert_error_counter_equal(self, counter, expected_value):
1082 counter_value = self.statistics.get_err_counter(counter)
1083 self.assert_equal(counter_value, expected_value,
1084 "error counter `%s'" % counter)
1087 def sleep(cls, timeout, remark=None):
1089 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1090 # * by Guido, only the main thread can be interrupted.
1092 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
1095 if hasattr(os, 'sched_yield'):
1101 if hasattr(cls, 'logger'):
1102 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1103 before = time.time()
1106 if hasattr(cls, 'logger') and after - before > 2 * timeout:
1107 cls.logger.error("unexpected self.sleep() result - "
1108 "slept for %es instead of ~%es!",
1109 after - before, timeout)
1110 if hasattr(cls, 'logger'):
1112 "Finished sleep (%s) - slept %es (wanted %es)",
1113 remark, after - before, timeout)
1115 def pg_send(self, intf, pkts):
1116 self.vapi.cli("clear trace")
1117 intf.add_stream(pkts)
1118 self.pg_enable_capture(self.pg_interfaces)
1121 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
1122 self.pg_send(intf, pkts)
1125 for i in self.pg_interfaces:
1126 i.get_capture(0, timeout=timeout)
1127 i.assert_nothing_captured(remark=remark)
1130 def send_and_expect(self, intf, pkts, output, n_rx=None):
1133 self.pg_send(intf, pkts)
1134 rx = output.get_capture(n_rx)
1137 def send_and_expect_only(self, intf, pkts, output, timeout=None):
1138 self.pg_send(intf, pkts)
1139 rx = output.get_capture(len(pkts))
1143 for i in self.pg_interfaces:
1144 if i not in outputs:
1145 i.get_capture(0, timeout=timeout)
1146 i.assert_nothing_captured()
1152 """ unittest calls runTest when TestCase is instantiated without a
1153 test case. Use case: Writing unittests against VppTestCase"""
1157 def get_testcase_doc_name(test):
1158 return getdoc(test.__class__).splitlines()[0]
1161 def get_test_description(descriptions, test):
1162 short_description = test.shortDescription()
1163 if descriptions and short_description:
1164 return short_description
1169 class TestCaseInfo(object):
1170 def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
1171 self.logger = logger
1172 self.tempdir = tempdir
1173 self.vpp_pid = vpp_pid
1174 self.vpp_bin_path = vpp_bin_path
1175 self.core_crash_test = None
1178 class VppTestResult(unittest.TestResult):
1180 @property result_string
1181 String variable to store the test case result string.
1183 List variable containing 2-tuples of TestCase instances and strings
1184 holding formatted tracebacks. Each tuple represents a test which
1185 raised an unexpected exception.
1187 List variable containing 2-tuples of TestCase instances and strings
1188 holding formatted tracebacks. Each tuple represents a test where
1189 a failure was explicitly signalled using the TestCase.assert*()
1193 failed_test_cases_info = set()
1194 core_crash_test_cases_info = set()
1195 current_test_case_info = None
1197 def __init__(self, stream=None, descriptions=None, verbosity=None,
1200 :param stream File descriptor to store where to report test results.
1201 Set to the standard error stream by default.
1202 :param descriptions Boolean variable to store information if to use
1203 test case descriptions.
1204 :param verbosity Integer variable to store required verbosity level.
1206 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1207 self.stream = stream
1208 self.descriptions = descriptions
1209 self.verbosity = verbosity
1210 self.result_string = None
1211 self.runner = runner
1213 def addSuccess(self, test):
1215 Record a test succeeded result
1220 if self.current_test_case_info:
1221 self.current_test_case_info.logger.debug(
1222 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1223 test._testMethodName,
1224 test._testMethodDoc))
1225 unittest.TestResult.addSuccess(self, test)
1226 self.result_string = colorize("OK", GREEN)
1228 self.send_result_through_pipe(test, PASS)
1230 def addSkip(self, test, reason):
1232 Record a test skipped.
1238 if self.current_test_case_info:
1239 self.current_test_case_info.logger.debug(
1240 "--- addSkip() %s.%s(%s) called, reason is %s" %
1241 (test.__class__.__name__, test._testMethodName,
1242 test._testMethodDoc, reason))
1243 unittest.TestResult.addSkip(self, test, reason)
1244 self.result_string = colorize("SKIP", YELLOW)
1246 self.send_result_through_pipe(test, SKIP)
1248 def symlink_failed(self):
1249 if self.current_test_case_info:
1251 failed_dir = os.getenv('FAILED_DIR')
1252 link_path = os.path.join(
1255 os.path.basename(self.current_test_case_info.tempdir))
1256 if self.current_test_case_info.logger:
1257 self.current_test_case_info.logger.debug(
1258 "creating a link to the failed test")
1259 self.current_test_case_info.logger.debug(
1260 "os.symlink(%s, %s)" %
1261 (self.current_test_case_info.tempdir, link_path))
1262 if os.path.exists(link_path):
1263 if self.current_test_case_info.logger:
1264 self.current_test_case_info.logger.debug(
1265 'symlink already exists')
1267 os.symlink(self.current_test_case_info.tempdir, link_path)
1269 except Exception as e:
1270 if self.current_test_case_info.logger:
1271 self.current_test_case_info.logger.error(e)
1273 def send_result_through_pipe(self, test, result):
1274 if hasattr(self, 'test_framework_result_pipe'):
1275 pipe = self.test_framework_result_pipe
1277 pipe.send((test.id(), result))
1279 def log_error(self, test, err, fn_name):
1280 if self.current_test_case_info:
1281 if isinstance(test, unittest.suite._ErrorHolder):
1282 test_name = test.description
1284 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1285 test._testMethodName,
1286 test._testMethodDoc)
1287 self.current_test_case_info.logger.debug(
1288 "--- %s() %s called, err is %s" %
1289 (fn_name, test_name, err))
1290 self.current_test_case_info.logger.debug(
1291 "formatted exception is:\n%s" %
1292 "".join(format_exception(*err)))
1294 def add_error(self, test, err, unittest_fn, error_type):
1295 if error_type == FAIL:
1296 self.log_error(test, err, 'addFailure')
1297 error_type_str = colorize("FAIL", RED)
1298 elif error_type == ERROR:
1299 self.log_error(test, err, 'addError')
1300 error_type_str = colorize("ERROR", RED)
1302 raise Exception('Error type %s cannot be used to record an '
1303 'error or a failure' % error_type)
1305 unittest_fn(self, test, err)
1306 if self.current_test_case_info:
1307 self.result_string = "%s [ temp dir used by test case: %s ]" % \
1309 self.current_test_case_info.tempdir)
1310 self.symlink_failed()
1311 self.failed_test_cases_info.add(self.current_test_case_info)
1312 if is_core_present(self.current_test_case_info.tempdir):
1313 if not self.current_test_case_info.core_crash_test:
1314 if isinstance(test, unittest.suite._ErrorHolder):
1315 test_name = str(test)
1317 test_name = "'{!s}' ({!s})".format(
1318 get_testcase_doc_name(test), test.id())
1319 self.current_test_case_info.core_crash_test = test_name
1320 self.core_crash_test_cases_info.add(
1321 self.current_test_case_info)
1323 self.result_string = '%s [no temp dir]' % error_type_str
1325 self.send_result_through_pipe(test, error_type)
1327 def addFailure(self, test, err):
1329 Record a test failed result
1332 :param err: error message
1335 self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1337 def addError(self, test, err):
1339 Record a test error result
1342 :param err: error message
1345 self.add_error(test, err, unittest.TestResult.addError, ERROR)
1347 def getDescription(self, test):
1349 Get test description
1352 :returns: test description
1355 return get_test_description(self.descriptions, test)
1357 def startTest(self, test):
1365 def print_header(test):
1366 if not hasattr(test.__class__, '_header_printed'):
1367 print(double_line_delim)
1368 print(colorize(getdoc(test).splitlines()[0], GREEN))
1369 print(double_line_delim)
1370 test.__class__._header_printed = True
1374 unittest.TestResult.startTest(self, test)
1375 if self.verbosity > 0:
1376 self.stream.writeln(
1377 "Starting " + self.getDescription(test) + " ...")
1378 self.stream.writeln(single_line_delim)
1380 def stopTest(self, test):
1382 Called when the given test has been run
1387 unittest.TestResult.stopTest(self, test)
1388 if self.verbosity > 0:
1389 self.stream.writeln(single_line_delim)
1390 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1391 self.result_string))
1392 self.stream.writeln(single_line_delim)
1394 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1395 self.result_string))
1397 self.send_result_through_pipe(test, TEST_RUN)
1399 def printErrors(self):
1401 Print errors from running the test case
1403 if len(self.errors) > 0 or len(self.failures) > 0:
1404 self.stream.writeln()
1405 self.printErrorList('ERROR', self.errors)
1406 self.printErrorList('FAIL', self.failures)
1408 # ^^ that is the last output from unittest before summary
1409 if not self.runner.print_summary:
1410 devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
1411 self.stream = devnull
1412 self.runner.stream = devnull
1414 def printErrorList(self, flavour, errors):
1416 Print error list to the output stream together with error type
1417 and test case description.
1419 :param flavour: error type
1420 :param errors: iterable errors
1423 for test, err in errors:
1424 self.stream.writeln(double_line_delim)
1425 self.stream.writeln("%s: %s" %
1426 (flavour, self.getDescription(test)))
1427 self.stream.writeln(single_line_delim)
1428 self.stream.writeln("%s" % err)
1431 class VppTestRunner(unittest.TextTestRunner):
1433 A basic test runner implementation which prints results to standard error.
1437 def resultclass(self):
1438 """Class maintaining the results of the tests"""
1439 return VppTestResult
1441 def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1442 result_pipe=None, failfast=False, buffer=False,
1443 resultclass=None, print_summary=True, **kwargs):
1444 # ignore stream setting here, use hard-coded stdout to be in sync
1445 # with prints from VppTestCase methods ...
1446 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1447 verbosity, failfast, buffer,
1448 resultclass, **kwargs)
1449 KeepAliveReporter.pipe = keep_alive_pipe
1451 self.orig_stream = self.stream
1452 self.resultclass.test_framework_result_pipe = result_pipe
1454 self.print_summary = print_summary
1456 def _makeResult(self):
1457 return self.resultclass(self.stream,
1462 def run(self, test):
1469 faulthandler.enable() # emit stack trace to stderr if killed by signal
1471 result = super(VppTestRunner, self).run(test)
1472 if not self.print_summary:
1473 self.stream = self.orig_stream
1474 result.stream = self.orig_stream
1478 class Worker(Thread):
1479 def __init__(self, args, logger, env=None):
1480 self.logger = logger
1484 env = {} if env is None else env
1485 self.env = copy.deepcopy(env)
1486 super(Worker, self).__init__()
1489 executable = self.args[0]
1490 if not os.path.exists(executable) or not os.access(
1491 executable, os.F_OK | os.X_OK):
1492 # Exit code that means some system file did not exist,
1493 # could not be opened, or had some other kind of error.
1494 self.result = os.EX_OSFILE
1495 raise EnvironmentError(
1496 "executable '%s' is not found or executable." % executable)
1497 self.logger.debug("Running executable w/args `%s'" % self.args)
1498 env = os.environ.copy()
1499 env.update(self.env)
1500 env["CK_LOG_FILE_NAME"] = "-"
1501 self.process = subprocess.Popen(
1502 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1503 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1504 out, err = self.process.communicate()
1505 self.logger.debug("Finished running `%s'" % executable)
1506 self.logger.info("Return code is `%s'" % self.process.returncode)
1507 self.logger.info(single_line_delim)
1508 self.logger.info("Executable `%s' wrote to stdout:" % executable)
1509 self.logger.info(single_line_delim)
1510 self.logger.info(out)
1511 self.logger.info(single_line_delim)
1512 self.logger.info("Executable `%s' wrote to stderr:" % executable)
1513 self.logger.info(single_line_delim)
1514 self.logger.info(err)
1515 self.logger.info(single_line_delim)
1516 self.result = self.process.returncode
1519 if __name__ == '__main__':