3 from __future__ import print_function
17 from collections import deque
18 from threading import Thread, Event
19 from inspect import getdoc, isclass
20 from traceback import format_exception
21 from logging import FileHandler, DEBUG, Formatter
24 from scapy.packet import Raw
25 import hook as hookmodule
26 from vpp_pg_interface import VppPGInterface
27 from vpp_sub_interface import VppSubInterface
28 from vpp_lo_interface import VppLoInterface
29 from vpp_bvi_interface import VppBviInterface
30 from vpp_papi_provider import VppPapiProvider
31 from vpp_papi.vpp_stats import VPPStats
32 from vpp_papi.vpp_transport_shmem import VppTransportShmemIOError
33 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
35 from vpp_object import VppObjectRegistry
36 from util import ppp, is_core_present
37 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
38 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
39 from scapy.layers.inet6 import ICMPv6EchoReply
41 if os.name == 'posix' and sys.version_info[0] < 3:
42 # using subprocess32 is recommended by python official documentation
43 # @ https://docs.python.org/2/library/subprocess.html
44 import subprocess32 as subprocess
48 # Python2/3 compatible
61 class BoolEnvironmentVariable(object):
63 def __init__(self, env_var_name, default='n', true_values=None):
64 self.name = env_var_name
65 self.default = default
66 self.true_values = true_values if true_values is not None else \
70 return os.getenv(self.name, self.default).lower() in self.true_values
72 if sys.version_info[0] == 2:
73 __nonzero__ = __bool__
76 return 'BoolEnvironmentVariable(%r, default=%r, true_values=%r)' % \
77 (self.name, self.default, self.true_values)
80 debug_framework = BoolEnvironmentVariable('TEST_DEBUG')
85 Test framework module.
87 The module provides a set of tools for constructing and running tests and
88 representing the results.
92 class VppDiedError(Exception):
93 """ exception for reporting that the subprocess has died."""
95 signals_by_value = {v: k for k, v in signal.__dict__.items() if
96 k.startswith('SIG') and not k.startswith('SIG_')}
98 def __init__(self, rv=None, testcase=None, method_name=None):
100 self.signal_name = None
101 self.testcase = testcase
102 self.method_name = method_name
105 self.signal_name = VppDiedError.signals_by_value[-rv]
106 except (KeyError, TypeError):
109 if testcase is None and method_name is None:
112 in_msg = 'running %s.%s ' % (testcase, method_name)
114 msg = "VPP subprocess died %sunexpectedly with return code: %d%s." % (
117 ' [%s]' % (self.signal_name if
118 self.signal_name is not None else ''))
119 super(VppDiedError, self).__init__(msg)
122 class _PacketInfo(object):
123 """Private class to create packet info object.
125 Help process information about the next packet.
126 Set variables to default values.
128 #: Store the index of the packet.
130 #: Store the index of the source packet generator interface of the packet.
132 #: Store the index of the destination packet generator interface
135 #: Store expected ip version
137 #: Store expected upper protocol
139 #: Store the copy of the former packet.
142 def __eq__(self, other):
143 index = self.index == other.index
144 src = self.src == other.src
145 dst = self.dst == other.dst
146 data = self.data == other.data
147 return index and src and dst and data
150 def pump_output(testclass):
151 """ pump output from vpp stdout/stderr to proper queues """
154 while not testclass.pump_thread_stop_flag.is_set():
155 readable = select.select([testclass.vpp.stdout.fileno(),
156 testclass.vpp.stderr.fileno(),
157 testclass.pump_thread_wakeup_pipe[0]],
159 if testclass.vpp.stdout.fileno() in readable:
160 read = os.read(testclass.vpp.stdout.fileno(), 102400)
162 split = read.decode('ascii',
163 errors='backslashreplace').splitlines(True)
164 if len(stdout_fragment) > 0:
165 split[0] = "%s%s" % (stdout_fragment, split[0])
166 if len(split) > 0 and split[-1].endswith("\n"):
170 stdout_fragment = split[-1]
171 testclass.vpp_stdout_deque.extend(split[:limit])
172 if not testclass.cache_vpp_output:
173 for line in split[:limit]:
174 testclass.logger.debug(
175 "VPP STDOUT: %s" % line.rstrip("\n"))
176 if testclass.vpp.stderr.fileno() in readable:
177 read = os.read(testclass.vpp.stderr.fileno(), 102400)
179 split = read.decode('ascii',
180 errors='backslashreplace').splitlines(True)
181 if len(stderr_fragment) > 0:
182 split[0] = "%s%s" % (stderr_fragment, split[0])
183 if len(split) > 0 and split[-1].endswith("\n"):
187 stderr_fragment = split[-1]
189 testclass.vpp_stderr_deque.extend(split[:limit])
190 if not testclass.cache_vpp_output:
191 for line in split[:limit]:
192 testclass.logger.debug(
193 "VPP STDERR: %s" % line.rstrip("\n"))
194 # ignoring the dummy pipe here intentionally - the
195 # flag will take care of properly terminating the loop
198 def _is_skip_aarch64_set():
199 return BoolEnvironmentVariable('SKIP_AARCH64')
202 is_skip_aarch64_set = _is_skip_aarch64_set()
205 def _is_platform_aarch64():
206 return platform.machine() == 'aarch64'
209 is_platform_aarch64 = _is_platform_aarch64()
212 def _running_extended_tests():
213 return BoolEnvironmentVariable("EXTENDED_TESTS")
216 running_extended_tests = _running_extended_tests()
219 def _running_on_centos():
220 os_id = os.getenv("OS_ID", "")
221 return True if "centos" in os_id.lower() else False
224 running_on_centos = _running_on_centos()
227 class KeepAliveReporter(object):
229 Singleton object which reports test start to parent process
234 self.__dict__ = self._shared_state
242 def pipe(self, pipe):
243 if self._pipe is not None:
244 raise Exception("Internal error - pipe should only be set once.")
247 def send_keep_alive(self, test, desc=None):
249 Write current test tmpdir & desc to keep-alive pipe to signal liveness
251 if self.pipe is None:
252 # if not running forked..
256 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
260 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
263 class VppTestCase(unittest.TestCase):
264 """This subclass is a base class for VPP test cases that are implemented as
265 classes. It provides methods to create and run test case.
268 extra_vpp_punt_config = []
269 extra_vpp_plugin_config = []
272 def packet_infos(self):
273 """List of packet infos"""
274 return self._packet_infos
277 def get_packet_count_for_if_idx(cls, dst_if_index):
278 """Get the number of packet info for specified destination if index"""
279 if dst_if_index in cls._packet_count_for_dst_if_idx:
280 return cls._packet_count_for_dst_if_idx[dst_if_index]
286 """Return the instance of this testcase"""
287 return cls.test_instance
290 def set_debug_flags(cls, d):
291 cls.debug_core = False
292 cls.debug_gdb = False
293 cls.debug_gdbserver = False
298 cls.debug_core = True
301 elif dl == "gdbserver":
302 cls.debug_gdbserver = True
304 raise Exception("Unrecognized DEBUG option: '%s'" % d)
307 def get_least_used_cpu():
308 cpu_usage_list = [set(range(psutil.cpu_count()))]
309 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
310 if 'vpp_main' == p.info['name']]
311 for vpp_process in vpp_processes:
312 for cpu_usage_set in cpu_usage_list:
314 cpu_num = vpp_process.cpu_num()
315 if cpu_num in cpu_usage_set:
316 cpu_usage_set_index = cpu_usage_list.index(
318 if cpu_usage_set_index == len(cpu_usage_list) - 1:
319 cpu_usage_list.append({cpu_num})
321 cpu_usage_list[cpu_usage_set_index + 1].add(
323 cpu_usage_set.remove(cpu_num)
325 except psutil.NoSuchProcess:
328 for cpu_usage_set in cpu_usage_list:
329 if len(cpu_usage_set) > 0:
330 min_usage_set = cpu_usage_set
333 return random.choice(tuple(min_usage_set))
336 def setUpConstants(cls):
337 """ Set-up the test case class based on environment variables """
338 cls.step = BoolEnvironmentVariable('STEP')
339 d = os.getenv("DEBUG", None)
340 # inverted case to handle '' == True
341 c = os.getenv("CACHE_OUTPUT", "1")
342 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
343 cls.set_debug_flags(d)
344 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
345 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
346 cls.test_plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
347 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
349 if cls.plugin_path is not None:
350 if cls.extern_plugin_path is not None:
351 plugin_path = "%s:%s" % (
352 cls.plugin_path, cls.extern_plugin_path)
354 plugin_path = cls.plugin_path
355 elif cls.extern_plugin_path is not None:
356 plugin_path = cls.extern_plugin_path
358 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
359 debug_cli = "cli-listen localhost:5002"
361 size = os.getenv("COREDUMP_SIZE")
363 coredump_size = "coredump-size %s" % size
364 if coredump_size is None:
365 coredump_size = "coredump-size unlimited"
367 cpu_core_number = cls.get_least_used_cpu()
368 if not hasattr(cls, "worker_config"):
369 cls.worker_config = ""
371 cls.vpp_cmdline = [cls.vpp_bin, "unix",
372 "{", "nodaemon", debug_cli, "full-coredump",
373 coredump_size, "runtime-dir", cls.tempdir, "}",
374 "api-trace", "{", "on", "}", "api-segment", "{",
375 "prefix", cls.shm_prefix, "}", "cpu", "{",
376 "main-core", str(cpu_core_number),
377 cls.worker_config, "}",
378 "statseg", "{", "socket-name", cls.stats_sock, "}",
379 "socksvr", "{", "socket-name", cls.api_sock, "}",
381 "{", "plugin", "dpdk_plugin.so", "{", "disable",
382 "}", "plugin", "rdma_plugin.so", "{", "disable",
383 "}", "plugin", "unittest_plugin.so", "{", "enable",
384 "}"] + cls.extra_vpp_plugin_config + ["}", ]
385 if cls.extra_vpp_punt_config is not None:
386 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
387 if plugin_path is not None:
388 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
389 if cls.test_plugin_path is not None:
390 cls.vpp_cmdline.extend(["test_plugin_path", cls.test_plugin_path])
392 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
393 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
396 def wait_for_enter(cls):
397 if cls.debug_gdbserver:
398 print(double_line_delim)
399 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
401 print(double_line_delim)
402 print("Spawned VPP with PID: %d" % cls.vpp.pid)
404 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
406 print(single_line_delim)
407 print("You can debug the VPP using e.g.:")
408 if cls.debug_gdbserver:
409 print("sudo gdb " + cls.vpp_bin +
410 " -ex 'target remote localhost:7777'")
411 print("Now is the time to attach a gdb by running the above "
412 "command, set up breakpoints etc. and then resume VPP from "
413 "within gdb by issuing the 'continue' command")
415 print("sudo gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
416 print("Now is the time to attach a gdb by running the above "
417 "command and set up breakpoints etc.")
418 print(single_line_delim)
419 input("Press ENTER to continue running the testcase...")
423 cmdline = cls.vpp_cmdline
425 if cls.debug_gdbserver:
426 gdbserver = '/usr/bin/gdbserver'
427 if not os.path.isfile(gdbserver) or \
428 not os.access(gdbserver, os.X_OK):
429 raise Exception("gdbserver binary '%s' does not exist or is "
430 "not executable" % gdbserver)
432 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
433 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
436 cls.vpp = subprocess.Popen(cmdline,
437 stdout=subprocess.PIPE,
438 stderr=subprocess.PIPE,
440 except subprocess.CalledProcessError as e:
441 cls.logger.critical("Subprocess returned with non-0 return code: ("
445 cls.logger.critical("Subprocess returned with OS error: "
446 "(%s) %s", e.errno, e.strerror)
448 except Exception as e:
449 cls.logger.exception("Subprocess returned unexpected from "
456 def wait_for_stats_socket(cls):
457 deadline = time.time() + 300
459 while time.time() < deadline or \
460 cls.debug_gdb or cls.debug_gdbserver:
461 if os.path.exists(cls.stats_sock):
466 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
469 def wait_for_coredump(cls):
470 corefile = cls.tempdir + "/core"
471 if os.path.isfile(corefile):
472 cls.logger.error("Waiting for coredump to complete: %s", corefile)
473 curr_size = os.path.getsize(corefile)
474 deadline = time.time() + 60
476 while time.time() < deadline:
479 curr_size = os.path.getsize(corefile)
480 if size == curr_size:
484 cls.logger.error("Timed out waiting for coredump to complete:"
487 cls.logger.error("Coredump complete: %s, size %d",
493 Perform class setup before running the testcase
494 Remove shared memory files, start vpp and connect the vpp-api
496 super(VppTestCase, cls).setUpClass()
497 gc.collect() # run garbage collection first
499 cls.logger = get_logger(cls.__name__)
500 if hasattr(cls, 'parallel_handler'):
501 cls.logger.addHandler(cls.parallel_handler)
502 cls.logger.propagate = False
504 cls.tempdir = tempfile.mkdtemp(
505 prefix='vpp-unittest-%s-' % cls.__name__)
506 cls.stats_sock = "%s/stats.sock" % cls.tempdir
507 cls.api_sock = "%s/api.sock" % cls.tempdir
508 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
509 cls.file_handler.setFormatter(
510 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
512 cls.file_handler.setLevel(DEBUG)
513 cls.logger.addHandler(cls.file_handler)
514 cls.logger.debug("--- setUpClass() for %s called ---" %
516 cls.shm_prefix = os.path.basename(cls.tempdir)
517 os.chdir(cls.tempdir)
518 cls.logger.info("Temporary dir is %s, shm prefix is %s",
519 cls.tempdir, cls.shm_prefix)
521 cls.reset_packet_infos()
525 cls.registry = VppObjectRegistry()
526 cls.vpp_startup_failed = False
527 cls.reporter = KeepAliveReporter()
528 # need to catch exceptions here because if we raise, then the cleanup
529 # doesn't get called and we might end with a zombie vpp
532 cls.reporter.send_keep_alive(cls, 'setUpClass')
533 VppTestResult.current_test_case_info = TestCaseInfo(
534 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
535 cls.vpp_stdout_deque = deque()
536 cls.vpp_stderr_deque = deque()
537 cls.pump_thread_stop_flag = Event()
538 cls.pump_thread_wakeup_pipe = os.pipe()
539 cls.pump_thread = Thread(target=pump_output, args=(cls,))
540 cls.pump_thread.daemon = True
541 cls.pump_thread.start()
542 if cls.debug_gdb or cls.debug_gdbserver:
546 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
549 hook = hookmodule.StepHook(cls)
551 hook = hookmodule.PollHook(cls)
552 cls.vapi.register_hook(hook)
553 cls.wait_for_stats_socket()
554 cls.statistics = VPPStats(socketname=cls.stats_sock)
558 cls.vpp_startup_failed = True
560 "VPP died shortly after startup, check the"
561 " output to standard error for possible cause")
567 cls.vapi.disconnect()
570 if cls.debug_gdbserver:
571 print(colorize("You're running VPP inside gdbserver but "
572 "VPP-API connection failed, did you forget "
573 "to 'continue' VPP from within gdb?", RED))
575 except Exception as e:
576 cls.logger.debug("Exception connecting to VPP: %s" % e)
584 Disconnect vpp-api, kill vpp and cleanup shared memory files
586 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
588 if cls.vpp.returncode is None:
589 print(double_line_delim)
590 print("VPP or GDB server is still running")
591 print(single_line_delim)
592 input("When done debugging, press ENTER to kill the "
593 "process and finish running the testcase...")
595 # first signal that we want to stop the pump thread, then wake it up
596 if hasattr(cls, 'pump_thread_stop_flag'):
597 cls.pump_thread_stop_flag.set()
598 if hasattr(cls, 'pump_thread_wakeup_pipe'):
599 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
600 if hasattr(cls, 'pump_thread'):
601 cls.logger.debug("Waiting for pump thread to stop")
602 cls.pump_thread.join()
603 if hasattr(cls, 'vpp_stderr_reader_thread'):
604 cls.logger.debug("Waiting for stdderr pump to stop")
605 cls.vpp_stderr_reader_thread.join()
607 if hasattr(cls, 'vpp'):
608 if hasattr(cls, 'vapi'):
609 cls.logger.debug("Disconnecting class vapi client on %s",
611 cls.vapi.disconnect()
612 cls.logger.debug("Deleting class vapi attribute on %s",
616 if cls.vpp.returncode is None:
617 cls.wait_for_coredump()
618 cls.logger.debug("Sending TERM to vpp")
620 cls.logger.debug("Waiting for vpp to die")
621 cls.vpp.communicate()
622 cls.logger.debug("Deleting class vpp attribute on %s",
626 if cls.vpp_startup_failed:
627 stdout_log = cls.logger.info
628 stderr_log = cls.logger.critical
630 stdout_log = cls.logger.info
631 stderr_log = cls.logger.info
633 if hasattr(cls, 'vpp_stdout_deque'):
634 stdout_log(single_line_delim)
635 stdout_log('VPP output to stdout while running %s:', cls.__name__)
636 stdout_log(single_line_delim)
637 vpp_output = "".join(cls.vpp_stdout_deque)
638 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
640 stdout_log('\n%s', vpp_output)
641 stdout_log(single_line_delim)
643 if hasattr(cls, 'vpp_stderr_deque'):
644 stderr_log(single_line_delim)
645 stderr_log('VPP output to stderr while running %s:', cls.__name__)
646 stderr_log(single_line_delim)
647 vpp_output = "".join(cls.vpp_stderr_deque)
648 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
650 stderr_log('\n%s', vpp_output)
651 stderr_log(single_line_delim)
654 def tearDownClass(cls):
655 """ Perform final cleanup after running all tests in this test-case """
656 cls.logger.debug("--- tearDownClass() for %s called ---" %
658 cls.reporter.send_keep_alive(cls, 'tearDownClass')
660 cls.file_handler.close()
661 cls.reset_packet_infos()
663 debug_internal.on_tear_down_class(cls)
665 def show_commands_at_teardown(self):
666 """ Allow subclass specific teardown logging additions."""
667 self.logger.info("--- No test specific show commands provided. ---")
670 """ Show various debug prints after each test """
671 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
672 (self.__class__.__name__, self._testMethodName,
673 self._testMethodDoc))
676 if not self.vpp_dead:
677 self.logger.debug(self.vapi.cli("show trace max 1000"))
678 self.logger.info(self.vapi.ppcli("show interface"))
679 self.logger.info(self.vapi.ppcli("show hardware"))
680 self.logger.info(self.statistics.set_errors_str())
681 self.logger.info(self.vapi.ppcli("show run"))
682 self.logger.info(self.vapi.ppcli("show log"))
683 self.logger.info(self.vapi.ppcli("show bihash"))
684 self.logger.info("Logging testcase specific show commands.")
685 self.show_commands_at_teardown()
686 self.registry.remove_vpp_config(self.logger)
687 # Save/Dump VPP api trace log
688 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
689 tmp_api_trace = "/tmp/%s" % api_trace
690 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
691 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
692 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
694 os.rename(tmp_api_trace, vpp_api_trace_log)
695 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
697 except VppTransportShmemIOError:
698 self.logger.debug("VppTransportShmemIOError: Vpp dead. "
699 "Cannot log show commands.")
702 self.registry.unregister_all(self.logger)
705 """ Clear trace before running each test"""
706 super(VppTestCase, self).setUp()
707 self.reporter.send_keep_alive(self)
710 raise VppDiedError(rv=None, testcase=self.__class__.__name__,
711 method_name=self._testMethodName)
712 self.sleep(.1, "during setUp")
713 self.vpp_stdout_deque.append(
714 "--- test setUp() for %s.%s(%s) starts here ---\n" %
715 (self.__class__.__name__, self._testMethodName,
716 self._testMethodDoc))
717 self.vpp_stderr_deque.append(
718 "--- test setUp() for %s.%s(%s) starts here ---\n" %
719 (self.__class__.__name__, self._testMethodName,
720 self._testMethodDoc))
721 self.vapi.cli("clear trace")
722 # store the test instance inside the test class - so that objects
723 # holding the class can access instance methods (like assertEqual)
724 type(self).test_instance = self
727 def pg_enable_capture(cls, interfaces=None):
729 Enable capture on packet-generator interfaces
731 :param interfaces: iterable interface indexes (if None,
732 use self.pg_interfaces)
735 if interfaces is None:
736 interfaces = cls.pg_interfaces
741 def register_capture(cls, cap_name):
742 """ Register a capture in the testclass """
743 # add to the list of captures with current timestamp
744 cls._captures.append((time.time(), cap_name))
748 """ Enable the PG, wait till it is done, then clean up """
749 cls.vapi.cli("trace add pg-input 1000")
750 cls.vapi.cli('packet-generator enable')
751 # PG, when starts, runs to completion -
752 # so let's avoid a race condition,
753 # and wait a little till it's done.
754 # Then clean it up - and then be gone.
755 deadline = time.time() + 300
756 while cls.vapi.cli('show packet-generator').find("Yes") != -1:
757 cls.sleep(0.01) # yield
758 if time.time() > deadline:
759 cls.logger.error("Timeout waiting for pg to stop")
761 for stamp, cap_name in cls._captures:
762 cls.vapi.cli('packet-generator delete %s' % cap_name)
766 def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
768 Create packet-generator interfaces.
770 :param interfaces: iterable indexes of the interfaces.
771 :returns: List of created interfaces.
776 intf = VppPGInterface(cls, i, gso, gso_size)
777 setattr(cls, intf.name, intf)
779 cls.pg_interfaces = result
783 def create_loopback_interfaces(cls, count):
785 Create loopback interfaces.
787 :param count: number of interfaces created.
788 :returns: List of created interfaces.
790 result = [VppLoInterface(cls) for i in range(count)]
792 setattr(cls, intf.name, intf)
793 cls.lo_interfaces = result
797 def create_bvi_interfaces(cls, count):
799 Create BVI interfaces.
801 :param count: number of interfaces created.
802 :returns: List of created interfaces.
804 result = [VppBviInterface(cls) for i in range(count)]
806 setattr(cls, intf.name, intf)
807 cls.bvi_interfaces = result
811 def extend_packet(packet, size, padding=' '):
813 Extend packet to given size by padding with spaces or custom padding
814 NOTE: Currently works only when Raw layer is present.
816 :param packet: packet
817 :param size: target size
818 :param padding: padding used to extend the payload
821 packet_len = len(packet) + 4
822 extend = size - packet_len
824 num = (extend // len(padding)) + 1
825 packet[Raw].load += (padding * num)[:extend].encode("ascii")
828 def reset_packet_infos(cls):
829 """ Reset the list of packet info objects and packet counts to zero """
830 cls._packet_infos = {}
831 cls._packet_count_for_dst_if_idx = {}
834 def create_packet_info(cls, src_if, dst_if):
836 Create packet info object containing the source and destination indexes
837 and add it to the testcase's packet info list
839 :param VppInterface src_if: source interface
840 :param VppInterface dst_if: destination interface
842 :returns: _PacketInfo object
846 info.index = len(cls._packet_infos)
847 info.src = src_if.sw_if_index
848 info.dst = dst_if.sw_if_index
849 if isinstance(dst_if, VppSubInterface):
850 dst_idx = dst_if.parent.sw_if_index
852 dst_idx = dst_if.sw_if_index
853 if dst_idx in cls._packet_count_for_dst_if_idx:
854 cls._packet_count_for_dst_if_idx[dst_idx] += 1
856 cls._packet_count_for_dst_if_idx[dst_idx] = 1
857 cls._packet_infos[info.index] = info
861 def info_to_payload(info):
863 Convert _PacketInfo object to packet payload
865 :param info: _PacketInfo object
867 :returns: string containing serialized data from packet info
869 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
873 def payload_to_info(payload, payload_field='load'):
875 Convert packet payload to _PacketInfo object
877 :param payload: packet payload
878 :type payload: <class 'scapy.packet.Raw'>
879 :param payload_field: packet fieldname of payload "load" for
880 <class 'scapy.packet.Raw'>
881 :type payload_field: str
882 :returns: _PacketInfo object containing de-serialized data from payload
885 numbers = getattr(payload, payload_field).split()
887 info.index = int(numbers[0])
888 info.src = int(numbers[1])
889 info.dst = int(numbers[2])
890 info.ip = int(numbers[3])
891 info.proto = int(numbers[4])
894 def get_next_packet_info(self, info):
896 Iterate over the packet info list stored in the testcase
897 Start iteration with first element if info is None
898 Continue based on index in info if info is specified
900 :param info: info or None
901 :returns: next info in list or None if no more infos
906 next_index = info.index + 1
907 if next_index == len(self._packet_infos):
910 return self._packet_infos[next_index]
912 def get_next_packet_info_for_interface(self, src_index, info):
914 Search the packet info list for the next packet info with same source
917 :param src_index: source interface index to search for
918 :param info: packet info - where to start the search
919 :returns: packet info or None
923 info = self.get_next_packet_info(info)
926 if info.src == src_index:
929 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
931 Search the packet info list for the next packet info with same source
932 and destination interface indexes
934 :param src_index: source interface index to search for
935 :param dst_index: destination interface index to search for
936 :param info: packet info - where to start the search
937 :returns: packet info or None
941 info = self.get_next_packet_info_for_interface(src_index, info)
944 if info.dst == dst_index:
947 def assert_equal(self, real_value, expected_value, name_or_class=None):
948 if name_or_class is None:
949 self.assertEqual(real_value, expected_value)
952 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
953 msg = msg % (getdoc(name_or_class).strip(),
954 real_value, str(name_or_class(real_value)),
955 expected_value, str(name_or_class(expected_value)))
957 msg = "Invalid %s: %s does not match expected value %s" % (
958 name_or_class, real_value, expected_value)
960 self.assertEqual(real_value, expected_value, msg)
962 def assert_in_range(self,
970 msg = "Invalid %s: %s out of range <%s,%s>" % (
971 name, real_value, expected_min, expected_max)
972 self.assertTrue(expected_min <= real_value <= expected_max, msg)
974 def assert_packet_checksums_valid(self, packet,
975 ignore_zero_udp_checksums=True):
976 received = packet.__class__(scapy.compat.raw(packet))
977 udp_layers = ['UDP', 'UDPerror']
978 checksum_fields = ['cksum', 'chksum']
981 temp = received.__class__(scapy.compat.raw(received))
983 layer = temp.getlayer(counter)
985 for cf in checksum_fields:
986 if hasattr(layer, cf):
987 if ignore_zero_udp_checksums and \
988 0 == getattr(layer, cf) and \
989 layer.name in udp_layers:
992 checksums.append((counter, cf))
995 counter = counter + 1
996 if 0 == len(checksums):
998 temp = temp.__class__(scapy.compat.raw(temp))
999 for layer, cf in checksums:
1000 calc_sum = getattr(temp[layer], cf)
1002 getattr(received[layer], cf), calc_sum,
1003 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
1005 "Checksum field `%s` on `%s` layer has correct value `%s`" %
1006 (cf, temp[layer].name, calc_sum))
1008 def assert_checksum_valid(self, received_packet, layer,
1009 field_name='chksum',
1010 ignore_zero_checksum=False):
1011 """ Check checksum of received packet on given layer """
1012 received_packet_checksum = getattr(received_packet[layer], field_name)
1013 if ignore_zero_checksum and 0 == received_packet_checksum:
1015 recalculated = received_packet.__class__(
1016 scapy.compat.raw(received_packet))
1017 delattr(recalculated[layer], field_name)
1018 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1019 self.assert_equal(received_packet_checksum,
1020 getattr(recalculated[layer], field_name),
1021 "packet checksum on layer: %s" % layer)
1023 def assert_ip_checksum_valid(self, received_packet,
1024 ignore_zero_checksum=False):
1025 self.assert_checksum_valid(received_packet, 'IP',
1026 ignore_zero_checksum=ignore_zero_checksum)
1028 def assert_tcp_checksum_valid(self, received_packet,
1029 ignore_zero_checksum=False):
1030 self.assert_checksum_valid(received_packet, 'TCP',
1031 ignore_zero_checksum=ignore_zero_checksum)
1033 def assert_udp_checksum_valid(self, received_packet,
1034 ignore_zero_checksum=True):
1035 self.assert_checksum_valid(received_packet, 'UDP',
1036 ignore_zero_checksum=ignore_zero_checksum)
1038 def assert_embedded_icmp_checksum_valid(self, received_packet):
1039 if received_packet.haslayer(IPerror):
1040 self.assert_checksum_valid(received_packet, 'IPerror')
1041 if received_packet.haslayer(TCPerror):
1042 self.assert_checksum_valid(received_packet, 'TCPerror')
1043 if received_packet.haslayer(UDPerror):
1044 self.assert_checksum_valid(received_packet, 'UDPerror',
1045 ignore_zero_checksum=True)
1046 if received_packet.haslayer(ICMPerror):
1047 self.assert_checksum_valid(received_packet, 'ICMPerror')
1049 def assert_icmp_checksum_valid(self, received_packet):
1050 self.assert_checksum_valid(received_packet, 'ICMP')
1051 self.assert_embedded_icmp_checksum_valid(received_packet)
1053 def assert_icmpv6_checksum_valid(self, pkt):
1054 if pkt.haslayer(ICMPv6DestUnreach):
1055 self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
1056 self.assert_embedded_icmp_checksum_valid(pkt)
1057 if pkt.haslayer(ICMPv6EchoRequest):
1058 self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
1059 if pkt.haslayer(ICMPv6EchoReply):
1060 self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
1062 def get_packet_counter(self, counter):
1063 if counter.startswith("/"):
1064 counter_value = self.statistics.get_counter(counter)
1066 counters = self.vapi.cli("sh errors").split('\n')
1068 for i in range(1, len(counters) - 1):
1069 results = counters[i].split()
1070 if results[1] == counter:
1071 counter_value = int(results[0])
1073 return counter_value
1075 def assert_packet_counter_equal(self, counter, expected_value):
1076 counter_value = self.get_packet_counter(counter)
1077 self.assert_equal(counter_value, expected_value,
1078 "packet counter `%s'" % counter)
1080 def assert_error_counter_equal(self, counter, expected_value):
1081 counter_value = self.statistics.get_err_counter(counter)
1082 self.assert_equal(counter_value, expected_value,
1083 "error counter `%s'" % counter)
1086 def sleep(cls, timeout, remark=None):
1088 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1089 # * by Guido, only the main thread can be interrupted.
1091 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
1094 if hasattr(os, 'sched_yield'):
1100 if hasattr(cls, 'logger'):
1101 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1102 before = time.time()
1105 if hasattr(cls, 'logger') and after - before > 2 * timeout:
1106 cls.logger.error("unexpected self.sleep() result - "
1107 "slept for %es instead of ~%es!",
1108 after - before, timeout)
1109 if hasattr(cls, 'logger'):
1111 "Finished sleep (%s) - slept %es (wanted %es)",
1112 remark, after - before, timeout)
1114 def pg_send(self, intf, pkts):
1115 self.vapi.cli("clear trace")
1116 intf.add_stream(pkts)
1117 self.pg_enable_capture(self.pg_interfaces)
1120 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
1121 self.pg_send(intf, pkts)
1124 for i in self.pg_interfaces:
1125 i.get_capture(0, timeout=timeout)
1126 i.assert_nothing_captured(remark=remark)
1129 def send_and_expect(self, intf, pkts, output, n_rx=None):
1132 self.pg_send(intf, pkts)
1133 rx = output.get_capture(n_rx)
1136 def send_and_expect_only(self, intf, pkts, output, timeout=None):
1137 self.pg_send(intf, pkts)
1138 rx = output.get_capture(len(pkts))
1142 for i in self.pg_interfaces:
1143 if i not in outputs:
1144 i.get_capture(0, timeout=timeout)
1145 i.assert_nothing_captured()
1151 """ unittest calls runTest when TestCase is instantiated without a
1152 test case. Use case: Writing unittests against VppTestCase"""
1156 def get_testcase_doc_name(test):
1157 return getdoc(test.__class__).splitlines()[0]
1160 def get_test_description(descriptions, test):
1161 short_description = test.shortDescription()
1162 if descriptions and short_description:
1163 return short_description
1168 class TestCaseInfo(object):
1169 def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
1170 self.logger = logger
1171 self.tempdir = tempdir
1172 self.vpp_pid = vpp_pid
1173 self.vpp_bin_path = vpp_bin_path
1174 self.core_crash_test = None
1177 class VppTestResult(unittest.TestResult):
1179 @property result_string
1180 String variable to store the test case result string.
1182 List variable containing 2-tuples of TestCase instances and strings
1183 holding formatted tracebacks. Each tuple represents a test which
1184 raised an unexpected exception.
1186 List variable containing 2-tuples of TestCase instances and strings
1187 holding formatted tracebacks. Each tuple represents a test where
1188 a failure was explicitly signalled using the TestCase.assert*()
1192 failed_test_cases_info = set()
1193 core_crash_test_cases_info = set()
1194 current_test_case_info = None
1196 def __init__(self, stream=None, descriptions=None, verbosity=None,
1199 :param stream File descriptor to store where to report test results.
1200 Set to the standard error stream by default.
1201 :param descriptions Boolean variable to store information if to use
1202 test case descriptions.
1203 :param verbosity Integer variable to store required verbosity level.
1205 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1206 self.stream = stream
1207 self.descriptions = descriptions
1208 self.verbosity = verbosity
1209 self.result_string = None
1210 self.runner = runner
1212 def addSuccess(self, test):
1214 Record a test succeeded result
1219 if self.current_test_case_info:
1220 self.current_test_case_info.logger.debug(
1221 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1222 test._testMethodName,
1223 test._testMethodDoc))
1224 unittest.TestResult.addSuccess(self, test)
1225 self.result_string = colorize("OK", GREEN)
1227 self.send_result_through_pipe(test, PASS)
1229 def addSkip(self, test, reason):
1231 Record a test skipped.
1237 if self.current_test_case_info:
1238 self.current_test_case_info.logger.debug(
1239 "--- addSkip() %s.%s(%s) called, reason is %s" %
1240 (test.__class__.__name__, test._testMethodName,
1241 test._testMethodDoc, reason))
1242 unittest.TestResult.addSkip(self, test, reason)
1243 self.result_string = colorize("SKIP", YELLOW)
1245 self.send_result_through_pipe(test, SKIP)
1247 def symlink_failed(self):
1248 if self.current_test_case_info:
1250 failed_dir = os.getenv('FAILED_DIR')
1251 link_path = os.path.join(
1254 os.path.basename(self.current_test_case_info.tempdir))
1255 if self.current_test_case_info.logger:
1256 self.current_test_case_info.logger.debug(
1257 "creating a link to the failed test")
1258 self.current_test_case_info.logger.debug(
1259 "os.symlink(%s, %s)" %
1260 (self.current_test_case_info.tempdir, link_path))
1261 if os.path.exists(link_path):
1262 if self.current_test_case_info.logger:
1263 self.current_test_case_info.logger.debug(
1264 'symlink already exists')
1266 os.symlink(self.current_test_case_info.tempdir, link_path)
1268 except Exception as e:
1269 if self.current_test_case_info.logger:
1270 self.current_test_case_info.logger.error(e)
1272 def send_result_through_pipe(self, test, result):
1273 if hasattr(self, 'test_framework_result_pipe'):
1274 pipe = self.test_framework_result_pipe
1276 pipe.send((test.id(), result))
1278 def log_error(self, test, err, fn_name):
1279 if self.current_test_case_info:
1280 if isinstance(test, unittest.suite._ErrorHolder):
1281 test_name = test.description
1283 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1284 test._testMethodName,
1285 test._testMethodDoc)
1286 self.current_test_case_info.logger.debug(
1287 "--- %s() %s called, err is %s" %
1288 (fn_name, test_name, err))
1289 self.current_test_case_info.logger.debug(
1290 "formatted exception is:\n%s" %
1291 "".join(format_exception(*err)))
1293 def add_error(self, test, err, unittest_fn, error_type):
1294 if error_type == FAIL:
1295 self.log_error(test, err, 'addFailure')
1296 error_type_str = colorize("FAIL", RED)
1297 elif error_type == ERROR:
1298 self.log_error(test, err, 'addError')
1299 error_type_str = colorize("ERROR", RED)
1301 raise Exception('Error type %s cannot be used to record an '
1302 'error or a failure' % error_type)
1304 unittest_fn(self, test, err)
1305 if self.current_test_case_info:
1306 self.result_string = "%s [ temp dir used by test case: %s ]" % \
1308 self.current_test_case_info.tempdir)
1309 self.symlink_failed()
1310 self.failed_test_cases_info.add(self.current_test_case_info)
1311 if is_core_present(self.current_test_case_info.tempdir):
1312 if not self.current_test_case_info.core_crash_test:
1313 if isinstance(test, unittest.suite._ErrorHolder):
1314 test_name = str(test)
1316 test_name = "'{!s}' ({!s})".format(
1317 get_testcase_doc_name(test), test.id())
1318 self.current_test_case_info.core_crash_test = test_name
1319 self.core_crash_test_cases_info.add(
1320 self.current_test_case_info)
1322 self.result_string = '%s [no temp dir]' % error_type_str
1324 self.send_result_through_pipe(test, error_type)
1326 def addFailure(self, test, err):
1328 Record a test failed result
1331 :param err: error message
1334 self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1336 def addError(self, test, err):
1338 Record a test error result
1341 :param err: error message
1344 self.add_error(test, err, unittest.TestResult.addError, ERROR)
1346 def getDescription(self, test):
1348 Get test description
1351 :returns: test description
1354 return get_test_description(self.descriptions, test)
1356 def startTest(self, test):
1364 def print_header(test):
1365 if not hasattr(test.__class__, '_header_printed'):
1366 print(double_line_delim)
1367 print(colorize(getdoc(test).splitlines()[0], GREEN))
1368 print(double_line_delim)
1369 test.__class__._header_printed = True
1373 unittest.TestResult.startTest(self, test)
1374 if self.verbosity > 0:
1375 self.stream.writeln(
1376 "Starting " + self.getDescription(test) + " ...")
1377 self.stream.writeln(single_line_delim)
1379 def stopTest(self, test):
1381 Called when the given test has been run
1386 unittest.TestResult.stopTest(self, test)
1387 if self.verbosity > 0:
1388 self.stream.writeln(single_line_delim)
1389 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1390 self.result_string))
1391 self.stream.writeln(single_line_delim)
1393 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1394 self.result_string))
1396 self.send_result_through_pipe(test, TEST_RUN)
1398 def printErrors(self):
1400 Print errors from running the test case
1402 if len(self.errors) > 0 or len(self.failures) > 0:
1403 self.stream.writeln()
1404 self.printErrorList('ERROR', self.errors)
1405 self.printErrorList('FAIL', self.failures)
1407 # ^^ that is the last output from unittest before summary
1408 if not self.runner.print_summary:
1409 devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
1410 self.stream = devnull
1411 self.runner.stream = devnull
1413 def printErrorList(self, flavour, errors):
1415 Print error list to the output stream together with error type
1416 and test case description.
1418 :param flavour: error type
1419 :param errors: iterable errors
1422 for test, err in errors:
1423 self.stream.writeln(double_line_delim)
1424 self.stream.writeln("%s: %s" %
1425 (flavour, self.getDescription(test)))
1426 self.stream.writeln(single_line_delim)
1427 self.stream.writeln("%s" % err)
1430 class VppTestRunner(unittest.TextTestRunner):
1432 A basic test runner implementation which prints results to standard error.
1436 def resultclass(self):
1437 """Class maintaining the results of the tests"""
1438 return VppTestResult
1440 def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1441 result_pipe=None, failfast=False, buffer=False,
1442 resultclass=None, print_summary=True, **kwargs):
1443 # ignore stream setting here, use hard-coded stdout to be in sync
1444 # with prints from VppTestCase methods ...
1445 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1446 verbosity, failfast, buffer,
1447 resultclass, **kwargs)
1448 KeepAliveReporter.pipe = keep_alive_pipe
1450 self.orig_stream = self.stream
1451 self.resultclass.test_framework_result_pipe = result_pipe
1453 self.print_summary = print_summary
1455 def _makeResult(self):
1456 return self.resultclass(self.stream,
1461 def run(self, test):
1468 faulthandler.enable() # emit stack trace to stderr if killed by signal
1470 result = super(VppTestRunner, self).run(test)
1471 if not self.print_summary:
1472 self.stream = self.orig_stream
1473 result.stream = self.orig_stream
1477 class Worker(Thread):
1478 def __init__(self, args, logger, env=None):
1479 self.logger = logger
1483 env = {} if env is None else env
1484 self.env = copy.deepcopy(env)
1485 super(Worker, self).__init__()
1488 executable = self.args[0]
1489 if not os.path.exists(executable) or not os.access(
1490 executable, os.F_OK | os.X_OK):
1491 # Exit code that means some system file did not exist,
1492 # could not be opened, or had some other kind of error.
1493 self.result = os.EX_OSFILE
1494 raise EnvironmentError(
1495 "executable '%s' is not found or executable." % executable)
1496 self.logger.debug("Running executable w/args `%s'" % self.args)
1497 env = os.environ.copy()
1498 env.update(self.env)
1499 env["CK_LOG_FILE_NAME"] = "-"
1500 self.process = subprocess.Popen(
1501 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1502 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1503 out, err = self.process.communicate()
1504 self.logger.debug("Finished running `%s'" % executable)
1505 self.logger.info("Return code is `%s'" % self.process.returncode)
1506 self.logger.info(single_line_delim)
1507 self.logger.info("Executable `%s' wrote to stdout:" % executable)
1508 self.logger.info(single_line_delim)
1509 self.logger.info(out)
1510 self.logger.info(single_line_delim)
1511 self.logger.info("Executable `%s' wrote to stderr:" % executable)
1512 self.logger.info(single_line_delim)
1513 self.logger.info(err)
1514 self.logger.info(single_line_delim)
1515 self.result = self.process.returncode
1518 if __name__ == '__main__':