3 from __future__ import print_function
17 from collections import deque
18 from threading import Thread, Event
19 from inspect import getdoc, isclass
20 from traceback import format_exception
21 from logging import FileHandler, DEBUG, Formatter
24 from scapy.packet import Raw
25 import hook as hookmodule
26 from vpp_pg_interface import VppPGInterface
27 from vpp_sub_interface import VppSubInterface
28 from vpp_lo_interface import VppLoInterface
29 from vpp_bvi_interface import VppBviInterface
30 from vpp_papi_provider import VppPapiProvider
31 from vpp_papi.vpp_stats import VPPStats
32 from vpp_papi.vpp_transport_shmem import VppTransportShmemIOError
33 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
35 from vpp_object import VppObjectRegistry
36 from util import ppp, is_core_present
37 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
38 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
39 from scapy.layers.inet6 import ICMPv6EchoReply
41 if os.name == 'posix' and sys.version_info[0] < 3:
42 # using subprocess32 is recommended by python official documentation
43 # @ https://docs.python.org/2/library/subprocess.html
44 import subprocess32 as subprocess
48 # Python2/3 compatible
61 class BoolEnvironmentVariable(object):
63 def __init__(self, env_var_name, default='n', true_values=None):
64 self.name = env_var_name
65 self.default = default
66 self.true_values = true_values if true_values is not None else \
70 return os.getenv(self.name, self.default).lower() in self.true_values
72 if sys.version_info[0] == 2:
73 __nonzero__ = __bool__
76 return 'BoolEnvironmentVariable(%r, default=%r, true_values=%r)' % \
77 (self.name, self.default, self.true_values)
80 debug_framework = BoolEnvironmentVariable('TEST_DEBUG')
85 Test framework module.
87 The module provides a set of tools for constructing and running tests and
88 representing the results.
92 class VppDiedError(Exception):
93 """ exception for reporting that the subprocess has died."""
95 signals_by_value = {v: k for k, v in signal.__dict__.items() if
96 k.startswith('SIG') and not k.startswith('SIG_')}
98 def __init__(self, rv=None, testcase=None, method_name=None):
100 self.signal_name = None
101 self.testcase = testcase
102 self.method_name = method_name
105 self.signal_name = VppDiedError.signals_by_value[-rv]
106 except (KeyError, TypeError):
109 if testcase is None and method_name is None:
112 in_msg = 'running %s.%s ' % (testcase, method_name)
114 msg = "VPP subprocess died %sunexpectedly with return code: %d%s." % (
117 ' [%s]' % (self.signal_name if
118 self.signal_name is not None else ''))
119 super(VppDiedError, self).__init__(msg)
122 class _PacketInfo(object):
123 """Private class to create packet info object.
125 Help process information about the next packet.
126 Set variables to default values.
128 #: Store the index of the packet.
130 #: Store the index of the source packet generator interface of the packet.
132 #: Store the index of the destination packet generator interface
135 #: Store expected ip version
137 #: Store expected upper protocol
139 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare two packet info objects field by field.

    Two objects are considered equal when their ``index``, ``src``,
    ``dst`` and ``data`` attributes all compare equal.

    :param other: object to compare with
    :returns: True or False for objects exposing the compared
        attributes; ``NotImplemented`` when *other* lacks any of them
        (e.g. ``None``), so Python falls back to its default comparison
        instead of raising AttributeError.
    """
    compared = ('index', 'src', 'dst', 'data')
    # Duck-typed check: anything carrying the four fields is comparable.
    if not all(hasattr(other, name) for name in compared):
        return NotImplemented
    return all(getattr(self, name) == getattr(other, name)
               for name in compared)
