3 from __future__ import print_function
17 from collections import deque
18 from threading import Thread, Event
19 from inspect import getdoc, isclass
20 from traceback import format_exception
21 from logging import FileHandler, DEBUG, Formatter
24 from scapy.packet import Raw
25 import hook as hookmodule
26 from vpp_pg_interface import VppPGInterface
27 from vpp_sub_interface import VppSubInterface
28 from vpp_lo_interface import VppLoInterface
29 from vpp_bvi_interface import VppBviInterface
30 from vpp_papi_provider import VppPapiProvider
31 from vpp_papi.vpp_stats import VPPStats
32 from vpp_papi.vpp_transport_shmem import VppTransportShmemIOError
33 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
35 from vpp_object import VppObjectRegistry
36 from util import ppp, is_core_present
37 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
38 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
39 from scapy.layers.inet6 import ICMPv6EchoReply
41 if os.name == 'posix' and sys.version_info[0] < 3:
42 # using subprocess32 is recommended by python official documentation
43 # @ https://docs.python.org/2/library/subprocess.html
44 import subprocess32 as subprocess
48 # Python2/3 compatible
61 class BoolEnvironmentVariable(object):
63 def __init__(self, env_var_name, default='n', true_values=None):
64 self.name = env_var_name
65 self.default = default
66 self.true_values = true_values if true_values is not None else \
70 return os.getenv(self.name, self.default).lower() in self.true_values
72 if sys.version_info[0] == 2:
73 __nonzero__ = __bool__
76 return 'BoolEnvironmentVariable(%r, default=%r, true_values=%r)' % \
77 (self.name, self.default, self.true_values)
80 debug_framework = BoolEnvironmentVariable('TEST_DEBUG')
85 Test framework module.
87 The module provides a set of tools for constructing and running tests and
88 representing the results.
92 class VppDiedError(Exception):
93 """ exception for reporting that the subprocess has died."""
95 signals_by_value = {v: k for k, v in signal.__dict__.items() if
96 k.startswith('SIG') and not k.startswith('SIG_')}
98 def __init__(self, rv=None, testcase=None, method_name=None):
100 self.signal_name = None
101 self.testcase = testcase
102 self.method_name = method_name
105 self.signal_name = VppDiedError.signals_by_value[-rv]
106 except (KeyError, TypeError):
109 if testcase is None and method_name is None:
112 in_msg = 'running %s.%s ' % (testcase, method_name)
114 msg = "VPP subprocess died %sunexpectedly with return code: %d%s." % (
117 ' [%s]' % (self.signal_name if
118 self.signal_name is not None else ''))
119 super(VppDiedError, self).__init__(msg)
122 class _PacketInfo(object):
123 """Private class to create packet info object.
125 Help process information about the next packet.
126 Set variables to default values.
128 #: Store the index of the packet.
130 #: Store the index of the source packet generator interface of the packet.
132 #: Store the index of the destination packet generator interface
135 #: Store expected ip version
137 #: Store expected upper protocol
139 #: Store the copy of the former packet.
142 def __eq__(self, other):
143 index = self.index == other.index
144 src = self.src == other.src
145 dst = self.dst == other.dst
146 data = self.data == other.data
147 return index and src and dst and data
150 def pump_output(testclass):
151 """ pump output from vpp stdout/stderr to proper queues """
154 while not testclass.pump_thread_stop_flag.is_set():
155 readable = select.select([testclass.vpp.stdout.fileno(),
156 testclass.vpp.stderr.fileno(),
157 testclass.pump_thread_wakeup_pipe[0]],
159 if testclass.vpp.stdout.fileno() in readable:
160 read = os.read(testclass.vpp.stdout.fileno(), 102400)
162 split = read.decode('ascii',
163 errors='backslashreplace').splitlines(True)
164 if len(stdout_fragment) > 0:
165 split[0] = "%s%s" % (stdout_fragment, split[0])
166 if len(split) > 0 and split[-1].endswith("\n"):
170 stdout_fragment = split[-1]
171 testclass.vpp_stdout_deque.extend(split[:limit])
172 if not testclass.cache_vpp_output:
173 for line in split[:limit]:
174 testclass.logger.debug(
175 "VPP STDOUT: %s" % line.rstrip("\n"))
176 if testclass.vpp.stderr.fileno() in readable:
177 read = os.read(testclass.vpp.stderr.fileno(), 102400)
179 split = read.decode('ascii',
180 errors='backslashreplace').splitlines(True)
181 if len(stderr_fragment) > 0:
182 split[0] = "%s%s" % (stderr_fragment, split[0])
183 if len(split) > 0 and split[-1].endswith("\n"):
187 stderr_fragment = split[-1]
189 testclass.vpp_stderr_deque.extend(split[:limit])
190 if not testclass.cache_vpp_output:
191 for line in split[:limit]:
192 testclass.logger.debug(
193 "VPP STDERR: %s" % line.rstrip("\n"))
194 # ignoring the dummy pipe here intentionally - the
195 # flag will take care of properly terminating the loop
def _is_skip_aarch64_set():
    """ Build the flag object backed by the SKIP_AARCH64 environment variable

    :returns: BoolEnvironmentVariable bound to 'SKIP_AARCH64'
    """
    skip_flag = BoolEnvironmentVariable('SKIP_AARCH64')
    return skip_flag


# flag object created once, when the module is imported
is_skip_aarch64_set = _is_skip_aarch64_set()
def _is_platform_aarch64():
    """ Tell whether platform.machine() reports 'aarch64'

    :returns: True when the machine type string equals 'aarch64'
    """
    machine_type = platform.machine()
    return machine_type == 'aarch64'


# computed once, when the module is imported
is_platform_aarch64 = _is_platform_aarch64()
def _running_extended_tests():
    """ Build the flag object backed by the EXTENDED_TESTS environment variable

    :returns: BoolEnvironmentVariable bound to "EXTENDED_TESTS"
    """
    extended_flag = BoolEnvironmentVariable("EXTENDED_TESTS")
    return extended_flag


# flag object created once, when the module is imported
running_extended_tests = _running_extended_tests()
def _running_on_centos():
    """ Check the OS_ID environment variable for the substring 'centos'

    The comparison is case-insensitive; an unset OS_ID counts as "not centos".

    :returns: True if 'centos' occurs in OS_ID, False otherwise
    """
    distro_id = os.getenv("OS_ID", "").lower()
    return "centos" in distro_id


# computed once, when the module is imported
running_on_centos = _running_on_centos()
227 class KeepAliveReporter(object):
229 Singleton object which reports test start to parent process
234 self.__dict__ = self._shared_state
242 def pipe(self, pipe):
243 if self._pipe is not None:
244 raise Exception("Internal error - pipe should only be set once.")
247 def send_keep_alive(self, test, desc=None):
249 Write current test tmpdir & desc to keep-alive pipe to signal liveness
251 if self.pipe is None:
252 # if not running forked..
256 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
260 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
263 class VppTestCase(unittest.TestCase):
264 """This subclass is a base class for VPP test cases that are implemented as
265 classes. It provides methods to create and run test case.
268 extra_vpp_punt_config = []
269 extra_vpp_plugin_config = []
def packet_infos(self):
    """ Packet infos stored for this testcase

    :returns: the ``_packet_infos`` container itself (not a copy); it is a
              dict keyed by packet info index
    """
    stored_infos = self._packet_infos
    return stored_infos
