3 from __future__ import print_function
17 from collections import deque
18 from threading import Thread, Event
19 from inspect import getdoc, isclass
20 from traceback import format_exception
21 from logging import FileHandler, DEBUG, Formatter
24 from scapy.packet import Raw
25 import hook as hookmodule
26 from vpp_pg_interface import VppPGInterface
27 from vpp_sub_interface import VppSubInterface
28 from vpp_lo_interface import VppLoInterface
29 from vpp_bvi_interface import VppBviInterface
30 from vpp_papi_provider import VppPapiProvider
31 from vpp_papi.vpp_stats import VPPStats
32 from vpp_papi.vpp_transport_shmem import VppTransportShmemIOError
33 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
35 from vpp_object import VppObjectRegistry
36 from util import ppp, is_core_present
37 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
38 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
39 from scapy.layers.inet6 import ICMPv6EchoReply
41 if os.name == 'posix' and sys.version_info[0] < 3:
42 # using subprocess32 is recommended by python official documentation
43 # @ https://docs.python.org/2/library/subprocess.html
44 import subprocess32 as subprocess
48 # Python2/3 compatible
60 debug_framework = False
61 if os.getenv('TEST_DEBUG', "0") == "1":
62 debug_framework = True
66 Test framework module.
68 The module provides a set of tools for constructing and running tests and
69 representing the results.
73 class VppDiedError(Exception):
74 """ exception for reporting that the subprocess has died."""
76 signals_by_value = {v: k for k, v in signal.__dict__.items() if
77 k.startswith('SIG') and not k.startswith('SIG_')}
79 def __init__(self, rv=None, testcase=None, method_name=None):
81 self.signal_name = None
82 self.testcase = testcase
83 self.method_name = method_name
86 self.signal_name = VppDiedError.signals_by_value[-rv]
87 except (KeyError, TypeError):
90 if testcase is None and method_name is None:
93 in_msg = 'running %s.%s ' % (testcase, method_name)
95 msg = "VPP subprocess died %sunexpectedly with return code: %d%s." % (
98 ' [%s]' % self.signal_name if
99 self.signal_name is not None else '')
100 super(VppDiedError, self).__init__(msg)
103 class _PacketInfo(object):
104 """Private class to create packet info object.
106 Help process information about the next packet.
107 Set variables to default values.
109 #: Store the index of the packet.
111 #: Store the index of the source packet generator interface of the packet.
113 #: Store the index of the destination packet generator interface
116 #: Store expected ip version
118 #: Store expected upper protocol
120 #: Store the copy of the former packet.
123 def __eq__(self, other):
124 index = self.index == other.index
125 src = self.src == other.src
126 dst = self.dst == other.dst
127 data = self.data == other.data
128 return index and src and dst and data
131 def pump_output(testclass):
132 """ pump output from vpp stdout/stderr to proper queues """
135 while not testclass.pump_thread_stop_flag.is_set():
136 readable = select.select([testclass.vpp.stdout.fileno(),
137 testclass.vpp.stderr.fileno(),
138 testclass.pump_thread_wakeup_pipe[0]],
140 if testclass.vpp.stdout.fileno() in readable:
141 read = os.read(testclass.vpp.stdout.fileno(), 102400)
143 split = read.splitlines(True)
144 if len(stdout_fragment) > 0:
145 split[0] = "%s%s" % (stdout_fragment, split[0])
146 if len(split) > 0 and split[-1].endswith("\n"):
150 stdout_fragment = split[-1]
151 testclass.vpp_stdout_deque.extend(split[:limit])
152 if not testclass.cache_vpp_output:
153 for line in split[:limit]:
154 testclass.logger.debug(
155 "VPP STDOUT: %s" % line.rstrip("\n"))
156 if testclass.vpp.stderr.fileno() in readable:
157 read = os.read(testclass.vpp.stderr.fileno(), 102400)
159 split = read.splitlines(True)
160 if len(stderr_fragment) > 0:
161 split[0] = "%s%s" % (stderr_fragment, split[0])
162 if len(split) > 0 and split[-1].endswith(b"\n"):
166 stderr_fragment = split[-1]
167 testclass.vpp_stderr_deque.extend(split[:limit])
168 if not testclass.cache_vpp_output:
169 for line in split[:limit]:
170 testclass.logger.debug(
171 "VPP STDERR: %s" % line.rstrip("\n"))
172 # ignoring the dummy pipe here intentionally - the
173 # flag will take care of properly terminating the loop
176 def _is_skip_aarch64_set():
177 return os.getenv('SKIP_AARCH64', 'n').lower() in ('yes', 'y', '1')
180 is_skip_aarch64_set = _is_skip_aarch64_set()
183 def _is_platform_aarch64():
184 return platform.machine() == 'aarch64'
187 is_platform_aarch64 = _is_platform_aarch64()
190 def _running_extended_tests():
191 s = os.getenv("EXTENDED_TESTS", "n")
192 return True if s.lower() in ("y", "yes", "1") else False
195 running_extended_tests = _running_extended_tests()
198 def _running_on_centos():
199 os_id = os.getenv("OS_ID", "")
200 return True if "centos" in os_id.lower() else False
203 running_on_centos = _running_on_centos
206 class KeepAliveReporter(object):
208 Singleton object which reports test start to parent process
213 self.__dict__ = self._shared_state
221 def pipe(self, pipe):
222 if self._pipe is not None:
223 raise Exception("Internal error - pipe should only be set once.")
226 def send_keep_alive(self, test, desc=None):
228 Write current test tmpdir & desc to keep-alive pipe to signal liveness
230 if self.pipe is None:
231 # if not running forked..
235 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
239 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
242 class VppTestCase(unittest.TestCase):
243 """This subclass is a base class for VPP test cases that are implemented as
244 classes. It provides methods to create and run test case.
247 extra_vpp_punt_config = []
248 extra_vpp_plugin_config = []
251 def packet_infos(self):
252 """List of packet infos"""
253 return self._packet_infos
256 def get_packet_count_for_if_idx(cls, dst_if_index):
257 """Get the number of packet info for specified destination if index"""
258 if dst_if_index in cls._packet_count_for_dst_if_idx:
259 return cls._packet_count_for_dst_if_idx[dst_if_index]
265 """Return the instance of this testcase"""
266 return cls.test_instance
269 def set_debug_flags(cls, d):
270 cls.debug_core = False
271 cls.debug_gdb = False
272 cls.debug_gdbserver = False
277 cls.debug_core = True
280 elif dl == "gdbserver":
281 cls.debug_gdbserver = True
283 raise Exception("Unrecognized DEBUG option: '%s'" % d)
286 def get_least_used_cpu():
287 cpu_usage_list = [set(range(psutil.cpu_count()))]
288 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
289 if 'vpp_main' == p.info['name']]
290 for vpp_process in vpp_processes:
291 for cpu_usage_set in cpu_usage_list:
293 cpu_num = vpp_process.cpu_num()
294 if cpu_num in cpu_usage_set:
295 cpu_usage_set_index = cpu_usage_list.index(
297 if cpu_usage_set_index == len(cpu_usage_list) - 1:
298 cpu_usage_list.append({cpu_num})
300 cpu_usage_list[cpu_usage_set_index + 1].add(
302 cpu_usage_set.remove(cpu_num)
304 except psutil.NoSuchProcess:
307 for cpu_usage_set in cpu_usage_list:
308 if len(cpu_usage_set) > 0:
309 min_usage_set = cpu_usage_set
312 return random.choice(tuple(min_usage_set))
315 def setUpConstants(cls):
316 """ Set-up the test case class based on environment variables """
317 s = os.getenv("STEP", "n")
318 cls.step = True if s.lower() in ("y", "yes", "1") else False
319 d = os.getenv("DEBUG", None)
320 c = os.getenv("CACHE_OUTPUT", "1")
321 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
322 cls.set_debug_flags(d)
323 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
324 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
325 cls.test_plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
326 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
328 if cls.plugin_path is not None:
329 if cls.extern_plugin_path is not None:
330 plugin_path = "%s:%s" % (
331 cls.plugin_path, cls.extern_plugin_path)
333 plugin_path = cls.plugin_path
334 elif cls.extern_plugin_path is not None:
335 plugin_path = cls.extern_plugin_path
337 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
338 debug_cli = "cli-listen localhost:5002"
340 size = os.getenv("COREDUMP_SIZE")
342 coredump_size = "coredump-size %s" % size
343 if coredump_size is None:
344 coredump_size = "coredump-size unlimited"
346 cpu_core_number = cls.get_least_used_cpu()
348 cls.vpp_cmdline = [cls.vpp_bin, "unix",
349 "{", "nodaemon", debug_cli, "full-coredump",
350 coredump_size, "runtime-dir", cls.tempdir, "}",
351 "api-trace", "{", "on", "}", "api-segment", "{",
352 "prefix", cls.shm_prefix, "}", "cpu", "{",
353 "main-core", str(cpu_core_number), "}",
354 "statseg", "{", "socket-name", cls.stats_sock, "}",
355 "socksvr", "{", "socket-name", cls.api_sock, "}",
357 "{", "plugin", "dpdk_plugin.so", "{", "disable",
358 "}", "plugin", "rdma_plugin.so", "{", "disable",
359 "}", "plugin", "unittest_plugin.so", "{", "enable",
360 "}"] + cls.extra_vpp_plugin_config + ["}", ]
361 if cls.extra_vpp_punt_config is not None:
362 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
363 if plugin_path is not None:
364 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
365 if cls.test_plugin_path is not None:
366 cls.vpp_cmdline.extend(["test_plugin_path", cls.test_plugin_path])
368 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
369 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
372 def wait_for_enter(cls):
373 if cls.debug_gdbserver:
374 print(double_line_delim)
375 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
377 print(double_line_delim)
378 print("Spawned VPP with PID: %d" % cls.vpp.pid)
380 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
382 print(single_line_delim)
383 print("You can debug the VPP using e.g.:")
384 if cls.debug_gdbserver:
385 print("sudo gdb " + cls.vpp_bin +
386 " -ex 'target remote localhost:7777'")
387 print("Now is the time to attach a gdb by running the above "
388 "command, set up breakpoints etc. and then resume VPP from "
389 "within gdb by issuing the 'continue' command")
391 print("sudo gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
392 print("Now is the time to attach a gdb by running the above "
393 "command and set up breakpoints etc.")
394 print(single_line_delim)
395 input("Press ENTER to continue running the testcase...")
399 cmdline = cls.vpp_cmdline
401 if cls.debug_gdbserver:
402 gdbserver = '/usr/bin/gdbserver'
403 if not os.path.isfile(gdbserver) or \
404 not os.access(gdbserver, os.X_OK):
405 raise Exception("gdbserver binary '%s' does not exist or is "
406 "not executable" % gdbserver)
408 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
409 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
412 cls.vpp = subprocess.Popen(cmdline,
413 stdout=subprocess.PIPE,
414 stderr=subprocess.PIPE,
416 except subprocess.CalledProcessError as e:
417 cls.logger.critical("Subprocess returned with non-0 return code: ("
421 cls.logger.critical("Subprocess returned with OS error: "
422 "(%s) %s", e.errno, e.strerror)
424 except Exception as e:
425 cls.logger.exception("Subprocess returned unexpected from "
432 def wait_for_stats_socket(cls):
433 deadline = time.time() + 3
435 while time.time() < deadline or \
436 cls.debug_gdb or cls.debug_gdbserver:
437 if os.path.exists(cls.stats_sock):
442 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
445 def wait_for_coredump(cls):
446 corefile = cls.tempdir + "/core"
447 if os.path.isfile(corefile):
448 cls.logger.error("Waiting for coredump to complete: %s", corefile)
449 curr_size = os.path.getsize(corefile)
450 deadline = time.time() + 60
452 while time.time() < deadline:
455 curr_size = os.path.getsize(corefile)
456 if size == curr_size:
460 cls.logger.error("Timed out waiting for coredump to complete:"
463 cls.logger.error("Coredump complete: %s, size %d",
469 Perform class setup before running the testcase
470 Remove shared memory files, start vpp and connect the vpp-api
472 super(VppTestCase, cls).setUpClass()
473 gc.collect() # run garbage collection first
475 cls.logger = get_logger(cls.__name__)
476 if hasattr(cls, 'parallel_handler'):
477 cls.logger.addHandler(cls.parallel_handler)
478 cls.logger.propagate = False
480 cls.tempdir = tempfile.mkdtemp(
481 prefix='vpp-unittest-%s-' % cls.__name__)
482 cls.stats_sock = "%s/stats.sock" % cls.tempdir
483 cls.api_sock = "%s/api.sock" % cls.tempdir
484 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
485 cls.file_handler.setFormatter(
486 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
488 cls.file_handler.setLevel(DEBUG)
489 cls.logger.addHandler(cls.file_handler)
490 cls.logger.debug("--- setUpClass() for %s called ---" %
492 cls.shm_prefix = os.path.basename(cls.tempdir)
493 os.chdir(cls.tempdir)
494 cls.logger.info("Temporary dir is %s, shm prefix is %s",
495 cls.tempdir, cls.shm_prefix)
497 cls.reset_packet_infos()
499 cls._zombie_captures = []
502 cls.registry = VppObjectRegistry()
503 cls.vpp_startup_failed = False
504 cls.reporter = KeepAliveReporter()
505 # need to catch exceptions here because if we raise, then the cleanup
506 # doesn't get called and we might end with a zombie vpp
509 cls.reporter.send_keep_alive(cls, 'setUpClass')
510 VppTestResult.current_test_case_info = TestCaseInfo(
511 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
512 cls.vpp_stdout_deque = deque()
513 cls.vpp_stderr_deque = deque()
514 cls.pump_thread_stop_flag = Event()
515 cls.pump_thread_wakeup_pipe = os.pipe()
516 cls.pump_thread = Thread(target=pump_output, args=(cls,))
517 cls.pump_thread.daemon = True
518 cls.pump_thread.start()
519 if cls.debug_gdb or cls.debug_gdbserver:
523 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
526 hook = hookmodule.StepHook(cls)
528 hook = hookmodule.PollHook(cls)
529 cls.vapi.register_hook(hook)
530 cls.wait_for_stats_socket()
531 cls.statistics = VPPStats(socketname=cls.stats_sock)
535 cls.vpp_startup_failed = True
537 "VPP died shortly after startup, check the"
538 " output to standard error for possible cause")
544 cls.vapi.disconnect()
547 if cls.debug_gdbserver:
548 print(colorize("You're running VPP inside gdbserver but "
549 "VPP-API connection failed, did you forget "
550 "to 'continue' VPP from within gdb?", RED))
560 Disconnect vpp-api, kill vpp and cleanup shared memory files
562 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
564 if cls.vpp.returncode is None:
565 print(double_line_delim)
566 print("VPP or GDB server is still running")
567 print(single_line_delim)
568 input("When done debugging, press ENTER to kill the "
569 "process and finish running the testcase...")
571 # first signal that we want to stop the pump thread, then wake it up
572 if hasattr(cls, 'pump_thread_stop_flag'):
573 cls.pump_thread_stop_flag.set()
574 if hasattr(cls, 'pump_thread_wakeup_pipe'):
575 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
576 if hasattr(cls, 'pump_thread'):
577 cls.logger.debug("Waiting for pump thread to stop")
578 cls.pump_thread.join()
579 if hasattr(cls, 'vpp_stderr_reader_thread'):
580 cls.logger.debug("Waiting for stdderr pump to stop")
581 cls.vpp_stderr_reader_thread.join()
583 if hasattr(cls, 'vpp'):
584 if hasattr(cls, 'vapi'):
585 cls.logger.debug("Disconnecting class vapi client on %s",
587 cls.vapi.disconnect()
588 cls.logger.debug("Deleting class vapi attribute on %s",
592 if cls.vpp.returncode is None:
593 cls.wait_for_coredump()
594 cls.logger.debug("Sending TERM to vpp")
596 cls.logger.debug("Waiting for vpp to die")
597 cls.vpp.communicate()
598 cls.logger.debug("Deleting class vpp attribute on %s",
602 if cls.vpp_startup_failed:
603 stdout_log = cls.logger.info
604 stderr_log = cls.logger.critical
606 stdout_log = cls.logger.info
607 stderr_log = cls.logger.info
609 if hasattr(cls, 'vpp_stdout_deque'):
610 stdout_log(single_line_delim)
611 stdout_log('VPP output to stdout while running %s:', cls.__name__)
612 stdout_log(single_line_delim)
613 vpp_output = "".join(cls.vpp_stdout_deque)
614 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
616 stdout_log('\n%s', vpp_output)
617 stdout_log(single_line_delim)
619 if hasattr(cls, 'vpp_stderr_deque'):
620 stderr_log(single_line_delim)
621 stderr_log('VPP output to stderr while running %s:', cls.__name__)
622 stderr_log(single_line_delim)
623 vpp_output = "".join(cls.vpp_stderr_deque)
624 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
626 stderr_log('\n%s', vpp_output)
627 stderr_log(single_line_delim)
630 def tearDownClass(cls):
631 """ Perform final cleanup after running all tests in this test-case """
632 cls.logger.debug("--- tearDownClass() for %s called ---" %
634 cls.reporter.send_keep_alive(cls, 'tearDownClass')
636 cls.file_handler.close()
637 cls.reset_packet_infos()
639 debug_internal.on_tear_down_class(cls)
641 def show_commands_at_teardown(self):
642 """ Allow subclass specific teardown logging additions."""
643 self.logger.info("--- No test specific show commands provided. ---")
646 """ Show various debug prints after each test """
647 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
648 (self.__class__.__name__, self._testMethodName,
649 self._testMethodDoc))
652 if not self.vpp_dead:
653 self.logger.debug(self.vapi.cli("show trace max 1000"))
654 self.logger.info(self.vapi.ppcli("show interface"))
655 self.logger.info(self.vapi.ppcli("show hardware"))
656 self.logger.info(self.statistics.set_errors_str())
657 self.logger.info(self.vapi.ppcli("show run"))
658 self.logger.info(self.vapi.ppcli("show log"))
659 self.logger.info("Logging testcase specific show commands.")
660 self.show_commands_at_teardown()
661 self.registry.remove_vpp_config(self.logger)
662 # Save/Dump VPP api trace log
663 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
664 tmp_api_trace = "/tmp/%s" % api_trace
665 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
666 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
667 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
669 os.rename(tmp_api_trace, vpp_api_trace_log)
670 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
672 except VppTransportShmemIOError:
673 self.logger.debug("VppTransportShmemIOError: Vpp dead. "
674 "Cannot log show commands.")
677 self.registry.unregister_all(self.logger)
680 """ Clear trace before running each test"""
681 super(VppTestCase, self).setUp()
682 self.reporter.send_keep_alive(self)
685 raise VppDiedError(rv=None, testcase=self.__class__.__name__,
686 method_name=self._testMethodName)
687 self.sleep(.1, "during setUp")
688 self.vpp_stdout_deque.append(
689 "--- test setUp() for %s.%s(%s) starts here ---\n" %
690 (self.__class__.__name__, self._testMethodName,
691 self._testMethodDoc))
692 self.vpp_stderr_deque.append(
693 "--- test setUp() for %s.%s(%s) starts here ---\n" %
694 (self.__class__.__name__, self._testMethodName,
695 self._testMethodDoc))
696 self.vapi.cli("clear trace")
697 # store the test instance inside the test class - so that objects
698 # holding the class can access instance methods (like assertEqual)
699 type(self).test_instance = self
702 def pg_enable_capture(cls, interfaces=None):
704 Enable capture on packet-generator interfaces
706 :param interfaces: iterable interface indexes (if None,
707 use self.pg_interfaces)
710 if interfaces is None:
711 interfaces = cls.pg_interfaces
716 def register_capture(cls, cap_name):
717 """ Register a capture in the testclass """
718 # add to the list of captures with current timestamp
719 cls._captures.append((time.time(), cap_name))
720 # filter out from zombies
721 cls._zombie_captures = [(stamp, name)
722 for (stamp, name) in cls._zombie_captures
727 """ Remove any zombie captures and enable the packet generator """
728 # how long before capture is allowed to be deleted - otherwise vpp
729 # crashes - 100ms seems enough (this shouldn't be needed at all)
732 for stamp, cap_name in cls._zombie_captures:
733 wait = stamp + capture_ttl - now
735 cls.sleep(wait, "before deleting capture %s" % cap_name)
737 cls.logger.debug("Removing zombie capture %s" % cap_name)
738 cls.vapi.cli('packet-generator delete %s' % cap_name)
740 cls.vapi.cli("trace add pg-input 1000")
741 cls.vapi.cli('packet-generator enable')
742 cls._zombie_captures = cls._captures
746 def create_pg_interfaces(cls, interfaces):
748 Create packet-generator interfaces.
750 :param interfaces: iterable indexes of the interfaces.
751 :returns: List of created interfaces.
756 intf = VppPGInterface(cls, i)
757 setattr(cls, intf.name, intf)
759 cls.pg_interfaces = result
763 def create_loopback_interfaces(cls, count):
765 Create loopback interfaces.
767 :param count: number of interfaces created.
768 :returns: List of created interfaces.
770 result = [VppLoInterface(cls) for i in range(count)]
772 setattr(cls, intf.name, intf)
773 cls.lo_interfaces = result
777 def create_bvi_interfaces(cls, count):
779 Create BVI interfaces.
781 :param count: number of interfaces created.
782 :returns: List of created interfaces.
784 result = [VppBviInterface(cls) for i in range(count)]
786 setattr(cls, intf.name, intf)
787 cls.bvi_interfaces = result
791 def extend_packet(packet, size, padding=' '):
793 Extend packet to given size by padding with spaces or custom padding
794 NOTE: Currently works only when Raw layer is present.
796 :param packet: packet
797 :param size: target size
798 :param padding: padding used to extend the payload
801 packet_len = len(packet) + 4
802 extend = size - packet_len
804 num = (extend // len(padding)) + 1
805 packet[Raw].load += (padding * num)[:extend].encode("ascii")
808 def reset_packet_infos(cls):
809 """ Reset the list of packet info objects and packet counts to zero """
810 cls._packet_infos = {}
811 cls._packet_count_for_dst_if_idx = {}
814 def create_packet_info(cls, src_if, dst_if):
816 Create packet info object containing the source and destination indexes
817 and add it to the testcase's packet info list
819 :param VppInterface src_if: source interface
820 :param VppInterface dst_if: destination interface
822 :returns: _PacketInfo object
826 info.index = len(cls._packet_infos)
827 info.src = src_if.sw_if_index
828 info.dst = dst_if.sw_if_index
829 if isinstance(dst_if, VppSubInterface):
830 dst_idx = dst_if.parent.sw_if_index
832 dst_idx = dst_if.sw_if_index
833 if dst_idx in cls._packet_count_for_dst_if_idx:
834 cls._packet_count_for_dst_if_idx[dst_idx] += 1
836 cls._packet_count_for_dst_if_idx[dst_idx] = 1
837 cls._packet_infos[info.index] = info
841 def info_to_payload(info):
843 Convert _PacketInfo object to packet payload
845 :param info: _PacketInfo object
847 :returns: string containing serialized data from packet info
849 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
853 def payload_to_info(payload, payload_field='load'):
855 Convert packet payload to _PacketInfo object
857 :param payload: packet payload
858 :type payload: <class 'scapy.packet.Raw'>
859 :param payload_field: packet fieldname of payload "load" for
860 <class 'scapy.packet.Raw'>
861 :type payload_field: str
862 :returns: _PacketInfo object containing de-serialized data from payload
865 numbers = getattr(payload, payload_field).split()
867 info.index = int(numbers[0])
868 info.src = int(numbers[1])
869 info.dst = int(numbers[2])
870 info.ip = int(numbers[3])
871 info.proto = int(numbers[4])
874 def get_next_packet_info(self, info):
876 Iterate over the packet info list stored in the testcase
877 Start iteration with first element if info is None
878 Continue based on index in info if info is specified
880 :param info: info or None
881 :returns: next info in list or None if no more infos
886 next_index = info.index + 1
887 if next_index == len(self._packet_infos):
890 return self._packet_infos[next_index]
892 def get_next_packet_info_for_interface(self, src_index, info):
894 Search the packet info list for the next packet info with same source
897 :param src_index: source interface index to search for
898 :param info: packet info - where to start the search
899 :returns: packet info or None
903 info = self.get_next_packet_info(info)
906 if info.src == src_index:
909 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
911 Search the packet info list for the next packet info with same source
912 and destination interface indexes
914 :param src_index: source interface index to search for
915 :param dst_index: destination interface index to search for
916 :param info: packet info - where to start the search
917 :returns: packet info or None
921 info = self.get_next_packet_info_for_interface(src_index, info)
924 if info.dst == dst_index:
927 def assert_equal(self, real_value, expected_value, name_or_class=None):
928 if name_or_class is None:
929 self.assertEqual(real_value, expected_value)
932 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
933 msg = msg % (getdoc(name_or_class).strip(),
934 real_value, str(name_or_class(real_value)),
935 expected_value, str(name_or_class(expected_value)))
937 msg = "Invalid %s: %s does not match expected value %s" % (
938 name_or_class, real_value, expected_value)
940 self.assertEqual(real_value, expected_value, msg)
942 def assert_in_range(self,
950 msg = "Invalid %s: %s out of range <%s,%s>" % (
951 name, real_value, expected_min, expected_max)
952 self.assertTrue(expected_min <= real_value <= expected_max, msg)
954 def assert_packet_checksums_valid(self, packet,
955 ignore_zero_udp_checksums=True):
956 received = packet.__class__(scapy.compat.raw(packet))
958 ppp("Verifying packet checksums for packet:", received))
959 udp_layers = ['UDP', 'UDPerror']
960 checksum_fields = ['cksum', 'chksum']
963 temp = received.__class__(scapy.compat.raw(received))
965 layer = temp.getlayer(counter)
967 for cf in checksum_fields:
968 if hasattr(layer, cf):
969 if ignore_zero_udp_checksums and \
970 0 == getattr(layer, cf) and \
971 layer.name in udp_layers:
974 checksums.append((counter, cf))
977 counter = counter + 1
978 if 0 == len(checksums):
980 temp = temp.__class__(scapy.compat.raw(temp))
981 for layer, cf in checksums:
982 calc_sum = getattr(temp[layer], cf)
984 getattr(received[layer], cf), calc_sum,
985 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
987 "Checksum field `%s` on `%s` layer has correct value `%s`" %
988 (cf, temp[layer].name, calc_sum))
990 def assert_checksum_valid(self, received_packet, layer,
992 ignore_zero_checksum=False):
993 """ Check checksum of received packet on given layer """
994 received_packet_checksum = getattr(received_packet[layer], field_name)
995 if ignore_zero_checksum and 0 == received_packet_checksum:
997 recalculated = received_packet.__class__(
998 scapy.compat.raw(received_packet))
999 delattr(recalculated[layer], field_name)
1000 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1001 self.assert_equal(received_packet_checksum,
1002 getattr(recalculated[layer], field_name),
1003 "packet checksum on layer: %s" % layer)
1005 def assert_ip_checksum_valid(self, received_packet,
1006 ignore_zero_checksum=False):
1007 self.assert_checksum_valid(received_packet, 'IP',
1008 ignore_zero_checksum=ignore_zero_checksum)
1010 def assert_tcp_checksum_valid(self, received_packet,
1011 ignore_zero_checksum=False):
1012 self.assert_checksum_valid(received_packet, 'TCP',
1013 ignore_zero_checksum=ignore_zero_checksum)
1015 def assert_udp_checksum_valid(self, received_packet,
1016 ignore_zero_checksum=True):
1017 self.assert_checksum_valid(received_packet, 'UDP',
1018 ignore_zero_checksum=ignore_zero_checksum)
1020 def assert_embedded_icmp_checksum_valid(self, received_packet):
1021 if received_packet.haslayer(IPerror):
1022 self.assert_checksum_valid(received_packet, 'IPerror')
1023 if received_packet.haslayer(TCPerror):
1024 self.assert_checksum_valid(received_packet, 'TCPerror')
1025 if received_packet.haslayer(UDPerror):
1026 self.assert_checksum_valid(received_packet, 'UDPerror',
1027 ignore_zero_checksum=True)
1028 if received_packet.haslayer(ICMPerror):
1029 self.assert_checksum_valid(received_packet, 'ICMPerror')
1031 def assert_icmp_checksum_valid(self, received_packet):
1032 self.assert_checksum_valid(received_packet, 'ICMP')
1033 self.assert_embedded_icmp_checksum_valid(received_packet)
1035 def assert_icmpv6_checksum_valid(self, pkt):
1036 if pkt.haslayer(ICMPv6DestUnreach):
1037 self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
1038 self.assert_embedded_icmp_checksum_valid(pkt)
1039 if pkt.haslayer(ICMPv6EchoRequest):
1040 self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
1041 if pkt.haslayer(ICMPv6EchoReply):
1042 self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
1044 def get_packet_counter(self, counter):
1045 if counter.startswith("/"):
1046 counter_value = self.statistics.get_counter(counter)
1048 counters = self.vapi.cli("sh errors").split('\n')
1050 for i in range(1, len(counters) - 1):
1051 results = counters[i].split()
1052 if results[1] == counter:
1053 counter_value = int(results[0])
1055 return counter_value
1057 def assert_packet_counter_equal(self, counter, expected_value):
1058 counter_value = self.get_packet_counter(counter)
1059 self.assert_equal(counter_value, expected_value,
1060 "packet counter `%s'" % counter)
1062 def assert_error_counter_equal(self, counter, expected_value):
1063 counter_value = self.statistics.get_err_counter(counter)
1064 self.assert_equal(counter_value, expected_value,
1065 "error counter `%s'" % counter)
1068 def sleep(cls, timeout, remark=None):
1070 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1071 # * by Guido, only the main thread can be interrupted.
1073 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
1076 if hasattr(os, 'sched_yield'):
1082 if hasattr(cls, 'logger'):
1083 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1084 before = time.time()
1087 if hasattr(cls, 'logger') and after - before > 2 * timeout:
1088 cls.logger.error("unexpected self.sleep() result - "
1089 "slept for %es instead of ~%es!",
1090 after - before, timeout)
1091 if hasattr(cls, 'logger'):
1093 "Finished sleep (%s) - slept %es (wanted %es)",
1094 remark, after - before, timeout)
1096 def pg_send(self, intf, pkts):
1097 self.vapi.cli("clear trace")
1098 intf.add_stream(pkts)
1099 self.pg_enable_capture(self.pg_interfaces)
1102 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
1103 self.pg_send(intf, pkts)
1106 for i in self.pg_interfaces:
1107 i.get_capture(0, timeout=timeout)
1108 i.assert_nothing_captured(remark=remark)
1111 def send_and_expect(self, intf, pkts, output, n_rx=None):
1114 self.pg_send(intf, pkts)
1115 rx = output.get_capture(n_rx)
1118 def send_and_expect_only(self, intf, pkts, output, timeout=None):
1119 self.pg_send(intf, pkts)
1120 rx = output.get_capture(len(pkts))
1124 for i in self.pg_interfaces:
1125 if i not in outputs:
1126 i.get_capture(0, timeout=timeout)
1127 i.assert_nothing_captured()
1133 """ unittest calls runTest when TestCase is instantiated without a
1134 test case. Use case: Writing unittests against VppTestCase"""
1138 def get_testcase_doc_name(test):
1139 return getdoc(test.__class__).splitlines()[0]
1142 def get_test_description(descriptions, test):
1143 short_description = test.shortDescription()
1144 if descriptions and short_description:
1145 return short_description
1150 class TestCaseInfo(object):
1151 def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
1152 self.logger = logger
1153 self.tempdir = tempdir
1154 self.vpp_pid = vpp_pid
1155 self.vpp_bin_path = vpp_bin_path
1156 self.core_crash_test = None
1159 class VppTestResult(unittest.TestResult):
1161 @property result_string
1162 String variable to store the test case result string.
1164 List variable containing 2-tuples of TestCase instances and strings
1165 holding formatted tracebacks. Each tuple represents a test which
1166 raised an unexpected exception.
1168 List variable containing 2-tuples of TestCase instances and strings
1169 holding formatted tracebacks. Each tuple represents a test where
1170 a failure was explicitly signalled using the TestCase.assert*()
1174 failed_test_cases_info = set()
1175 core_crash_test_cases_info = set()
1176 current_test_case_info = None
1178 def __init__(self, stream=None, descriptions=None, verbosity=None,
1181 :param stream File descriptor to store where to report test results.
1182 Set to the standard error stream by default.
1183 :param descriptions Boolean variable to store information if to use
1184 test case descriptions.
1185 :param verbosity Integer variable to store required verbosity level.
1187 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1188 self.stream = stream
1189 self.descriptions = descriptions
1190 self.verbosity = verbosity
1191 self.result_string = None
1192 self.runner = runner
1194 def addSuccess(self, test):
1196 Record a test succeeded result
1201 if self.current_test_case_info:
1202 self.current_test_case_info.logger.debug(
1203 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1204 test._testMethodName,
1205 test._testMethodDoc))
1206 unittest.TestResult.addSuccess(self, test)
1207 self.result_string = colorize("OK", GREEN)
1209 self.send_result_through_pipe(test, PASS)
1211 def addSkip(self, test, reason):
1213 Record a test skipped.
1219 if self.current_test_case_info:
1220 self.current_test_case_info.logger.debug(
1221 "--- addSkip() %s.%s(%s) called, reason is %s" %
1222 (test.__class__.__name__, test._testMethodName,
1223 test._testMethodDoc, reason))
1224 unittest.TestResult.addSkip(self, test, reason)
1225 self.result_string = colorize("SKIP", YELLOW)
1227 self.send_result_through_pipe(test, SKIP)
1229 def symlink_failed(self):
1230 if self.current_test_case_info:
1232 failed_dir = os.getenv('FAILED_DIR')
1233 link_path = os.path.join(
1236 os.path.basename(self.current_test_case_info.tempdir))
1237 if self.current_test_case_info.logger:
1238 self.current_test_case_info.logger.debug(
1239 "creating a link to the failed test")
1240 self.current_test_case_info.logger.debug(
1241 "os.symlink(%s, %s)" %
1242 (self.current_test_case_info.tempdir, link_path))
1243 if os.path.exists(link_path):
1244 if self.current_test_case_info.logger:
1245 self.current_test_case_info.logger.debug(
1246 'symlink already exists')
1248 os.symlink(self.current_test_case_info.tempdir, link_path)
1250 except Exception as e:
1251 if self.current_test_case_info.logger:
1252 self.current_test_case_info.logger.error(e)
1254 def send_result_through_pipe(self, test, result):
1255 if hasattr(self, 'test_framework_result_pipe'):
1256 pipe = self.test_framework_result_pipe
1258 pipe.send((test.id(), result))
1260 def log_error(self, test, err, fn_name):
1261 if self.current_test_case_info:
1262 if isinstance(test, unittest.suite._ErrorHolder):
1263 test_name = test.description
1265 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1266 test._testMethodName,
1267 test._testMethodDoc)
1268 self.current_test_case_info.logger.debug(
1269 "--- %s() %s called, err is %s" %
1270 (fn_name, test_name, err))
1271 self.current_test_case_info.logger.debug(
1272 "formatted exception is:\n%s" %
1273 "".join(format_exception(*err)))
1275 def add_error(self, test, err, unittest_fn, error_type):
1276 if error_type == FAIL:
1277 self.log_error(test, err, 'addFailure')
1278 error_type_str = colorize("FAIL", RED)
1279 elif error_type == ERROR:
1280 self.log_error(test, err, 'addError')
1281 error_type_str = colorize("ERROR", RED)
1283 raise Exception('Error type %s cannot be used to record an '
1284 'error or a failure' % error_type)
1286 unittest_fn(self, test, err)
1287 if self.current_test_case_info:
1288 self.result_string = "%s [ temp dir used by test case: %s ]" % \
1290 self.current_test_case_info.tempdir)
1291 self.symlink_failed()
1292 self.failed_test_cases_info.add(self.current_test_case_info)
1293 if is_core_present(self.current_test_case_info.tempdir):
1294 if not self.current_test_case_info.core_crash_test:
1295 if isinstance(test, unittest.suite._ErrorHolder):
1296 test_name = str(test)
1298 test_name = "'{!s}' ({!s})".format(
1299 get_testcase_doc_name(test), test.id())
1300 self.current_test_case_info.core_crash_test = test_name
1301 self.core_crash_test_cases_info.add(
1302 self.current_test_case_info)
1304 self.result_string = '%s [no temp dir]' % error_type_str
1306 self.send_result_through_pipe(test, error_type)
1308 def addFailure(self, test, err):
1310 Record a test failed result
1313 :param err: error message
1316 self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1318 def addError(self, test, err):
1320 Record a test error result
1323 :param err: error message
1326 self.add_error(test, err, unittest.TestResult.addError, ERROR)
1328 def getDescription(self, test):
1330 Get test description
1333 :returns: test description
1336 return get_test_description(self.descriptions, test)
1338 def startTest(self, test):
1346 def print_header(test):
1347 if not hasattr(test.__class__, '_header_printed'):
1348 print(double_line_delim)
1349 print(colorize(getdoc(test).splitlines()[0], GREEN))
1350 print(double_line_delim)
1351 test.__class__._header_printed = True
1355 unittest.TestResult.startTest(self, test)
1356 if self.verbosity > 0:
1357 self.stream.writeln(
1358 "Starting " + self.getDescription(test) + " ...")
1359 self.stream.writeln(single_line_delim)
1361 def stopTest(self, test):
1363 Called when the given test has been run
1368 unittest.TestResult.stopTest(self, test)
1369 if self.verbosity > 0:
1370 self.stream.writeln(single_line_delim)
1371 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1372 self.result_string))
1373 self.stream.writeln(single_line_delim)
1375 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1376 self.result_string))
1378 self.send_result_through_pipe(test, TEST_RUN)
1380 def printErrors(self):
1382 Print errors from running the test case
1384 if len(self.errors) > 0 or len(self.failures) > 0:
1385 self.stream.writeln()
1386 self.printErrorList('ERROR', self.errors)
1387 self.printErrorList('FAIL', self.failures)
1389 # ^^ that is the last output from unittest before summary
1390 if not self.runner.print_summary:
1391 devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
1392 self.stream = devnull
1393 self.runner.stream = devnull
1395 def printErrorList(self, flavour, errors):
1397 Print error list to the output stream together with error type
1398 and test case description.
1400 :param flavour: error type
1401 :param errors: iterable errors
1404 for test, err in errors:
1405 self.stream.writeln(double_line_delim)
1406 self.stream.writeln("%s: %s" %
1407 (flavour, self.getDescription(test)))
1408 self.stream.writeln(single_line_delim)
1409 self.stream.writeln("%s" % err)
1412 class VppTestRunner(unittest.TextTestRunner):
1414 A basic test runner implementation which prints results to standard error.
1418 def resultclass(self):
1419 """Class maintaining the results of the tests"""
1420 return VppTestResult
1422 def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1423 result_pipe=None, failfast=False, buffer=False,
1424 resultclass=None, print_summary=True, **kwargs):
1425 # ignore stream setting here, use hard-coded stdout to be in sync
1426 # with prints from VppTestCase methods ...
1427 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1428 verbosity, failfast, buffer,
1429 resultclass, **kwargs)
1430 KeepAliveReporter.pipe = keep_alive_pipe
1432 self.orig_stream = self.stream
1433 self.resultclass.test_framework_result_pipe = result_pipe
1435 self.print_summary = print_summary
1437 def _makeResult(self):
1438 return self.resultclass(self.stream,
1443 def run(self, test):
1450 faulthandler.enable() # emit stack trace to stderr if killed by signal
1452 result = super(VppTestRunner, self).run(test)
1453 if not self.print_summary:
1454 self.stream = self.orig_stream
1455 result.stream = self.orig_stream
1459 class Worker(Thread):
1460 def __init__(self, args, logger, env=None):
1461 self.logger = logger
1465 env = {} if env is None else env
1466 self.env = copy.deepcopy(env)
1467 super(Worker, self).__init__()
1470 executable = self.args[0]
1471 if not os.path.exists(executable) or not os.access(
1472 executable, os.F_OK | os.X_OK):
1473 # Exit code that means some system file did not exist,
1474 # could not be opened, or had some other kind of error.
1475 self.result = os.EX_OSFILE
1476 raise EnvironmentError(
1477 "executable '%s' is not found or executable." % executable)
1478 self.logger.debug("Running executable w/args `%s'" % self.args)
1479 env = os.environ.copy()
1480 env.update(self.env)
1481 env["CK_LOG_FILE_NAME"] = "-"
1482 self.process = subprocess.Popen(
1483 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1484 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1485 out, err = self.process.communicate()
1486 self.logger.debug("Finished running `%s'" % executable)
1487 self.logger.info("Return code is `%s'" % self.process.returncode)
1488 self.logger.info(single_line_delim)
1489 self.logger.info("Executable `%s' wrote to stdout:" % executable)
1490 self.logger.info(single_line_delim)
1491 self.logger.info(out)
1492 self.logger.info(single_line_delim)
1493 self.logger.info("Executable `%s' wrote to stderr:" % executable)
1494 self.logger.info(single_line_delim)
1495 self.logger.info(err)
1496 self.logger.info(single_line_delim)
1497 self.result = self.process.returncode
1500 if __name__ == '__main__':