150 def pump_output(testclass):
151 """ pump output from vpp stdout/stderr to proper queues """
154 while not testclass.pump_thread_stop_flag.is_set():
155 readable = select.select([testclass.vpp.stdout.fileno(),
156 testclass.vpp.stderr.fileno(),
157 testclass.pump_thread_wakeup_pipe[0]],
159 if testclass.vpp.stdout.fileno() in readable:
160 read = os.read(testclass.vpp.stdout.fileno(), 102400)
162 split = read.splitlines(True)
163 if len(stdout_fragment) > 0:
164 split[0] = "%s%s" % (stdout_fragment, split[0])
165 if len(split) > 0 and split[-1].endswith("\n"):
169 stdout_fragment = split[-1]
170 testclass.vpp_stdout_deque.extend(split[:limit])
171 if not testclass.cache_vpp_output:
172 for line in split[:limit]:
173 testclass.logger.debug(
174 "VPP STDOUT: %s" % line.rstrip("\n"))
175 if testclass.vpp.stderr.fileno() in readable:
176 read = os.read(testclass.vpp.stderr.fileno(), 102400)
178 split = read.splitlines(True)
179 if len(stderr_fragment) > 0:
180 split[0] = "%s%s" % (stderr_fragment, split[0])
181 if len(split) > 0 and split[-1].endswith(b"\n"):
185 stderr_fragment = split[-1]
186 testclass.vpp_stderr_deque.extend(split[:limit])
187 if not testclass.cache_vpp_output:
188 for line in split[:limit]:
189 testclass.logger.debug(
190 "VPP STDERR: %s" % line.rstrip("\n"))
191 # ignoring the dummy pipe here intentionally - the
192 # flag will take care of properly terminating the loop
def _is_skip_aarch64_set():
    """Build the flag object bound to the SKIP_AARCH64 variable."""
    skip_flag = BoolEnvironmentVariable('SKIP_AARCH64')
    return skip_flag


# Module-level flag object for the SKIP_AARCH64 environment variable.
is_skip_aarch64_set = _is_skip_aarch64_set()
def _is_platform_aarch64():
    """Tell whether the reported machine type is 'aarch64'."""
    machine = platform.machine()
    return machine == 'aarch64'


# Computed once at import time.
is_platform_aarch64 = _is_platform_aarch64()
def _running_extended_tests():
    """Build the flag object bound to the EXTENDED_TESTS variable."""
    extended_flag = BoolEnvironmentVariable("EXTENDED_TESTS")
    return extended_flag


# Module-level flag object for the EXTENDED_TESTS environment variable.
running_extended_tests = _running_extended_tests()
def _running_on_centos():
    """Detect a CentOS host from the OS_ID environment variable.

    :returns: True when OS_ID contains "centos" (case-insensitive),
        False otherwise, including when OS_ID is not set.
    """
    # The membership test already yields a bool; no conditional needed.
    return "centos" in os.getenv("OS_ID", "").lower()


# Computed once at import time from the environment.
running_on_centos = _running_on_centos()
224 class KeepAliveReporter(object):
226 Singleton object which reports test start to parent process
231 self.__dict__ = self._shared_state
239 def pipe(self, pipe):
240 if self._pipe is not None:
241 raise Exception("Internal error - pipe should only be set once.")
244 def send_keep_alive(self, test, desc=None):
246 Write current test tmpdir & desc to keep-alive pipe to signal liveness
248 if self.pipe is None:
249 # if not running forked..
253 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
257 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
260 class VppTestCase(unittest.TestCase):
261 """This subclass is a base class for VPP test cases that are implemented as
262 classes. It provides methods to create and run test case.
265 extra_vpp_punt_config = []
266 extra_vpp_plugin_config = []
def packet_infos(self):
    """Packet infos stored for this test case.

    The container is the one kept in ``_packet_infos``; elsewhere in
    this class it is reset to a dict and filled keyed by packet index.
    """
    infos = self._packet_infos
    return infos
274 def get_packet_count_for_if_idx(cls, dst_if_index):
275 """Get the number of packet info for specified destination if index"""
276 if dst_if_index in cls._packet_count_for_dst_if_idx:
277 return cls._packet_count_for_dst_if_idx[dst_if_index]
283 """Return the instance of this testcase"""
284 return cls.test_instance
287 def set_debug_flags(cls, d):
288 cls.debug_core = False
289 cls.debug_gdb = False
290 cls.debug_gdbserver = False
295 cls.debug_core = True
298 elif dl == "gdbserver":
299 cls.debug_gdbserver = True
301 raise Exception("Unrecognized DEBUG option: '%s'" % d)
304 def get_least_used_cpu():
305 cpu_usage_list = [set(range(psutil.cpu_count()))]
306 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
307 if 'vpp_main' == p.info['name']]
308 for vpp_process in vpp_processes:
309 for cpu_usage_set in cpu_usage_list:
311 cpu_num = vpp_process.cpu_num()
312 if cpu_num in cpu_usage_set:
313 cpu_usage_set_index = cpu_usage_list.index(
315 if cpu_usage_set_index == len(cpu_usage_list) - 1:
316 cpu_usage_list.append({cpu_num})
318 cpu_usage_list[cpu_usage_set_index + 1].add(
320 cpu_usage_set.remove(cpu_num)
322 except psutil.NoSuchProcess:
325 for cpu_usage_set in cpu_usage_list:
326 if len(cpu_usage_set) > 0:
327 min_usage_set = cpu_usage_set
330 return random.choice(tuple(min_usage_set))
333 def setUpConstants(cls):
334 """ Set-up the test case class based on environment variables """
335 cls.step = BoolEnvironmentVariable('STEP')
336 d = os.getenv("DEBUG", None)
337 # inverted case to handle '' == True
338 c = os.getenv("CACHE_OUTPUT", "1")
339 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
340 cls.set_debug_flags(d)
341 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
342 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
343 cls.test_plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
344 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
346 if cls.plugin_path is not None:
347 if cls.extern_plugin_path is not None:
348 plugin_path = "%s:%s" % (
349 cls.plugin_path, cls.extern_plugin_path)
351 plugin_path = cls.plugin_path
352 elif cls.extern_plugin_path is not None:
353 plugin_path = cls.extern_plugin_path
355 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
356 debug_cli = "cli-listen localhost:5002"
358 size = os.getenv("COREDUMP_SIZE")
360 coredump_size = "coredump-size %s" % size
361 if coredump_size is None:
362 coredump_size = "coredump-size unlimited"
364 cpu_core_number = cls.get_least_used_cpu()
365 if not hasattr(cls, "worker_config"):
366 cls.worker_config = ""
368 cls.vpp_cmdline = [cls.vpp_bin, "unix",
369 "{", "nodaemon", debug_cli, "full-coredump",
370 coredump_size, "runtime-dir", cls.tempdir, "}",
371 "api-trace", "{", "on", "}", "api-segment", "{",
372 "prefix", cls.shm_prefix, "}", "cpu", "{",
373 "main-core", str(cpu_core_number),
374 cls.worker_config, "}",
375 "statseg", "{", "socket-name", cls.stats_sock, "}",
376 "socksvr", "{", "socket-name", cls.api_sock, "}",
378 "{", "plugin", "dpdk_plugin.so", "{", "disable",
379 "}", "plugin", "rdma_plugin.so", "{", "disable",
380 "}", "plugin", "unittest_plugin.so", "{", "enable",
381 "}"] + cls.extra_vpp_plugin_config + ["}", ]
382 if cls.extra_vpp_punt_config is not None:
383 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
384 if plugin_path is not None:
385 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
386 if cls.test_plugin_path is not None:
387 cls.vpp_cmdline.extend(["test_plugin_path", cls.test_plugin_path])
389 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
390 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
393 def wait_for_enter(cls):
394 if cls.debug_gdbserver:
395 print(double_line_delim)
396 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
398 print(double_line_delim)
399 print("Spawned VPP with PID: %d" % cls.vpp.pid)
401 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
403 print(single_line_delim)
404 print("You can debug the VPP using e.g.:")
405 if cls.debug_gdbserver:
406 print("sudo gdb " + cls.vpp_bin +
407 " -ex 'target remote localhost:7777'")
408 print("Now is the time to attach a gdb by running the above "
409 "command, set up breakpoints etc. and then resume VPP from "
410 "within gdb by issuing the 'continue' command")
412 print("sudo gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
413 print("Now is the time to attach a gdb by running the above "
414 "command and set up breakpoints etc.")
415 print(single_line_delim)
416 input("Press ENTER to continue running the testcase...")
420 cmdline = cls.vpp_cmdline
422 if cls.debug_gdbserver:
423 gdbserver = '/usr/bin/gdbserver'
424 if not os.path.isfile(gdbserver) or \
425 not os.access(gdbserver, os.X_OK):
426 raise Exception("gdbserver binary '%s' does not exist or is "
427 "not executable" % gdbserver)
429 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
430 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
433 cls.vpp = subprocess.Popen(cmdline,
434 stdout=subprocess.PIPE,
435 stderr=subprocess.PIPE,
437 except subprocess.CalledProcessError as e:
438 cls.logger.critical("Subprocess returned with non-0 return code: ("
442 cls.logger.critical("Subprocess returned with OS error: "
443 "(%s) %s", e.errno, e.strerror)
445 except Exception as e:
446 cls.logger.exception("Subprocess returned unexpected from "
453 def wait_for_stats_socket(cls):
454 deadline = time.time() + 300
456 while time.time() < deadline or \
457 cls.debug_gdb or cls.debug_gdbserver:
458 if os.path.exists(cls.stats_sock):
463 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
466 def wait_for_coredump(cls):
467 corefile = cls.tempdir + "/core"
468 if os.path.isfile(corefile):
469 cls.logger.error("Waiting for coredump to complete: %s", corefile)
470 curr_size = os.path.getsize(corefile)
471 deadline = time.time() + 60
473 while time.time() < deadline:
476 curr_size = os.path.getsize(corefile)
477 if size == curr_size:
481 cls.logger.error("Timed out waiting for coredump to complete:"
484 cls.logger.error("Coredump complete: %s, size %d",
490 Perform class setup before running the testcase
491 Remove shared memory files, start vpp and connect the vpp-api
493 super(VppTestCase, cls).setUpClass()
494 gc.collect() # run garbage collection first
496 cls.logger = get_logger(cls.__name__)
497 if hasattr(cls, 'parallel_handler'):
498 cls.logger.addHandler(cls.parallel_handler)
499 cls.logger.propagate = False
501 cls.tempdir = tempfile.mkdtemp(
502 prefix='vpp-unittest-%s-' % cls.__name__)
503 cls.stats_sock = "%s/stats.sock" % cls.tempdir
504 cls.api_sock = "%s/api.sock" % cls.tempdir
505 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
506 cls.file_handler.setFormatter(
507 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
509 cls.file_handler.setLevel(DEBUG)
510 cls.logger.addHandler(cls.file_handler)
511 cls.logger.debug("--- setUpClass() for %s called ---" %
513 cls.shm_prefix = os.path.basename(cls.tempdir)
514 os.chdir(cls.tempdir)
515 cls.logger.info("Temporary dir is %s, shm prefix is %s",
516 cls.tempdir, cls.shm_prefix)
518 cls.reset_packet_infos()
522 cls.registry = VppObjectRegistry()
523 cls.vpp_startup_failed = False
524 cls.reporter = KeepAliveReporter()
525 # need to catch exceptions here because if we raise, then the cleanup
526 # doesn't get called and we might end with a zombie vpp
529 cls.reporter.send_keep_alive(cls, 'setUpClass')
530 VppTestResult.current_test_case_info = TestCaseInfo(
531 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
532 cls.vpp_stdout_deque = deque()
533 cls.vpp_stderr_deque = deque()
534 cls.pump_thread_stop_flag = Event()
535 cls.pump_thread_wakeup_pipe = os.pipe()
536 cls.pump_thread = Thread(target=pump_output, args=(cls,))
537 cls.pump_thread.daemon = True
538 cls.pump_thread.start()
539 if cls.debug_gdb or cls.debug_gdbserver:
543 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
546 hook = hookmodule.StepHook(cls)
548 hook = hookmodule.PollHook(cls)
549 cls.vapi.register_hook(hook)
550 cls.wait_for_stats_socket()
551 cls.statistics = VPPStats(socketname=cls.stats_sock)
555 cls.vpp_startup_failed = True
557 "VPP died shortly after startup, check the"
558 " output to standard error for possible cause")
564 cls.vapi.disconnect()
567 if cls.debug_gdbserver:
568 print(colorize("You're running VPP inside gdbserver but "
569 "VPP-API connection failed, did you forget "
570 "to 'continue' VPP from within gdb?", RED))
572 except Exception as e:
573 cls.logger.debug("Exception connecting to VPP: %s" % e)
581 Disconnect vpp-api, kill vpp and cleanup shared memory files
583 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
585 if cls.vpp.returncode is None:
586 print(double_line_delim)
587 print("VPP or GDB server is still running")
588 print(single_line_delim)
589 input("When done debugging, press ENTER to kill the "
590 "process and finish running the testcase...")
592 # first signal that we want to stop the pump thread, then wake it up
593 if hasattr(cls, 'pump_thread_stop_flag'):
594 cls.pump_thread_stop_flag.set()
595 if hasattr(cls, 'pump_thread_wakeup_pipe'):
596 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
597 if hasattr(cls, 'pump_thread'):
598 cls.logger.debug("Waiting for pump thread to stop")
599 cls.pump_thread.join()
600 if hasattr(cls, 'vpp_stderr_reader_thread'):
601 cls.logger.debug("Waiting for stdderr pump to stop")
602 cls.vpp_stderr_reader_thread.join()
604 if hasattr(cls, 'vpp'):
605 if hasattr(cls, 'vapi'):
606 cls.logger.debug("Disconnecting class vapi client on %s",
608 cls.vapi.disconnect()
609 cls.logger.debug("Deleting class vapi attribute on %s",
613 if cls.vpp.returncode is None:
614 cls.wait_for_coredump()
615 cls.logger.debug("Sending TERM to vpp")
617 cls.logger.debug("Waiting for vpp to die")
618 cls.vpp.communicate()
619 cls.logger.debug("Deleting class vpp attribute on %s",
623 if cls.vpp_startup_failed:
624 stdout_log = cls.logger.info
625 stderr_log = cls.logger.critical
627 stdout_log = cls.logger.info
628 stderr_log = cls.logger.info
630 if hasattr(cls, 'vpp_stdout_deque'):
631 stdout_log(single_line_delim)
632 stdout_log('VPP output to stdout while running %s:', cls.__name__)
633 stdout_log(single_line_delim)
634 vpp_output = "".join(cls.vpp_stdout_deque)
635 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
637 stdout_log('\n%s', vpp_output)
638 stdout_log(single_line_delim)
640 if hasattr(cls, 'vpp_stderr_deque'):
641 stderr_log(single_line_delim)
642 stderr_log('VPP output to stderr while running %s:', cls.__name__)
643 stderr_log(single_line_delim)
644 vpp_output = "".join(cls.vpp_stderr_deque)
645 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
647 stderr_log('\n%s', vpp_output)
648 stderr_log(single_line_delim)
651 def tearDownClass(cls):
652 """ Perform final cleanup after running all tests in this test-case """
653 cls.logger.debug("--- tearDownClass() for %s called ---" %
655 cls.reporter.send_keep_alive(cls, 'tearDownClass')
657 cls.file_handler.close()
658 cls.reset_packet_infos()
660 debug_internal.on_tear_down_class(cls)
def show_commands_at_teardown(self):
    """Hook for subclasses to log extra show commands at teardown.

    The base implementation only logs that nothing was provided.
    """
    placeholder = "--- No test specific show commands provided. ---"
    self.logger.info(placeholder)
667 """ Show various debug prints after each test """
668 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
669 (self.__class__.__name__, self._testMethodName,
670 self._testMethodDoc))
673 if not self.vpp_dead:
674 self.logger.debug(self.vapi.cli("show trace max 1000"))
675 self.logger.info(self.vapi.ppcli("show interface"))
676 self.logger.info(self.vapi.ppcli("show hardware"))
677 self.logger.info(self.statistics.set_errors_str())
678 self.logger.info(self.vapi.ppcli("show run"))
679 self.logger.info(self.vapi.ppcli("show log"))
680 self.logger.info(self.vapi.ppcli("show bihash"))
681 self.logger.info("Logging testcase specific show commands.")
682 self.show_commands_at_teardown()
683 self.registry.remove_vpp_config(self.logger)
684 # Save/Dump VPP api trace log
685 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
686 tmp_api_trace = "/tmp/%s" % api_trace
687 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
688 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
689 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
691 os.rename(tmp_api_trace, vpp_api_trace_log)
692 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
694 except VppTransportShmemIOError:
695 self.logger.debug("VppTransportShmemIOError: Vpp dead. "
696 "Cannot log show commands.")
699 self.registry.unregister_all(self.logger)
702 """ Clear trace before running each test"""
703 super(VppTestCase, self).setUp()
704 self.reporter.send_keep_alive(self)
707 raise VppDiedError(rv=None, testcase=self.__class__.__name__,
708 method_name=self._testMethodName)
709 self.sleep(.1, "during setUp")
710 self.vpp_stdout_deque.append(
711 "--- test setUp() for %s.%s(%s) starts here ---\n" %
712 (self.__class__.__name__, self._testMethodName,
713 self._testMethodDoc))
714 self.vpp_stderr_deque.append(
715 "--- test setUp() for %s.%s(%s) starts here ---\n" %
716 (self.__class__.__name__, self._testMethodName,
717 self._testMethodDoc))
718 self.vapi.cli("clear trace")
719 # store the test instance inside the test class - so that objects
720 # holding the class can access instance methods (like assertEqual)
721 type(self).test_instance = self
724 def pg_enable_capture(cls, interfaces=None):
726 Enable capture on packet-generator interfaces
728 :param interfaces: iterable interface indexes (if None,
729 use self.pg_interfaces)
732 if interfaces is None:
733 interfaces = cls.pg_interfaces
def register_capture(cls, cap_name):
    """Remember a capture name on the test class.

    :param cap_name: name of the capture to record
    """
    # Each entry pairs the registration time with the capture name.
    registered_at = time.time()
    entry = (registered_at, cap_name)
    cls._captures.append(entry)
745 """ Enable the PG, wait till it is done, then clean up """
746 cls.vapi.cli("trace add pg-input 1000")
747 cls.vapi.cli('packet-generator enable')
748 # PG, when starts, runs to completion -
749 # so let's avoid a race condition,
750 # and wait a little till it's done.
751 # Then clean it up - and then be gone.
752 deadline = time.time() + 300
753 while cls.vapi.cli('show packet-generator').find("Yes") != -1:
754 cls.sleep(0.01) # yield
755 if time.time() > deadline:
756 cls.logger.error("Timeout waiting for pg to stop")
758 for stamp, cap_name in cls._captures:
759 cls.vapi.cli('packet-generator delete %s' % cap_name)
763 def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
765 Create packet-generator interfaces.
767 :param interfaces: iterable indexes of the interfaces.
768 :returns: List of created interfaces.
773 intf = VppPGInterface(cls, i, gso, gso_size)
774 setattr(cls, intf.name, intf)
776 cls.pg_interfaces = result
780 def create_loopback_interfaces(cls, count):
782 Create loopback interfaces.
784 :param count: number of interfaces created.
785 :returns: List of created interfaces.
787 result = [VppLoInterface(cls) for i in range(count)]
789 setattr(cls, intf.name, intf)
790 cls.lo_interfaces = result
794 def create_bvi_interfaces(cls, count):
796 Create BVI interfaces.
798 :param count: number of interfaces created.
799 :returns: List of created interfaces.
801 result = [VppBviInterface(cls) for i in range(count)]
803 setattr(cls, intf.name, intf)
804 cls.bvi_interfaces = result
808 def extend_packet(packet, size, padding=' '):
810 Extend packet to given size by padding with spaces or custom padding
811 NOTE: Currently works only when Raw layer is present.
813 :param packet: packet
814 :param size: target size
815 :param padding: padding used to extend the payload
818 packet_len = len(packet) + 4
819 extend = size - packet_len
821 num = (extend // len(padding)) + 1
822 packet[Raw].load += (padding * num)[:extend].encode("ascii")
def reset_packet_infos(cls):
    """Drop all stored packet infos and per-interface packet counts."""
    # Two separate, fresh dictionaries; the assignments are independent.
    cls._packet_count_for_dst_if_idx = dict()
    cls._packet_infos = dict()
831 def create_packet_info(cls, src_if, dst_if):
833 Create packet info object containing the source and destination indexes
834 and add it to the testcase's packet info list
836 :param VppInterface src_if: source interface
837 :param VppInterface dst_if: destination interface
839 :returns: _PacketInfo object
843 info.index = len(cls._packet_infos)
844 info.src = src_if.sw_if_index
845 info.dst = dst_if.sw_if_index
846 if isinstance(dst_if, VppSubInterface):
847 dst_idx = dst_if.parent.sw_if_index
849 dst_idx = dst_if.sw_if_index
850 if dst_idx in cls._packet_count_for_dst_if_idx:
851 cls._packet_count_for_dst_if_idx[dst_idx] += 1
853 cls._packet_count_for_dst_if_idx[dst_idx] = 1
854 cls._packet_infos[info.index] = info
858 def info_to_payload(info):
860 Convert _PacketInfo object to packet payload
862 :param info: _PacketInfo object
864 :returns: string containing serialized data from packet info
866 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
870 def payload_to_info(payload, payload_field='load'):
872 Convert packet payload to _PacketInfo object
874 :param payload: packet payload
875 :type payload: <class 'scapy.packet.Raw'>
876 :param payload_field: packet fieldname of payload "load" for
877 <class 'scapy.packet.Raw'>
878 :type payload_field: str
879 :returns: _PacketInfo object containing de-serialized data from payload
882 numbers = getattr(payload, payload_field).split()
884 info.index = int(numbers[0])
885 info.src = int(numbers[1])
886 info.dst = int(numbers[2])
887 info.ip = int(numbers[3])
888 info.proto = int(numbers[4])
891 def get_next_packet_info(self, info):
893 Iterate over the packet info list stored in the testcase
894 Start iteration with first element if info is None
895 Continue based on index in info if info is specified
897 :param info: info or None
898 :returns: next info in list or None if no more infos
903 next_index = info.index + 1
904 if next_index == len(self._packet_infos):
907 return self._packet_infos[next_index]
909 def get_next_packet_info_for_interface(self, src_index, info):
911 Search the packet info list for the next packet info with same source
914 :param src_index: source interface index to search for
915 :param info: packet info - where to start the search
916 :returns: packet info or None
920 info = self.get_next_packet_info(info)
923 if info.src == src_index:
926 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
928 Search the packet info list for the next packet info with same source
929 and destination interface indexes
931 :param src_index: source interface index to search for
932 :param dst_index: destination interface index to search for
933 :param info: packet info - where to start the search
934 :returns: packet info or None
938 info = self.get_next_packet_info_for_interface(src_index, info)
941 if info.dst == dst_index:
944 def assert_equal(self, real_value, expected_value, name_or_class=None):
945 if name_or_class is None:
946 self.assertEqual(real_value, expected_value)
949 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
950 msg = msg % (getdoc(name_or_class).strip(),
951 real_value, str(name_or_class(real_value)),
952 expected_value, str(name_or_class(expected_value)))
954 msg = "Invalid %s: %s does not match expected value %s" % (
955 name_or_class, real_value, expected_value)
957 self.assertEqual(real_value, expected_value, msg)
959 def assert_in_range(self,
967 msg = "Invalid %s: %s out of range <%s,%s>" % (
968 name, real_value, expected_min, expected_max)
969 self.assertTrue(expected_min <= real_value <= expected_max, msg)
971 def assert_packet_checksums_valid(self, packet,
972 ignore_zero_udp_checksums=True):
973 received = packet.__class__(scapy.compat.raw(packet))
974 udp_layers = ['UDP', 'UDPerror']
975 checksum_fields = ['cksum', 'chksum']
978 temp = received.__class__(scapy.compat.raw(received))
980 layer = temp.getlayer(counter)
982 for cf in checksum_fields:
983 if hasattr(layer, cf):
984 if ignore_zero_udp_checksums and \
985 0 == getattr(layer, cf) and \
986 layer.name in udp_layers:
989 checksums.append((counter, cf))
992 counter = counter + 1
993 if 0 == len(checksums):
995 temp = temp.__class__(scapy.compat.raw(temp))
996 for layer, cf in checksums:
997 calc_sum = getattr(temp[layer], cf)
999 getattr(received[layer], cf), calc_sum,
1000 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
1002 "Checksum field `%s` on `%s` layer has correct value `%s`" %
1003 (cf, temp[layer].name, calc_sum))
1005 def assert_checksum_valid(self, received_packet, layer,
1006 field_name='chksum',
1007 ignore_zero_checksum=False):
1008 """ Check checksum of received packet on given layer """
1009 received_packet_checksum = getattr(received_packet[layer], field_name)
1010 if ignore_zero_checksum and 0 == received_packet_checksum:
1012 recalculated = received_packet.__class__(
1013 scapy.compat.raw(received_packet))
1014 delattr(recalculated[layer], field_name)
1015 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1016 self.assert_equal(received_packet_checksum,
1017 getattr(recalculated[layer], field_name),
1018 "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """Check the checksum of the 'IP' layer of a received packet.

    :param received_packet: packet to check
    :param ignore_zero_checksum: forwarded to assert_checksum_valid()
    """
    layer = 'IP'
    self.assert_checksum_valid(
        received_packet, layer,
        ignore_zero_checksum=ignore_zero_checksum)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """Check the checksum of the 'TCP' layer of a received packet.

    :param received_packet: packet to check
    :param ignore_zero_checksum: forwarded to assert_checksum_valid()
    """
    layer = 'TCP'
    self.assert_checksum_valid(
        received_packet, layer,
        ignore_zero_checksum=ignore_zero_checksum)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """Check the checksum of the 'UDP' layer of a received packet.

    :param received_packet: packet to check
    :param ignore_zero_checksum: forwarded to assert_checksum_valid();
        defaults to True for UDP
    """
    layer = 'UDP'
    self.assert_checksum_valid(
        received_packet, layer,
        ignore_zero_checksum=ignore_zero_checksum)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """Check checksums of the error layers embedded in a packet.

    Each of the IPerror, TCPerror, UDPerror and ICMPerror layers that is
    present in *received_packet* is checked, in that order. UDPerror is
    checked with ``ignore_zero_checksum=True``.

    :param received_packet: packet to check
    """
    # (layer class, layer name, extra keyword arguments)
    embedded_checks = (
        (IPerror, 'IPerror', {}),
        (TCPerror, 'TCPerror', {}),
        (UDPerror, 'UDPerror', {'ignore_zero_checksum': True}),
        (ICMPerror, 'ICMPerror', {}),
    )
    for layer_cls, layer_name, options in embedded_checks:
        if received_packet.haslayer(layer_cls):
            self.assert_checksum_valid(
                received_packet, layer_name, **options)
def assert_icmp_checksum_valid(self, received_packet):
    """Check the 'ICMP' layer checksum, then any embedded error layers.

    :param received_packet: packet to check
    """
    icmp_layer = 'ICMP'
    self.assert_checksum_valid(received_packet, icmp_layer)
    # Embedded error layers are handled by the dedicated helper.
    self.assert_embedded_icmp_checksum_valid(received_packet)
def assert_icmpv6_checksum_valid(self, pkt):
    """Check the 'cksum' field of the ICMPv6 layers present in a packet.

    A destination-unreachable layer additionally triggers the check of
    embedded error layers; echo request and echo reply layers are
    checked on their own.

    :param pkt: packet to check
    """
    if pkt.haslayer(ICMPv6DestUnreach):
        self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
        self.assert_embedded_icmp_checksum_valid(pkt)
    echo_layers = (
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest'),
        (ICMPv6EchoReply, 'ICMPv6EchoReply'),
    )
    for echo_cls, echo_name in echo_layers:
        if pkt.haslayer(echo_cls):
            self.assert_checksum_valid(pkt, echo_name, 'cksum')
1059 def get_packet_counter(self, counter):
1060 if counter.startswith("/"):
1061 counter_value = self.statistics.get_counter(counter)
1063 counters = self.vapi.cli("sh errors").split('\n')
1065 for i in range(1, len(counters) - 1):
1066 results = counters[i].split()
1067 if results[1] == counter:
1068 counter_value = int(results[0])
1070 return counter_value
def assert_packet_counter_equal(self, counter, expected_value):
    """Assert that a packet counter holds the expected value.

    :param counter: counter name passed to get_packet_counter()
    :param expected_value: value the counter is expected to have
    """
    actual = self.get_packet_counter(counter)
    description = "packet counter `%s'" % counter
    self.assert_equal(actual, expected_value, description)
def assert_error_counter_equal(self, counter, expected_value):
    """Assert that an error counter holds the expected value.

    :param counter: counter name passed to statistics.get_err_counter()
    :param expected_value: value the counter is expected to have
    """
    actual = self.statistics.get_err_counter(counter)
    description = "error counter `%s'" % counter
    self.assert_equal(actual, expected_value, description)
1083 def sleep(cls, timeout, remark=None):
1085 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1086 # * by Guido, only the main thread can be interrupted.
1088 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
1091 if hasattr(os, 'sched_yield'):
1097 if hasattr(cls, 'logger'):
1098 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1099 before = time.time()
1102 if hasattr(cls, 'logger') and after - before > 2 * timeout:
1103 cls.logger.error("unexpected self.sleep() result - "
1104 "slept for %es instead of ~%es!",
1105 after - before, timeout)
1106 if hasattr(cls, 'logger'):
1108 "Finished sleep (%s) - slept %es (wanted %es)",
1109 remark, after - before, timeout)
1111 def pg_send(self, intf, pkts):
1112 self.vapi.cli("clear trace")
1113 intf.add_stream(pkts)
1114 self.pg_enable_capture(self.pg_interfaces)
1117 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
1118 self.pg_send(intf, pkts)
1121 for i in self.pg_interfaces:
1122 i.get_capture(0, timeout=timeout)
1123 i.assert_nothing_captured(remark=remark)
1126 def send_and_expect(self, intf, pkts, output, n_rx=None):
1129 self.pg_send(intf, pkts)
1130 rx = output.get_capture(n_rx)
1133 def send_and_expect_only(self, intf, pkts, output, timeout=None):
1134 self.pg_send(intf, pkts)
1135 rx = output.get_capture(len(pkts))
1139 for i in self.pg_interfaces:
1140 if i not in outputs:
1141 i.get_capture(0, timeout=timeout)
1142 i.assert_nothing_captured()
1148 """ unittest calls runTest when TestCase is instantiated without a
1149 test case. Use case: Writing unittests against VppTestCase"""
def get_testcase_doc_name(test):
    """Return the first docstring line of the test's class.

    :param test: test case instance
    :returns: first line of the class docstring; the class name when
        the class has no docstring or only an empty one (previously
        this raised AttributeError or IndexError)
    """
    test_class = test.__class__
    doc = getdoc(test_class)
    if not doc:
        # getdoc() gives None without a docstring and '' for a blank one.
        return test_class.__name__
    return doc.splitlines()[0]
1157 def get_test_description(descriptions, test):
1158 short_description = test.shortDescription()
1159 if descriptions and short_description:
1160 return short_description
class TestCaseInfo(object):
    """Plain record of per-test-case data used when reporting results."""

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        """
        :param logger: logger of the test case
        :param tempdir: temporary directory used by the test case
        :param vpp_pid: stored as ``vpp_pid``
        :param vpp_bin_path: stored as ``vpp_bin_path``
        """
        self.logger, self.tempdir = logger, tempdir
        self.vpp_pid, self.vpp_bin_path = vpp_pid, vpp_bin_path
        # set later to the name of the test during which a core file was
        # found in tempdir; None until then
        self.core_crash_test = None
class VppTestResult(unittest.TestResult):
    """
    Test result which records the outcome of every test, prints it to the
    output stream and optionally forwards it through a result pipe.

    @property result_string
     String variable to store the test case result string.
    @property errors
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test which
     raised an unexpected exception.
    @property failures
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test where
     a failure was explicitly signalled using the TestCase.assert*()
     methods.
    """

    # NOTE: class-level attributes - the two sets are mutated in place by
    # add_error() and are therefore shared by all instances of this class.
    failed_test_cases_info = set()
    core_crash_test_cases_info = set()
    current_test_case_info = None

    def __init__(self, stream=None, descriptions=None, verbosity=None,
                 runner=None):
        """
        :param stream File descriptor to store where to report test results.
            Set to the standard error stream by default.
        :param descriptions Boolean variable to store information if to use
            test case descriptions.
        :param verbosity Integer variable to store required verbosity level.
        :param runner Runner object; its ``print_summary`` attribute is
            consulted by printErrors().
        """
        super(VppTestResult, self).__init__(stream, descriptions, verbosity)
        self.stream = stream
        self.descriptions = descriptions
        self.verbosity = verbosity
        self.result_string = None
        self.runner = runner

    def addSuccess(self, test):
        """
        Record a test succeeded result

        :param test: the test which passed

        """
        if self.current_test_case_info:
            self.current_test_case_info.logger.debug(
                "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
                                                       test._testMethodName,
                                                       test._testMethodDoc))
        unittest.TestResult.addSuccess(self, test)
        self.result_string = colorize("OK", GREEN)

        self.send_result_through_pipe(test, PASS)

    def addSkip(self, test, reason):
        """
        Record a test skipped.

        :param test: the test which was skipped
        :param reason: reason for skipping the test

        """
        if self.current_test_case_info:
            self.current_test_case_info.logger.debug(
                "--- addSkip() %s.%s(%s) called, reason is %s" %
                (test.__class__.__name__, test._testMethodName,
                 test._testMethodDoc, reason))
        unittest.TestResult.addSkip(self, test, reason)
        self.result_string = colorize("SKIP", YELLOW)

        self.send_result_through_pipe(test, SKIP)

    def symlink_failed(self):
        """
        Create ``$FAILED_DIR/<tempdir basename>-FAILED`` as a symlink to the
        temporary directory of the current test case.

        This is best effort: nothing is done without a current test case or
        when FAILED_DIR is not set, an already existing link is left alone
        and any exception is only logged.
        """
        info = self.current_test_case_info
        if not info:
            return
        try:
            failed_dir = os.getenv('FAILED_DIR')
            if failed_dir is None:
                # os.path.join(None, ...) would raise and be reported as an
                # error although there is simply nowhere to put the link
                if info.logger:
                    info.logger.debug(
                        "FAILED_DIR is not set, "
                        "not creating a link to the failed test")
                return
            link_path = os.path.join(
                failed_dir,
                '%s-FAILED' %
                os.path.basename(info.tempdir))
            if info.logger:
                info.logger.debug(
                    "creating a link to the failed test")
                info.logger.debug(
                    "os.symlink(%s, %s)" %
                    (info.tempdir, link_path))
            # lexists() also detects a dangling link, for which exists()
            # is False and os.symlink() would fail
            if os.path.lexists(link_path):
                if info.logger:
                    info.logger.debug(
                        'symlink already exists')
            else:
                os.symlink(info.tempdir, link_path)

        except Exception as e:
            if info.logger:
                info.logger.error(e)

    def send_result_through_pipe(self, test, result):
        """
        Send ``(test.id(), result)`` through the result pipe, if there is one.

        :param test: the test the result belongs to
        :param result: result value to send
        """
        # the attribute may be missing or None when no pipe was configured
        pipe = getattr(self, 'test_framework_result_pipe', None)
        if pipe:
            pipe.send((test.id(), result))

    def log_error(self, test, err, fn_name):
        """
        Log an error/failure and its formatted traceback at debug level
        using the logger of the current test case (if any).

        :param test: test case or unittest error holder
        :param err: exc_info tuple as accepted by format_exception()
        :param fn_name: name of the reporting function used in the message
        """
        if self.current_test_case_info:
            if isinstance(test, unittest.suite._ErrorHolder):
                test_name = test.description
            else:
                test_name = '%s.%s(%s)' % (test.__class__.__name__,
                                           test._testMethodName,
                                           test._testMethodDoc)
            self.current_test_case_info.logger.debug(
                "--- %s() %s called, err is %s" %
                (fn_name, test_name, err))
            self.current_test_case_info.logger.debug(
                "formatted exception is:\n%s" %
                "".join(format_exception(*err)))

    def add_error(self, test, err, unittest_fn, error_type):
        """
        Common implementation of addFailure() and addError().

        :param test: test case or unittest error holder
        :param err: exc_info tuple
        :param unittest_fn: unbound unittest.TestResult method to record the
            result with
        :param error_type: FAIL or ERROR
        :raises Exception: when error_type is neither FAIL nor ERROR
        """
        if error_type == FAIL:
            self.log_error(test, err, 'addFailure')
            error_type_str = colorize("FAIL", RED)
        elif error_type == ERROR:
            self.log_error(test, err, 'addError')
            error_type_str = colorize("ERROR", RED)
        else:
            raise Exception('Error type %s cannot be used to record an '
                            'error or a failure' % error_type)

        unittest_fn(self, test, err)
        if self.current_test_case_info:
            self.result_string = "%s [ temp dir used by test case: %s ]" % \
                                 (error_type_str,
                                  self.current_test_case_info.tempdir)
            self.symlink_failed()
            self.failed_test_cases_info.add(self.current_test_case_info)
            if is_core_present(self.current_test_case_info.tempdir):
                # remember only the first test seen with a core file
                if not self.current_test_case_info.core_crash_test:
                    if isinstance(test, unittest.suite._ErrorHolder):
                        test_name = str(test)
                    else:
                        test_name = "'{!s}' ({!s})".format(
                            get_testcase_doc_name(test), test.id())
                    self.current_test_case_info.core_crash_test = test_name
                self.core_crash_test_cases_info.add(
                    self.current_test_case_info)
        else:
            self.result_string = '%s [no temp dir]' % error_type_str

        self.send_result_through_pipe(test, error_type)

    def addFailure(self, test, err):
        """
        Record a test failed result

        :param test: the test which failed
        :param err: error message

        """
        self.add_error(test, err, unittest.TestResult.addFailure, FAIL)

    def addError(self, test, err):
        """
        Record a test error result

        :param test: the test which raised an error
        :param err: error message

        """
        self.add_error(test, err, unittest.TestResult.addError, ERROR)

    def getDescription(self, test):
        """
        Get test description

        :param test: the test to describe
        :returns: test description

        """
        return get_test_description(self.descriptions, test)

    def startTest(self, test):
        """
        Start a test

        :param test: the test about to be run

        """

        def print_header(test):
            """Print the test class title once per test class."""
            test_class = test.__class__
            if not hasattr(test_class, '_header_printed'):
                # fall back to the class name for undocumented test classes
                # instead of failing on getdoc() returning None
                doc_lines = (getdoc(test) or "").splitlines()
                title = doc_lines[0] if doc_lines else test_class.__name__
                print(double_line_delim)
                print(colorize(title, GREEN))
                print(double_line_delim)
            test_class._header_printed = True

        print_header(test)

        unittest.TestResult.startTest(self, test)
        if self.verbosity > 0:
            self.stream.writeln(
                "Starting " + self.getDescription(test) + " ...")
            self.stream.writeln(single_line_delim)

    def stopTest(self, test):
        """
        Called when the given test has been run

        :param test: the test which has finished

        """
        unittest.TestResult.stopTest(self, test)
        if self.verbosity > 0:
            self.stream.writeln(single_line_delim)
            self.stream.writeln("%-73s%s" % (self.getDescription(test),
                                             self.result_string))
            self.stream.writeln(single_line_delim)
        else:
            self.stream.writeln("%-73s%s" % (self.getDescription(test),
                                             self.result_string))

        self.send_result_through_pipe(test, TEST_RUN)

    def printErrors(self):
        """
        Print errors from running the test case
        """
        if self.errors or self.failures:
            self.stream.writeln()
            self.printErrorList('ERROR', self.errors)
            self.printErrorList('FAIL', self.failures)

        # ^^ that is the last output from unittest before summary
        if not self.runner.print_summary:
            # redirect everything printed after this point to os.devnull
            devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
            self.stream = devnull
            self.runner.stream = devnull

    def printErrorList(self, flavour, errors):
        """
        Print error list to the output stream together with error type
        and test case description.

        :param flavour: error type
        :param errors: iterable errors

        """
        for test, err in errors:
            self.stream.writeln(double_line_delim)
            self.stream.writeln("%s: %s" %
                                (flavour, self.getDescription(test)))
            self.stream.writeln(single_line_delim)
            self.stream.writeln("%s" % err)
class VppTestRunner(unittest.TextTestRunner):
    """
    A basic test runner implementation which prints results to standard
    output (the stream is hard-coded to ``sys.stdout``).
    """

    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
                 result_pipe=None, failfast=False, buffer=False,
                 resultclass=None, print_summary=True, **kwargs):
        """
        :param keep_alive_pipe: stored as ``KeepAliveReporter.pipe``
        :param descriptions: forwarded to unittest.TextTestRunner
        :param verbosity: forwarded to unittest.TextTestRunner
        :param result_pipe: stored on the result class as
            ``test_framework_result_pipe``
        :param failfast: forwarded to unittest.TextTestRunner
        :param buffer: forwarded to unittest.TextTestRunner
        :param resultclass: forwarded to unittest.TextTestRunner
        :param print_summary: when false, the original stream is restored
            on the runner and the result after the run
        :param kwargs: forwarded to unittest.TextTestRunner
        """
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        # NOTE(review): resultclass is a read-only property on this class,
        # so passing a non-None resultclass presumably makes the base
        # class initializer fail when it tries to assign it - confirm.
        super(VppTestRunner, self).__init__(sys.stdout, descriptions,
                                            verbosity, failfast, buffer,
                                            resultclass, **kwargs)
        KeepAliveReporter.pipe = keep_alive_pipe

        # remembered so that run() can put the stream back afterwards
        self.orig_stream = self.stream
        # set on the class, i.e. visible to every result instance
        self.resultclass.test_framework_result_pipe = result_pipe

        self.print_summary = print_summary

    def _makeResult(self):
        """Create the result object, handing it a reference to this runner."""
        result_cls = self.resultclass
        return result_cls(self.stream,
                          self.descriptions,
                          self.verbosity,
                          self)

    def run(self, test):
        """
        Run the tests

        :param test: test case or suite to run
        :returns: the result object produced by the run

        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        result = super(VppTestRunner, self).run(test)
        if self.print_summary:
            return result
        # put the original stream back on both the runner and the result
        self.stream = self.orig_stream
        result.stream = self.orig_stream
        return result
class Worker(Thread):
    """
    Thread which runs an external executable, logs its output and stores
    its return code in ``self.result``.
    """

    def __init__(self, args, logger, env=None):
        """
        :param args: command line as a sequence; ``args[0]`` is the executable
        :param logger: logger used for reporting
        :param env: extra environment variables for the child process
            (deep-copied; merged over ``os.environ`` in run())
        """
        self.logger = logger
        self.args = args
        self.process = None
        self.result = None
        self.env = copy.deepcopy({} if env is None else env)
        super(Worker, self).__init__()

    def run(self):
        """
        Run the executable, wait for it to finish and log what it printed.

        :raises EnvironmentError: when the executable does not exist or is
            not executable; ``self.result`` is set to ``os.EX_OSFILE`` then
        """
        executable = self.args[0]
        runnable = os.path.exists(executable) and os.access(
            executable, os.F_OK | os.X_OK)
        if not runnable:
            # Exit code that means some system file did not exist,
            # could not be opened, or had some other kind of error.
            self.result = os.EX_OSFILE
            raise EnvironmentError(
                "executable '%s' is not found or executable." % executable)

        self.logger.debug("Running executable w/args `%s'" % self.args)
        child_env = os.environ.copy()
        child_env.update(self.env)
        child_env["CK_LOG_FILE_NAME"] = "-"
        self.process = subprocess.Popen(
            self.args, shell=False, env=child_env, preexec_fn=os.setpgrp,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = self.process.communicate()
        self.logger.debug("Finished running `%s'" % executable)
        self.logger.info("Return code is `%s'" % self.process.returncode)

        reports = (
            ("Executable `%s' wrote to stdout:", out),
            ("Executable `%s' wrote to stderr:", err),
        )
        self.logger.info(single_line_delim)
        for header, content in reports:
            self.logger.info(header % executable)
            self.logger.info(single_line_delim)
            self.logger.info(content)
            self.logger.info(single_line_delim)
        self.result = self.process.returncode
1515 if __name__ == '__main__':