277 def get_packet_count_for_if_idx(cls, dst_if_index):
278 """Get the number of packet info for specified destination if index"""
279 if dst_if_index in cls._packet_count_for_dst_if_idx:
280 return cls._packet_count_for_dst_if_idx[dst_if_index]
286 """Return the instance of this testcase"""
287 return cls.test_instance
290 def set_debug_flags(cls, d):
291 cls.gdbserver_port = 7777
292 cls.debug_core = False
293 cls.debug_gdb = False
294 cls.debug_gdbserver = False
295 cls.debug_all = False
300 cls.debug_core = True
301 elif dl == "gdb" or dl == "gdb-all":
303 elif dl == "gdbserver" or dl == "gdbserver-all":
304 cls.debug_gdbserver = True
306 raise Exception("Unrecognized DEBUG option: '%s'" % d)
307 if dl == "gdb-all" or dl == "gdbserver-all":
311 def get_least_used_cpu():
312 cpu_usage_list = [set(range(psutil.cpu_count()))]
313 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
314 if 'vpp_main' == p.info['name']]
315 for vpp_process in vpp_processes:
316 for cpu_usage_set in cpu_usage_list:
318 cpu_num = vpp_process.cpu_num()
319 if cpu_num in cpu_usage_set:
320 cpu_usage_set_index = cpu_usage_list.index(
322 if cpu_usage_set_index == len(cpu_usage_list) - 1:
323 cpu_usage_list.append({cpu_num})
325 cpu_usage_list[cpu_usage_set_index + 1].add(
327 cpu_usage_set.remove(cpu_num)
329 except psutil.NoSuchProcess:
332 for cpu_usage_set in cpu_usage_list:
333 if len(cpu_usage_set) > 0:
334 min_usage_set = cpu_usage_set
337 return random.choice(tuple(min_usage_set))
340 def setUpConstants(cls):
341 """ Set-up the test case class based on environment variables """
342 cls.step = BoolEnvironmentVariable('STEP')
343 d = os.getenv("DEBUG", None)
344 # inverted case to handle '' == True
345 c = os.getenv("CACHE_OUTPUT", "1")
346 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
347 cls.set_debug_flags(d)
348 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
349 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
350 cls.test_plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
351 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
353 if cls.plugin_path is not None:
354 if cls.extern_plugin_path is not None:
355 plugin_path = "%s:%s" % (
356 cls.plugin_path, cls.extern_plugin_path)
358 plugin_path = cls.plugin_path
359 elif cls.extern_plugin_path is not None:
360 plugin_path = cls.extern_plugin_path
362 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
363 debug_cli = "cli-listen localhost:5002"
365 size = os.getenv("COREDUMP_SIZE")
367 coredump_size = "coredump-size %s" % size
368 if coredump_size is None:
369 coredump_size = "coredump-size unlimited"
371 cpu_core_number = cls.get_least_used_cpu()
372 if not hasattr(cls, "worker_config"):
373 cls.worker_config = ""
375 cls.vpp_cmdline = [cls.vpp_bin, "unix",
376 "{", "nodaemon", debug_cli, "full-coredump",
377 coredump_size, "runtime-dir", cls.tempdir, "}",
378 "api-trace", "{", "on", "}", "api-segment", "{",
379 "prefix", cls.shm_prefix, "}", "cpu", "{",
380 "main-core", str(cpu_core_number),
381 cls.worker_config, "}",
382 "statseg", "{", "socket-name", cls.stats_sock, "}",
383 "socksvr", "{", "socket-name", cls.api_sock, "}",
385 "{", "plugin", "dpdk_plugin.so", "{", "disable",
386 "}", "plugin", "rdma_plugin.so", "{", "disable",
387 "}", "plugin", "unittest_plugin.so", "{", "enable",
388 "}"] + cls.extra_vpp_plugin_config + ["}", ]
389 if cls.extra_vpp_punt_config is not None:
390 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
391 if plugin_path is not None:
392 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
393 if cls.test_plugin_path is not None:
394 cls.vpp_cmdline.extend(["test_plugin_path", cls.test_plugin_path])
396 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
397 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
400 def wait_for_enter(cls):
401 if cls.debug_gdbserver:
402 print(double_line_delim)
403 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
405 print(double_line_delim)
406 print("Spawned VPP with PID: %d" % cls.vpp.pid)
408 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
410 print(single_line_delim)
411 print("You can debug VPP using:")
412 if cls.debug_gdbserver:
413 print("sudo gdb " + cls.vpp_bin +
414 " -ex 'target remote localhost:{port}'"
415 .format(port=cls.gdbserver_port))
416 print("Now is the time to attach gdb by running the above "
417 "command, set up breakpoints etc., then resume VPP from "
418 "within gdb by issuing the 'continue' command")
419 cls.gdbserver_port += 1
421 print("sudo gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
422 print("Now is the time to attach gdb by running the above "
423 "command and set up breakpoints etc., then resume VPP from"
424 " within gdb by issuing the 'continue' command")
425 print(single_line_delim)
426 input("Press ENTER to continue running the testcase...")
430 cmdline = cls.vpp_cmdline
432 if cls.debug_gdbserver:
433 gdbserver = '/usr/bin/gdbserver'
434 if not os.path.isfile(gdbserver) or \
435 not os.access(gdbserver, os.X_OK):
436 raise Exception("gdbserver binary '%s' does not exist or is "
437 "not executable" % gdbserver)
439 cmdline = [gdbserver, 'localhost:{port}'
440 .format(port=cls.gdbserver_port)] + cls.vpp_cmdline
441 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
444 cls.vpp = subprocess.Popen(cmdline,
445 stdout=subprocess.PIPE,
446 stderr=subprocess.PIPE,
448 except subprocess.CalledProcessError as e:
449 cls.logger.critical("Subprocess returned with non-0 return code: ("
453 cls.logger.critical("Subprocess returned with OS error: "
454 "(%s) %s", e.errno, e.strerror)
456 except Exception as e:
457 cls.logger.exception("Subprocess returned unexpected from "
464 def wait_for_stats_socket(cls):
465 deadline = time.time() + 300
467 while time.time() < deadline or \
468 cls.debug_gdb or cls.debug_gdbserver:
469 if os.path.exists(cls.stats_sock):
474 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
477 def wait_for_coredump(cls):
478 corefile = cls.tempdir + "/core"
479 if os.path.isfile(corefile):
480 cls.logger.error("Waiting for coredump to complete: %s", corefile)
481 curr_size = os.path.getsize(corefile)
482 deadline = time.time() + 60
484 while time.time() < deadline:
487 curr_size = os.path.getsize(corefile)
488 if size == curr_size:
492 cls.logger.error("Timed out waiting for coredump to complete:"
495 cls.logger.error("Coredump complete: %s, size %d",
501 Perform class setup before running the testcase
502 Remove shared memory files, start vpp and connect the vpp-api
504 super(VppTestCase, cls).setUpClass()
505 gc.collect() # run garbage collection first
507 cls.logger = get_logger(cls.__name__)
508 if hasattr(cls, 'parallel_handler'):
509 cls.logger.addHandler(cls.parallel_handler)
510 cls.logger.propagate = False
512 cls.tempdir = tempfile.mkdtemp(
513 prefix='vpp-unittest-%s-' % cls.__name__)
514 cls.stats_sock = "%s/stats.sock" % cls.tempdir
515 cls.api_sock = "%s/api.sock" % cls.tempdir
516 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
517 cls.file_handler.setFormatter(
518 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
520 cls.file_handler.setLevel(DEBUG)
521 cls.logger.addHandler(cls.file_handler)
522 cls.logger.debug("--- setUpClass() for %s called ---" %
524 cls.shm_prefix = os.path.basename(cls.tempdir)
525 os.chdir(cls.tempdir)
526 cls.logger.info("Temporary dir is %s, shm prefix is %s",
527 cls.tempdir, cls.shm_prefix)
529 cls.reset_packet_infos()
533 cls.registry = VppObjectRegistry()
534 cls.vpp_startup_failed = False
535 cls.reporter = KeepAliveReporter()
536 # need to catch exceptions here because if we raise, then the cleanup
537 # doesn't get called and we might end with a zombie vpp
540 cls.reporter.send_keep_alive(cls, 'setUpClass')
541 VppTestResult.current_test_case_info = TestCaseInfo(
542 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
543 cls.vpp_stdout_deque = deque()
544 cls.vpp_stderr_deque = deque()
545 cls.pump_thread_stop_flag = Event()
546 cls.pump_thread_wakeup_pipe = os.pipe()
547 cls.pump_thread = Thread(target=pump_output, args=(cls,))
548 cls.pump_thread.daemon = True
549 cls.pump_thread.start()
550 if cls.debug_gdb or cls.debug_gdbserver:
554 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
557 hook = hookmodule.StepHook(cls)
559 hook = hookmodule.PollHook(cls)
560 cls.vapi.register_hook(hook)
561 cls.wait_for_stats_socket()
562 cls.statistics = VPPStats(socketname=cls.stats_sock)
566 cls.vpp_startup_failed = True
568 "VPP died shortly after startup, check the"
569 " output to standard error for possible cause")
575 cls.vapi.disconnect()
578 if cls.debug_gdbserver:
579 print(colorize("You're running VPP inside gdbserver but "
580 "VPP-API connection failed, did you forget "
581 "to 'continue' VPP from within gdb?", RED))
583 except Exception as e:
584 cls.logger.debug("Exception connecting to VPP: %s" % e)
592 Disconnect vpp-api, kill vpp and cleanup shared memory files
594 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
596 if cls.vpp.returncode is None:
598 print(double_line_delim)
599 print("VPP or GDB server is still running")
600 print(single_line_delim)
601 input("When done debugging, press ENTER to kill the "
602 "process and finish running the testcase...")
604 # first signal that we want to stop the pump thread, then wake it up
605 if hasattr(cls, 'pump_thread_stop_flag'):
606 cls.pump_thread_stop_flag.set()
607 if hasattr(cls, 'pump_thread_wakeup_pipe'):
608 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
609 if hasattr(cls, 'pump_thread'):
610 cls.logger.debug("Waiting for pump thread to stop")
611 cls.pump_thread.join()
612 if hasattr(cls, 'vpp_stderr_reader_thread'):
613 cls.logger.debug("Waiting for stdderr pump to stop")
614 cls.vpp_stderr_reader_thread.join()
616 if hasattr(cls, 'vpp'):
617 if hasattr(cls, 'vapi'):
618 cls.logger.debug("Disconnecting class vapi client on %s",
620 cls.vapi.disconnect()
621 cls.logger.debug("Deleting class vapi attribute on %s",
625 if cls.vpp.returncode is None:
626 cls.wait_for_coredump()
627 cls.logger.debug("Sending TERM to vpp")
629 cls.logger.debug("Waiting for vpp to die")
630 cls.vpp.communicate()
631 cls.logger.debug("Deleting class vpp attribute on %s",
635 if cls.vpp_startup_failed:
636 stdout_log = cls.logger.info
637 stderr_log = cls.logger.critical
639 stdout_log = cls.logger.info
640 stderr_log = cls.logger.info
642 if hasattr(cls, 'vpp_stdout_deque'):
643 stdout_log(single_line_delim)
644 stdout_log('VPP output to stdout while running %s:', cls.__name__)
645 stdout_log(single_line_delim)
646 vpp_output = "".join(cls.vpp_stdout_deque)
647 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
649 stdout_log('\n%s', vpp_output)
650 stdout_log(single_line_delim)
652 if hasattr(cls, 'vpp_stderr_deque'):
653 stderr_log(single_line_delim)
654 stderr_log('VPP output to stderr while running %s:', cls.__name__)
655 stderr_log(single_line_delim)
656 vpp_output = "".join(cls.vpp_stderr_deque)
657 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
659 stderr_log('\n%s', vpp_output)
660 stderr_log(single_line_delim)
663 def tearDownClass(cls):
664 """ Perform final cleanup after running all tests in this test-case """
665 cls.logger.debug("--- tearDownClass() for %s called ---" %
667 cls.reporter.send_keep_alive(cls, 'tearDownClass')
669 cls.file_handler.close()
670 cls.reset_packet_infos()
672 debug_internal.on_tear_down_class(cls)
def show_commands_at_teardown(self):
    """ Hook for subclasses to log testcase specific output at teardown.

    This base version only logs that no commands were provided.
    """
    notice = "--- No test specific show commands provided. ---"
    self.logger.info(notice)
679 """ Show various debug prints after each test """
680 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
681 (self.__class__.__name__, self._testMethodName,
682 self._testMethodDoc))
685 if not self.vpp_dead:
686 self.logger.debug(self.vapi.cli("show trace max 1000"))
687 self.logger.info(self.vapi.ppcli("show interface"))
688 self.logger.info(self.vapi.ppcli("show hardware"))
689 self.logger.info(self.statistics.set_errors_str())
690 self.logger.info(self.vapi.ppcli("show run"))
691 self.logger.info(self.vapi.ppcli("show log"))
692 self.logger.info(self.vapi.ppcli("show bihash"))
693 self.logger.info("Logging testcase specific show commands.")
694 self.show_commands_at_teardown()
695 self.registry.remove_vpp_config(self.logger)
696 # Save/Dump VPP api trace log
697 m = self._testMethodName
698 api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
699 tmp_api_trace = "/tmp/%s" % api_trace
700 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
701 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
702 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
704 os.rename(tmp_api_trace, vpp_api_trace_log)
705 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
707 except VppTransportShmemIOError:
708 self.logger.debug("VppTransportShmemIOError: Vpp dead. "
709 "Cannot log show commands.")
712 self.registry.unregister_all(self.logger)
715 """ Clear trace before running each test"""
716 super(VppTestCase, self).setUp()
717 self.reporter.send_keep_alive(self)
720 raise VppDiedError(rv=None, testcase=self.__class__.__name__,
721 method_name=self._testMethodName)
722 self.sleep(.1, "during setUp")
723 self.vpp_stdout_deque.append(
724 "--- test setUp() for %s.%s(%s) starts here ---\n" %
725 (self.__class__.__name__, self._testMethodName,
726 self._testMethodDoc))
727 self.vpp_stderr_deque.append(
728 "--- test setUp() for %s.%s(%s) starts here ---\n" %
729 (self.__class__.__name__, self._testMethodName,
730 self._testMethodDoc))
731 self.vapi.cli("clear trace")
732 # store the test instance inside the test class - so that objects
733 # holding the class can access instance methods (like assertEqual)
734 type(self).test_instance = self
737 def pg_enable_capture(cls, interfaces=None):
739 Enable capture on packet-generator interfaces
741 :param interfaces: iterable interface indexes (if None,
742 use self.pg_interfaces)
745 if interfaces is None:
746 interfaces = cls.pg_interfaces
def register_capture(cls, cap_name):
    """ Remember a capture name in the testclass

    :param cap_name: name of the capture to remember
    """
    # each entry pairs the registration timestamp with the capture name
    registered_at = time.time()
    entry = (registered_at, cap_name)
    cls._captures.append(entry)
def get_vpp_time(cls):
    """ Read the clock reported by the 'show clock' CLI command

    :returns: the CLI reply with "Time now " removed, converted to float
    """
    clock_reply = cls.vapi.cli('show clock')
    seconds_text = clock_reply.replace("Time now ", "")
    return float(seconds_text)
761 def sleep_on_vpp_time(cls, sec):
762 """ Sleep according to time in VPP world """
763 # On a busy system with many processes
764 # we might end up with VPP time being slower than real world
765 # So take that into account when waiting for VPP to do something
766 start_time = cls.get_vpp_time()
767 while cls.get_vpp_time() - start_time < sec:
772 """ Enable the PG, wait till it is done, then clean up """
773 cls.vapi.cli("trace add pg-input 1000")
774 cls.vapi.cli('packet-generator enable')
775 # PG, when starts, runs to completion -
776 # so let's avoid a race condition,
777 # and wait a little till it's done.
778 # Then clean it up - and then be gone.
779 deadline = time.time() + 300
780 while cls.vapi.cli('show packet-generator').find("Yes") != -1:
781 cls.sleep(0.01) # yield
782 if time.time() > deadline:
783 cls.logger.error("Timeout waiting for pg to stop")
785 for stamp, cap_name in cls._captures:
786 cls.vapi.cli('packet-generator delete %s' % cap_name)
790 def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
792 Create packet-generator interfaces.
794 :param interfaces: iterable indexes of the interfaces.
795 :returns: List of created interfaces.
800 intf = VppPGInterface(cls, i, gso, gso_size)
801 setattr(cls, intf.name, intf)
803 cls.pg_interfaces = result
807 def create_loopback_interfaces(cls, count):
809 Create loopback interfaces.
811 :param count: number of interfaces created.
812 :returns: List of created interfaces.
814 result = [VppLoInterface(cls) for i in range(count)]
816 setattr(cls, intf.name, intf)
817 cls.lo_interfaces = result
821 def create_bvi_interfaces(cls, count):
823 Create BVI interfaces.
825 :param count: number of interfaces created.
826 :returns: List of created interfaces.
828 result = [VppBviInterface(cls) for i in range(count)]
830 setattr(cls, intf.name, intf)
831 cls.bvi_interfaces = result
835 def extend_packet(packet, size, padding=' '):
837 Extend packet to given size by padding with spaces or custom padding
838 NOTE: Currently works only when Raw layer is present.
840 :param packet: packet
841 :param size: target size
842 :param padding: padding used to extend the payload
845 packet_len = len(packet) + 4
846 extend = size - packet_len
848 num = (extend // len(padding)) + 1
849 packet[Raw].load += (padding * num)[:extend].encode("ascii")
def reset_packet_infos(cls):
    """ Forget all packet info objects and all per-destination packet counts

    Both containers are replaced by fresh empty dicts.
    """
    cls._packet_count_for_dst_if_idx = dict()
    cls._packet_infos = dict()
858 def create_packet_info(cls, src_if, dst_if):
860 Create packet info object containing the source and destination indexes
861 and add it to the testcase's packet info list
863 :param VppInterface src_if: source interface
864 :param VppInterface dst_if: destination interface
866 :returns: _PacketInfo object
870 info.index = len(cls._packet_infos)
871 info.src = src_if.sw_if_index
872 info.dst = dst_if.sw_if_index
873 if isinstance(dst_if, VppSubInterface):
874 dst_idx = dst_if.parent.sw_if_index
876 dst_idx = dst_if.sw_if_index
877 if dst_idx in cls._packet_count_for_dst_if_idx:
878 cls._packet_count_for_dst_if_idx[dst_idx] += 1
880 cls._packet_count_for_dst_if_idx[dst_idx] = 1
881 cls._packet_infos[info.index] = info
885 def info_to_payload(info):
887 Convert _PacketInfo object to packet payload
889 :param info: _PacketInfo object
891 :returns: string containing serialized data from packet info
893 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
897 def payload_to_info(payload, payload_field='load'):
899 Convert packet payload to _PacketInfo object
901 :param payload: packet payload
902 :type payload: <class 'scapy.packet.Raw'>
903 :param payload_field: packet fieldname of payload "load" for
904 <class 'scapy.packet.Raw'>
905 :type payload_field: str
906 :returns: _PacketInfo object containing de-serialized data from payload
909 numbers = getattr(payload, payload_field).split()
911 info.index = int(numbers[0])
912 info.src = int(numbers[1])
913 info.dst = int(numbers[2])
914 info.ip = int(numbers[3])
915 info.proto = int(numbers[4])
918 def get_next_packet_info(self, info):
920 Iterate over the packet info list stored in the testcase
921 Start iteration with first element if info is None
922 Continue based on index in info if info is specified
924 :param info: info or None
925 :returns: next info in list or None if no more infos
930 next_index = info.index + 1
931 if next_index == len(self._packet_infos):
934 return self._packet_infos[next_index]
936 def get_next_packet_info_for_interface(self, src_index, info):
938 Search the packet info list for the next packet info with same source
941 :param src_index: source interface index to search for
942 :param info: packet info - where to start the search
943 :returns: packet info or None
947 info = self.get_next_packet_info(info)
950 if info.src == src_index:
953 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
955 Search the packet info list for the next packet info with same source
956 and destination interface indexes
958 :param src_index: source interface index to search for
959 :param dst_index: destination interface index to search for
960 :param info: packet info - where to start the search
961 :returns: packet info or None
965 info = self.get_next_packet_info_for_interface(src_index, info)
968 if info.dst == dst_index:
971 def assert_equal(self, real_value, expected_value, name_or_class=None):
972 if name_or_class is None:
973 self.assertEqual(real_value, expected_value)
976 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
977 msg = msg % (getdoc(name_or_class).strip(),
978 real_value, str(name_or_class(real_value)),
979 expected_value, str(name_or_class(expected_value)))
981 msg = "Invalid %s: %s does not match expected value %s" % (
982 name_or_class, real_value, expected_value)
984 self.assertEqual(real_value, expected_value, msg)
986 def assert_in_range(self,
994 msg = "Invalid %s: %s out of range <%s,%s>" % (
995 name, real_value, expected_min, expected_max)
996 self.assertTrue(expected_min <= real_value <= expected_max, msg)
998 def assert_packet_checksums_valid(self, packet,
999 ignore_zero_udp_checksums=True):
1000 received = packet.__class__(scapy.compat.raw(packet))
1001 udp_layers = ['UDP', 'UDPerror']
1002 checksum_fields = ['cksum', 'chksum']
1005 temp = received.__class__(scapy.compat.raw(received))
1007 layer = temp.getlayer(counter)
1009 for cf in checksum_fields:
1010 if hasattr(layer, cf):
1011 if ignore_zero_udp_checksums and \
1012 0 == getattr(layer, cf) and \
1013 layer.name in udp_layers:
1016 checksums.append((counter, cf))
1019 counter = counter + 1
1020 if 0 == len(checksums):
1022 temp = temp.__class__(scapy.compat.raw(temp))
1023 for layer, cf in checksums:
1024 calc_sum = getattr(temp[layer], cf)
1026 getattr(received[layer], cf), calc_sum,
1027 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
1029 "Checksum field `%s` on `%s` layer has correct value `%s`" %
1030 (cf, temp[layer].name, calc_sum))
1032 def assert_checksum_valid(self, received_packet, layer,
1033 field_name='chksum',
1034 ignore_zero_checksum=False):
1035 """ Check checksum of received packet on given layer """
1036 received_packet_checksum = getattr(received_packet[layer], field_name)
1037 if ignore_zero_checksum and 0 == received_packet_checksum:
1039 recalculated = received_packet.__class__(
1040 scapy.compat.raw(received_packet))
1041 delattr(recalculated[layer], field_name)
1042 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1043 self.assert_equal(received_packet_checksum,
1044 getattr(recalculated[layer], field_name),
1045 "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """ Check checksum of the 'IP' layer of received packet

    :param received_packet: packet to check
    :param ignore_zero_checksum: forwarded unchanged to
                                 assert_checksum_valid
    """
    options = {'ignore_zero_checksum': ignore_zero_checksum}
    self.assert_checksum_valid(received_packet, 'IP', **options)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """ Check checksum of the 'TCP' layer of received packet

    :param received_packet: packet to check
    :param ignore_zero_checksum: forwarded unchanged to
                                 assert_checksum_valid
    """
    options = {'ignore_zero_checksum': ignore_zero_checksum}
    self.assert_checksum_valid(received_packet, 'TCP', **options)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """ Check checksum of the 'UDP' layer of received packet

    Unlike the IP and TCP variants, ignore_zero_checksum defaults to True.

    :param received_packet: packet to check
    :param ignore_zero_checksum: forwarded unchanged to
                                 assert_checksum_valid
    """
    options = {'ignore_zero_checksum': ignore_zero_checksum}
    self.assert_checksum_valid(received_packet, 'UDP', **options)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """ Check checksums of the *error layers present in received packet

    Layers are checked in the order IPerror, TCPerror, UDPerror, ICMPerror;
    a layer that is not present in the packet is skipped.

    :param received_packet: packet to check
    """
    # (layer class, layer name, extra keyword arguments for the check);
    # only UDPerror is checked with ignore_zero_checksum=True
    embedded_layers = (
        (IPerror, 'IPerror', {}),
        (TCPerror, 'TCPerror', {}),
        (UDPerror, 'UDPerror', {'ignore_zero_checksum': True}),
        (ICMPerror, 'ICMPerror', {}),
    )
    for layer_cls, layer_name, extra in embedded_layers:
        if received_packet.haslayer(layer_cls):
            self.assert_checksum_valid(received_packet, layer_name, **extra)
def assert_icmp_checksum_valid(self, received_packet):
    """ Check checksum of the 'ICMP' layer and of any embedded error layers

    :param received_packet: packet to check
    """
    pkt = received_packet
    # outer ICMP layer first, then the layers handled by the embedded check
    self.assert_checksum_valid(pkt, 'ICMP')
    self.assert_embedded_icmp_checksum_valid(pkt)
def assert_icmpv6_checksum_valid(self, pkt):
    """ Check the 'cksum' field of the ICMPv6 layers present in the packet

    Handles ICMPv6DestUnreach, ICMPv6EchoRequest and ICMPv6EchoReply, in
    that order; a layer that is not present is skipped.

    :param pkt: packet to check
    """
    # (layer class, layer name, also run the embedded error layer check?)
    icmpv6_layers = (
        (ICMPv6DestUnreach, 'ICMPv6DestUnreach', True),
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest', False),
        (ICMPv6EchoReply, 'ICMPv6EchoReply', False),
    )
    for layer_cls, layer_name, check_embedded in icmpv6_layers:
        if not pkt.haslayer(layer_cls):
            continue
        self.assert_checksum_valid(pkt, layer_name, 'cksum')
        if check_embedded:
            self.assert_embedded_icmp_checksum_valid(pkt)
1086 def get_packet_counter(self, counter):
1087 if counter.startswith("/"):
1088 counter_value = self.statistics.get_counter(counter)
1090 counters = self.vapi.cli("sh errors").split('\n')
1092 for i in range(1, len(counters) - 1):
1093 results = counters[i].split()
1094 if results[1] == counter:
1095 counter_value = int(results[0])
1097 return counter_value
def assert_packet_counter_equal(self, counter, expected_value):
    """ Assert that a packet counter has the expected value

    :param counter: counter name, passed to get_packet_counter
    :param expected_value: value the counter is expected to have
    """
    observed = self.get_packet_counter(counter)
    label = "packet counter `%s'" % counter
    self.assert_equal(observed, expected_value, label)
def assert_error_counter_equal(self, counter, expected_value):
    """ Assert that an error counter has the expected value

    :param counter: counter name, passed to statistics.get_err_counter
    :param expected_value: value the counter is expected to have
    """
    observed = self.statistics.get_err_counter(counter)
    label = "error counter `%s'" % counter
    self.assert_equal(observed, expected_value, label)
1110 def sleep(cls, timeout, remark=None):
1112 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1113 # * by Guido, only the main thread can be interrupted.
1115 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
1118 if hasattr(os, 'sched_yield'):
1124 if hasattr(cls, 'logger'):
1125 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1126 before = time.time()
1129 if hasattr(cls, 'logger') and after - before > 2 * timeout:
1130 cls.logger.error("unexpected self.sleep() result - "
1131 "slept for %es instead of ~%es!",
1132 after - before, timeout)
1133 if hasattr(cls, 'logger'):
1135 "Finished sleep (%s) - slept %es (wanted %es)",
1136 remark, after - before, timeout)
1138 def pg_send(self, intf, pkts):
1139 self.vapi.cli("clear trace")
1140 intf.add_stream(pkts)
1141 self.pg_enable_capture(self.pg_interfaces)
def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
    """
    Send packets and assert that no interface captures anything.

    :param intf: interface to send the packets from
    :param pkts: packets to be sent
    :param remark: text passed on to assert_nothing_captured()
    :param timeout: how long to wait on the first interface

    """
    self.pg_send(intf, pkts)
    # NOTE(review): get_capture() is defined elsewhere; a numeric timeout
    # is assumed to be required, 1s being the assumed default - confirm
    if not timeout:
        timeout = 1
    for i in self.pg_interfaces:
        i.get_capture(0, timeout=timeout)
        i.assert_nothing_captured(remark=remark)
        # the full wait has already elapsed once, a short grace period is
        # enough for the remaining interfaces - otherwise the total time
        # grows with the number of interfaces
        timeout = 0.1
def send_and_expect(self, intf, pkts, output, n_rx=None):
    """
    Send packets and return what was captured on the output interface.

    :param intf: interface to send the packets from
    :param pkts: packets to be sent
    :param output: interface the packets are expected on
    :param n_rx: number of packets expected, by default as many as sent
    :returns: result of output.get_capture()

    """
    # same expectation as send_and_expect_only(): one packet received per
    # packet sent, unless the caller says otherwise
    if not n_rx:
        n_rx = len(pkts)
    self.pg_send(intf, pkts)
    return output.get_capture(n_rx)
def send_and_expect_only(self, intf, pkts, output, timeout=None):
    """
    Send packets, expect them on the output interface and nothing on any
    other interface.

    :param intf: interface to send the packets from
    :param pkts: packets to be sent
    :param output: the only interface the packets are expected on
    :param timeout: how long to wait on the first of the other interfaces
    :returns: result of output.get_capture()

    """
    self.pg_send(intf, pkts)
    rx = output.get_capture(len(pkts))
    outputs = [output]
    # NOTE(review): get_capture() is defined elsewhere; a numeric timeout
    # is assumed to be required, 1s being the assumed default - confirm
    if not timeout:
        timeout = 1
    for i in self.pg_interfaces:
        if i not in outputs:
            i.get_capture(0, timeout=timeout)
            i.assert_nothing_captured()
            # the full wait has already elapsed once, a short grace period
            # is enough for the remaining interfaces
            timeout = 0.1

    return rx
def runTest(self):
    """ unittest calls runTest when TestCase is instantiated without a
    test case.  Use case: Writing unittests against VppTestCase"""
    # NOTE(review): the header and the body of this method are not visible
    # in this part of the file; a no-op is assumed, which is all the
    # docstring asks for - please confirm
    pass
def get_testcase_doc_name(test):
    """
    Return the first line of the docstring of the test's class.

    :param test: test case instance
    :returns: first docstring line, or the name of the class if the class
        has no docstring or an empty one

    """
    test_class = test.__class__
    # getdoc() returns None for a missing docstring and an empty docstring
    # has no lines at all - neither may break result reporting
    doc_lines = (getdoc(test_class) or "").splitlines()
    return doc_lines[0] if doc_lines else test_class.__name__
def get_test_description(descriptions, test):
    """
    Return the text used to identify a test in the output.

    :param descriptions: if true, prefer the short description of the test
    :param test: test case instance
    :returns: short description of the test if requested and available,
        str(test) otherwise

    """
    short_description = test.shortDescription()
    if descriptions and short_description:
        return short_description
    # always return a string - callers concatenate and format the result
    return str(test)
class TestCaseInfo(object):
    """
    Plain holder of the data describing one test case run.

    Instances are compared and hashed by identity.
    """

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        """
        :param logger: logger of the test case
        :param tempdir: temporary directory used by the test case
        :param vpp_pid: pid of the VPP process
        :param vpp_bin_path: path to the VPP binary

        """
        self.logger, self.tempdir = logger, tempdir
        self.vpp_pid, self.vpp_bin_path = vpp_pid, vpp_bin_path
        # nothing recorded yet
        self.core_crash_test = None
class VppTestResult(unittest.TestResult):
    """
    @property result_string
     String variable to store the test case result string.
    @property errors
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test which
     raised an unexpected exception.
    @property failures
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test where
     a failure was explicitly signalled using the TestCase.assert*()
     methods.
    """

    # class attributes - shared by all the instances of the result class
    failed_test_cases_info = set()
    core_crash_test_cases_info = set()
    current_test_case_info = None

    def __init__(self, stream=None, descriptions=None, verbosity=None,
                 runner=None):
        """
        :param stream: stream the test results are written to
        :param descriptions: Boolean variable to store information if to
            use test case descriptions.
        :param verbosity: Integer variable to store required verbosity
            level.
        :param runner: runner which created this result; its print_summary
            attribute is read by printErrors()
        """
        super(VppTestResult, self).__init__(stream, descriptions, verbosity)
        self.stream = stream
        self.descriptions = descriptions
        self.verbosity = verbosity
        self.result_string = None
        self.runner = runner

    def addSuccess(self, test):
        """
        Record a test succeeded result

        :param test:

        """
        if self.current_test_case_info:
            self.current_test_case_info.logger.debug(
                "--- addSuccess() %s.%s(%s) called",
                test.__class__.__name__, test._testMethodName,
                test._testMethodDoc)
        unittest.TestResult.addSuccess(self, test)
        self.result_string = colorize("OK", GREEN)

        self.send_result_through_pipe(test, PASS)

    def addSkip(self, test, reason):
        """
        Record a test skipped.

        :param test:
        :param reason:

        """
        if self.current_test_case_info:
            self.current_test_case_info.logger.debug(
                "--- addSkip() %s.%s(%s) called, reason is %s",
                test.__class__.__name__, test._testMethodName,
                test._testMethodDoc, reason)
        unittest.TestResult.addSkip(self, test, reason)
        self.result_string = colorize("SKIP", YELLOW)

        self.send_result_through_pipe(test, SKIP)

    def symlink_failed(self):
        """
        Create a link to the temporary directory of the current test case
        in the directory named by the FAILED_DIR environment variable.

        This is best-effort only: problems are logged, never raised.
        """
        info = self.current_test_case_info
        if not info:
            return
        logger = info.logger
        try:
            failed_dir = os.getenv('FAILED_DIR')
            if not failed_dir:
                # say what is wrong instead of failing inside of
                # os.path.join() with an obscure TypeError
                if logger:
                    logger.error("FAILED_DIR is not set, not creating "
                                 "a link to the failed test")
                return
            # NOTE(review): the line holding the format of the link name
            # is not visible in this part of the file, '%s-FAILED' is
            # assumed - please confirm
            link_path = os.path.join(
                failed_dir,
                '%s-FAILED' % os.path.basename(info.tempdir))
            if logger:
                logger.debug("creating a link to the failed test")
                logger.debug("os.symlink(%s, %s)" %
                             (info.tempdir, link_path))
            # lexists() reports a dangling link too, exists() would follow
            # it, say "no" and let os.symlink() fail
            if os.path.lexists(link_path):
                if logger:
                    logger.debug('symlink already exists')
            else:
                os.symlink(info.tempdir, link_path)

        except Exception as e:
            if logger:
                logger.error(e)

    def send_result_through_pipe(self, test, result):
        """
        Send (test id, result) through the result pipe, if there is one.

        :param test: test the result belongs to
        :param result: result to be reported

        """
        # the attribute may be missing or set to None - nothing is sent
        # in either case
        pipe = getattr(self, 'test_framework_result_pipe', None)
        if pipe:
            pipe.send((test.id(), result))

    def log_error(self, test, err, fn_name):
        """
        Write the error and its formatted traceback to the debug log of
        the current test case.

        :param test: test (or unittest error holder) which failed
        :param err: exception information, as accepted by
            traceback.format_exception() when unpacked
        :param fn_name: name of the reporting function, used in the log

        """
        if self.current_test_case_info:
            if isinstance(test, unittest.suite._ErrorHolder):
                test_name = test.description
            else:
                test_name = '%s.%s(%s)' % (test.__class__.__name__,
                                           test._testMethodName,
                                           test._testMethodDoc)
            self.current_test_case_info.logger.debug(
                "--- %s() %s called, err is %s",
                fn_name, test_name, err)
            self.current_test_case_info.logger.debug(
                "formatted exception is:\n%s",
                "".join(format_exception(*err)))

    def add_error(self, test, err, unittest_fn, error_type):
        """
        Common implementation of addFailure() and addError().

        :param test: test (or unittest error holder) which failed
        :param err: exception information
        :param unittest_fn: unittest.TestResult function recording the
            result, called as unittest_fn(self, test, err)
        :param error_type: FAIL or ERROR
        :raises Exception: if error_type is neither FAIL nor ERROR

        """
        if error_type == FAIL:
            self.log_error(test, err, 'addFailure')
            error_type_str = colorize("FAIL", RED)
        elif error_type == ERROR:
            self.log_error(test, err, 'addError')
            error_type_str = colorize("ERROR", RED)
        else:
            raise Exception('Error type %s cannot be used to record an '
                            'error or a failure' % error_type)

        unittest_fn(self, test, err)
        if self.current_test_case_info:
            self.result_string = "%s [ temp dir used by test case: %s ]" % \
                                 (error_type_str,
                                  self.current_test_case_info.tempdir)
            self.symlink_failed()
            self.failed_test_cases_info.add(self.current_test_case_info)
            if is_core_present(self.current_test_case_info.tempdir):
                # only the first test seen with a core file is recorded
                if not self.current_test_case_info.core_crash_test:
                    if isinstance(test, unittest.suite._ErrorHolder):
                        test_name = str(test)
                    else:
                        test_name = "'{!s}' ({!s})".format(
                            get_testcase_doc_name(test), test.id())
                    self.current_test_case_info.core_crash_test = test_name
                self.core_crash_test_cases_info.add(
                    self.current_test_case_info)
        else:
            self.result_string = '%s [no temp dir]' % error_type_str

        self.send_result_through_pipe(test, error_type)

    def addFailure(self, test, err):
        """
        Record a test failed result

        :param test:
        :param err: error message

        """
        self.add_error(test, err, unittest.TestResult.addFailure, FAIL)

    def addError(self, test, err):
        """
        Record a test error result

        :param test:
        :param err: error message

        """
        self.add_error(test, err, unittest.TestResult.addError, ERROR)

    def getDescription(self, test):
        """
        Get test description

        :param test:
        :returns: test description

        """
        return get_test_description(self.descriptions, test)

    def startTest(self, test):
        """
        Start a test

        :param test:

        """

        def print_header(test):
            # the header is printed once per test class only
            test_class = test.__class__
            if not hasattr(test_class, '_header_printed'):
                print(double_line_delim)
                print(colorize(getdoc(test).splitlines()[0], GREEN))
                print(double_line_delim)
            test_class._header_printed = True

        print_header(test)

        unittest.TestResult.startTest(self, test)
        # verbosity defaults to None, which cannot be compared to a number
        if (self.verbosity or 0) > 0:
            self.stream.writeln(
                "Starting " + self.getDescription(test) + " ...")
            self.stream.writeln(single_line_delim)

    def stopTest(self, test):
        """
        Called when the given test has been run

        :param test:

        """
        unittest.TestResult.stopTest(self, test)
        result_line = "%-73s%s" % (self.getDescription(test),
                                   self.result_string)
        # verbosity defaults to None, which cannot be compared to a number
        if (self.verbosity or 0) > 0:
            self.stream.writeln(single_line_delim)
            self.stream.writeln(result_line)
            self.stream.writeln(single_line_delim)
        else:
            self.stream.writeln(result_line)

        self.send_result_through_pipe(test, TEST_RUN)

    def printErrors(self):
        """
        Print errors from running the test case
        """
        if self.errors or self.failures:
            self.stream.writeln()
            self.printErrorList('ERROR', self.errors)
            self.printErrorList('FAIL', self.failures)

        # ^^ that is the last output from unittest before summary
        if not self.runner.print_summary:
            # the file is left open on purpose - it becomes the stream of
            # both this result and of the runner
            devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
            self.stream = devnull
            self.runner.stream = devnull

    def printErrorList(self, flavour, errors):
        """
        Print error list to the output stream together with error type
        and test case description.

        :param flavour: error type
        :param errors: iterable errors

        """
        for test, err in errors:
            self.stream.writeln(double_line_delim)
            self.stream.writeln("%s: %s" %
                                (flavour, self.getDescription(test)))
            self.stream.writeln(single_line_delim)
            self.stream.writeln("%s" % err)
class VppTestRunner(unittest.TextTestRunner):
    """
    A basic test runner implementation which prints results to standard
    output.
    """

    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
                 result_pipe=None, failfast=False, buffer=False,
                 resultclass=None, print_summary=True, **kwargs):
        """
        :param keep_alive_pipe: pipe stored in KeepAliveReporter.pipe
        :param descriptions: passed to unittest.TextTestRunner
        :param verbosity: passed to unittest.TextTestRunner
        :param result_pipe: pipe stored in the result class as
            test_framework_result_pipe
        :param failfast: passed to unittest.TextTestRunner
        :param buffer: passed to unittest.TextTestRunner
        :param resultclass: ignored, VppTestResult is always used
        :param print_summary: if false, the result redirects the output of
            the runner to os.devnull once the errors are printed and run()
            restores the stream afterwards
        :param kwargs: passed to unittest.TextTestRunner, except 'stream'

        """
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        kwargs.pop('stream', None)
        # resultclass is a read-only property of this class; the base
        # class assigns any value other than None, which would fail with
        # AttributeError - so the argument is never passed on
        super(VppTestRunner, self).__init__(sys.stdout, descriptions,
                                            verbosity, failfast, buffer,
                                            None, **kwargs)
        KeepAliveReporter.pipe = keep_alive_pipe

        self.orig_stream = self.stream
        self.resultclass.test_framework_result_pipe = result_pipe

        self.print_summary = print_summary

    def _makeResult(self):
        """
        Create the result object; the runner is handed over as well,
        because the result reads print_summary and replaces the stream of
        the runner.
        """
        return self.resultclass(self.stream,
                                self.descriptions,
                                self.verbosity,
                                self)

    def run(self, test):
        """
        Run the tests

        :param test:
        :returns: result of the run

        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        result = super(VppTestRunner, self).run(test)
        if not self.print_summary:
            # undo the redirection done by the result in printErrors()
            self.stream = self.orig_stream
            result.stream = self.orig_stream
        return result
class Worker(Thread):
    """
    Thread which runs an executable, waits for it to finish, logs its
    output and stores its return code in self.result.

    A subclass may provide ``testcase``, ``role`` and ``wait_for_gdb``
    attributes; they have to exist before this constructor is called to be
    taken into account.
    """

    def __init__(self, args, logger, env=None):
        """
        :param args: command line, the executable comes first
        :param logger: logger used to report the progress and the output
        :param env: environment variables added to os.environ for the
            executable

        """
        self.logger = logger
        # work on a copy - the command line may get extended below and the
        # list belonging to the caller must stay as it is
        self.args = list(args)
        if hasattr(self, 'testcase') and self.testcase.debug_all:
            if self.testcase.debug_gdbserver:
                self.args = ['/usr/bin/gdbserver', 'localhost:{port}'
                             .format(port=self.testcase.gdbserver_port)] + \
                    self.args
            elif self.testcase.debug_gdb and hasattr(self, 'wait_for_gdb'):
                self.args.append(self.wait_for_gdb)
        # name of the application itself, even when run under gdbserver
        self.app_bin = args[0]
        self.app_name = os.path.basename(self.app_bin)
        if hasattr(self, 'role'):
            self.app_name += ' {role}'.format(role=self.role)
        self.process = None
        self.result = None
        env = {} if env is None else env
        self.env = copy.deepcopy(env)
        super(Worker, self).__init__()

    def wait_for_enter(self):
        """
        When debugging with gdb or gdbserver is requested by the test
        case, print how to attach to the process and wait for ENTER.
        Do nothing otherwise.
        """
        if not hasattr(self, 'testcase'):
            return
        if self.testcase.debug_all and self.testcase.debug_gdbserver:
            print()
            print(double_line_delim)
            print("Spawned GDB Server for '{app}' with PID: {pid}"
                  .format(app=self.app_name, pid=self.process.pid))
        elif self.testcase.debug_all and self.testcase.debug_gdb:
            print()
            print(double_line_delim)
            print("Spawned '{app}' with PID: {pid}"
                  .format(app=self.app_name, pid=self.process.pid))
        else:
            return
        print(single_line_delim)
        print("You can debug '{app}' using:".format(app=self.app_name))
        if self.testcase.debug_gdbserver:
            print("sudo gdb " + self.app_bin +
                  " -ex 'target remote localhost:{port}'"
                  .format(port=self.testcase.gdbserver_port))
            print("Now is the time to attach gdb by running the above "
                  "command, set up breakpoints etc., then resume from "
                  "within gdb by issuing the 'continue' command")
            # the next gdbserver gets the next port
            self.testcase.gdbserver_port += 1
        elif self.testcase.debug_gdb:
            print("sudo gdb " + self.app_bin +
                  " -ex 'attach {pid}'".format(pid=self.process.pid))
            print("Now is the time to attach gdb by running the above "
                  "command and set up breakpoints etc., then resume from"
                  " within gdb by issuing the 'continue' command")
        print(single_line_delim)
        input("Press ENTER to continue running the testcase...")

    def _log_output(self, stream_name, data):
        """ Log what the executable wrote to the given stream. """
        self.logger.info("Executable `{app}' wrote to {stream}:"
                         .format(app=self.app_name, stream=stream_name))
        self.logger.info(single_line_delim)
        # the output is arbitrary bytes - bytes which are not valid UTF-8
        # must not abort the thread before the result is stored
        self.logger.info(data.decode('utf-8', 'replace'))
        self.logger.info(single_line_delim)

    def run(self):
        """
        Run the executable, wait until it finishes and log its output.

        :raises EnvironmentError: if the executable does not exist or is
            not executable; self.result is set to os.EX_OSFILE then

        """
        executable = self.args[0]
        if not os.path.exists(executable) or not os.access(
                executable, os.F_OK | os.X_OK):
            # Exit code that means some system file did not exist,
            # could not be opened, or had some other kind of error.
            self.result = os.EX_OSFILE
            raise EnvironmentError(
                "executable '%s' is not found or not executable." %
                executable)
        self.logger.debug("Running executable: '{app}'"
                          .format(app=' '.join(self.args)))
        env = os.environ.copy()
        env.update(self.env)
        env["CK_LOG_FILE_NAME"] = "-"
        self.process = subprocess.Popen(
            self.args, shell=False, env=env, preexec_fn=os.setpgrp,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self.wait_for_enter()
        out, err = self.process.communicate()
        self.logger.debug("Finished running `{app}'".format(app=self.app_name))
        self.logger.info("Return code is `%s'" % self.process.returncode)
        self.logger.info(single_line_delim)
        self._log_output("stdout", out)
        self._log_output("stderr", err)
        self.result = self.process.returncode
1590 if __name__ == '__main__':