3 from __future__ import print_function
17 from collections import deque
18 from threading import Thread, Event
19 from inspect import getdoc, isclass
20 from traceback import format_exception
21 from logging import FileHandler, DEBUG, Formatter
24 from scapy.packet import Raw
25 import hook as hookmodule
26 from vpp_pg_interface import VppPGInterface
27 from vpp_sub_interface import VppSubInterface
28 from vpp_lo_interface import VppLoInterface
29 from vpp_bvi_interface import VppBviInterface
30 from vpp_papi_provider import VppPapiProvider
31 from vpp_papi.vpp_stats import VPPStats
32 from vpp_papi.vpp_transport_shmem import VppTransportShmemIOError
33 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
35 from vpp_object import VppObjectRegistry
36 from util import ppp, is_core_present
37 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
38 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
39 from scapy.layers.inet6 import ICMPv6EchoReply
41 if os.name == 'posix' and sys.version_info[0] < 3:
42 # using subprocess32 is recommended by python official documentation
43 # @ https://docs.python.org/2/library/subprocess.html
44 import subprocess32 as subprocess
48 # Python2/3 compatible
61 class BoolEnvironmentVariable(object):
63 def __init__(self, env_var_name, default='n', true_values=None):
64 self.name = env_var_name
65 self.default = default
66 self.true_values = true_values if true_values is not None else \
70 return os.getenv(self.name, self.default).lower() in self.true_values
72 if sys.version_info[0] == 2:
73 __nonzero__ = __bool__
76 return 'BoolEnvironmentVariable(%r, default=%r, true_values=%r)' % \
77 (self.name, self.default, self.true_values)
80 debug_framework = BoolEnvironmentVariable('TEST_DEBUG')
85 Test framework module.
87 The module provides a set of tools for constructing and running tests and
88 representing the results.
92 class VppDiedError(Exception):
93 """ exception for reporting that the subprocess has died."""
95 signals_by_value = {v: k for k, v in signal.__dict__.items() if
96 k.startswith('SIG') and not k.startswith('SIG_')}
98 def __init__(self, rv=None, testcase=None, method_name=None):
100 self.signal_name = None
101 self.testcase = testcase
102 self.method_name = method_name
105 self.signal_name = VppDiedError.signals_by_value[-rv]
106 except (KeyError, TypeError):
109 if testcase is None and method_name is None:
112 in_msg = 'running %s.%s ' % (testcase, method_name)
114 msg = "VPP subprocess died %sunexpectedly with return code: %d%s." % (
117 ' [%s]' % (self.signal_name if
118 self.signal_name is not None else ''))
119 super(VppDiedError, self).__init__(msg)
122 class _PacketInfo(object):
123 """Private class to create packet info object.
125 Help process information about the next packet.
126 Set variables to default values.
128 #: Store the index of the packet.
130 #: Store the index of the source packet generator interface of the packet.
132 #: Store the index of the destination packet generator interface
135 #: Store expected ip version
137 #: Store expected upper protocol
139 #: Store the copy of the former packet.
def __eq__(self, other):
    """ Compare two packet info objects field by field

    Two objects are equal when their ``index``, ``src``, ``dst`` and
    ``data`` attributes all compare equal.

    :param other: object to compare against
    :returns: True or False when ``other`` carries all four attributes,
        NotImplemented otherwise, so that Python falls back to its
        default comparison instead of raising AttributeError
    """
    fields = ('index', 'src', 'dst', 'data')
    if not all(hasattr(other, name) for name in fields):
        return NotImplemented
    # all() stops at the first mismatch; ``data`` is deliberately last so
    # it is only compared once index, src and dst already agree
    return all(getattr(self, name) == getattr(other, name)
               for name in fields)
150 def pump_output(testclass):
151 """ pump output from vpp stdout/stderr to proper queues """
154 while not testclass.pump_thread_stop_flag.is_set():
155 readable = select.select([testclass.vpp.stdout.fileno(),
156 testclass.vpp.stderr.fileno(),
157 testclass.pump_thread_wakeup_pipe[0]],
159 if testclass.vpp.stdout.fileno() in readable:
160 read = os.read(testclass.vpp.stdout.fileno(), 102400)
162 split = read.splitlines(True)
163 if len(stdout_fragment) > 0:
164 split[0] = "%s%s" % (stdout_fragment, split[0])
165 if len(split) > 0 and split[-1].endswith("\n"):
169 stdout_fragment = split[-1]
170 testclass.vpp_stdout_deque.extend(split[:limit])
171 if not testclass.cache_vpp_output:
172 for line in split[:limit]:
173 testclass.logger.debug(
174 "VPP STDOUT: %s" % line.rstrip("\n"))
175 if testclass.vpp.stderr.fileno() in readable:
176 read = os.read(testclass.vpp.stderr.fileno(), 102400)
178 split = read.splitlines(True)
179 if len(stderr_fragment) > 0:
180 split[0] = "%s%s" % (stderr_fragment, split[0])
181 if len(split) > 0 and split[-1].endswith(b"\n"):
185 stderr_fragment = split[-1]
186 testclass.vpp_stderr_deque.extend(split[:limit])
187 if not testclass.cache_vpp_output:
188 for line in split[:limit]:
189 testclass.logger.debug(
190 "VPP STDERR: %s" % line.rstrip("\n"))
191 # ignoring the dummy pipe here intentionally - the
192 # flag will take care of properly terminating the loop
def _is_skip_aarch64_set():
    """ Build the switch object bound to the SKIP_AARCH64 env variable """
    skip_switch = BoolEnvironmentVariable('SKIP_AARCH64')
    return skip_switch


# module-level switch, created once when the module is imported
is_skip_aarch64_set = _is_skip_aarch64_set()
def _is_platform_aarch64():
    """ Tell whether platform.machine() reports an aarch64 machine """
    machine = platform.machine()
    return machine == 'aarch64'


# computed once when the module is imported
is_platform_aarch64 = _is_platform_aarch64()
def _running_extended_tests():
    """ Build the switch object bound to the EXTENDED_TESTS env variable """
    extended_switch = BoolEnvironmentVariable("EXTENDED_TESTS")
    return extended_switch


# module-level switch, created once when the module is imported
running_extended_tests = _running_extended_tests()
def _running_on_centos():
    """ Tell whether the OS_ID environment variable mentions CentOS

    The match is a case-insensitive substring test; an unset OS_ID is
    treated as an empty string.

    :returns: True if "centos" occurs in OS_ID, False otherwise
    """
    # the membership test already yields a bool, no ternary needed
    return "centos" in os.getenv("OS_ID", "").lower()


# computed once when the module is imported
running_on_centos = _running_on_centos()
224 class KeepAliveReporter(object):
226 Singleton object which reports test start to parent process
231 self.__dict__ = self._shared_state
239 def pipe(self, pipe):
240 if self._pipe is not None:
241 raise Exception("Internal error - pipe should only be set once.")
244 def send_keep_alive(self, test, desc=None):
246 Write current test tmpdir & desc to keep-alive pipe to signal liveness
248 if self.pipe is None:
249 # if not running forked..
253 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
257 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
260 class VppTestCase(unittest.TestCase):
261 """This subclass is a base class for VPP test cases that are implemented as
262 classes. It provides methods to create and run test case.
265 extra_vpp_punt_config = []
266 extra_vpp_plugin_config = []
def packet_infos(self):
    """Packet infos registered so far (a dict keyed by packet index)"""
    registered = self._packet_infos
    return registered
274 def get_packet_count_for_if_idx(cls, dst_if_index):
275 """Get the number of packet info for specified destination if index"""
276 if dst_if_index in cls._packet_count_for_dst_if_idx:
277 return cls._packet_count_for_dst_if_idx[dst_if_index]
283 """Return the instance of this testcase"""
284 return cls.test_instance
287 def set_debug_flags(cls, d):
288 cls.gdbserver_port = 7777
289 cls.debug_core = False
290 cls.debug_gdb = False
291 cls.debug_gdbserver = False
292 cls.debug_all = False
297 cls.debug_core = True
298 elif dl == "gdb" or dl == "gdb-all":
300 elif dl == "gdbserver" or dl == "gdbserver-all":
301 cls.debug_gdbserver = True
303 raise Exception("Unrecognized DEBUG option: '%s'" % d)
304 if dl == "gdb-all" or dl == "gdbserver-all":
308 def get_least_used_cpu():
309 cpu_usage_list = [set(range(psutil.cpu_count()))]
310 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
311 if 'vpp_main' == p.info['name']]
312 for vpp_process in vpp_processes:
313 for cpu_usage_set in cpu_usage_list:
315 cpu_num = vpp_process.cpu_num()
316 if cpu_num in cpu_usage_set:
317 cpu_usage_set_index = cpu_usage_list.index(
319 if cpu_usage_set_index == len(cpu_usage_list) - 1:
320 cpu_usage_list.append({cpu_num})
322 cpu_usage_list[cpu_usage_set_index + 1].add(
324 cpu_usage_set.remove(cpu_num)
326 except psutil.NoSuchProcess:
329 for cpu_usage_set in cpu_usage_list:
330 if len(cpu_usage_set) > 0:
331 min_usage_set = cpu_usage_set
334 return random.choice(tuple(min_usage_set))
337 def setUpConstants(cls):
338 """ Set-up the test case class based on environment variables """
339 cls.step = BoolEnvironmentVariable('STEP')
340 d = os.getenv("DEBUG", None)
341 # inverted case to handle '' == True
342 c = os.getenv("CACHE_OUTPUT", "1")
343 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
344 cls.set_debug_flags(d)
345 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
346 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
347 cls.test_plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
348 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
350 if cls.plugin_path is not None:
351 if cls.extern_plugin_path is not None:
352 plugin_path = "%s:%s" % (
353 cls.plugin_path, cls.extern_plugin_path)
355 plugin_path = cls.plugin_path
356 elif cls.extern_plugin_path is not None:
357 plugin_path = cls.extern_plugin_path
359 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
360 debug_cli = "cli-listen localhost:5002"
362 size = os.getenv("COREDUMP_SIZE")
364 coredump_size = "coredump-size %s" % size
365 if coredump_size is None:
366 coredump_size = "coredump-size unlimited"
368 cpu_core_number = cls.get_least_used_cpu()
369 if not hasattr(cls, "worker_config"):
370 cls.worker_config = ""
372 cls.vpp_cmdline = [cls.vpp_bin, "unix",
373 "{", "nodaemon", debug_cli, "full-coredump",
374 coredump_size, "runtime-dir", cls.tempdir, "}",
375 "api-trace", "{", "on", "}", "api-segment", "{",
376 "prefix", cls.shm_prefix, "}", "cpu", "{",
377 "main-core", str(cpu_core_number),
378 cls.worker_config, "}",
379 "statseg", "{", "socket-name", cls.stats_sock, "}",
380 "socksvr", "{", "socket-name", cls.api_sock, "}",
382 "{", "plugin", "dpdk_plugin.so", "{", "disable",
383 "}", "plugin", "rdma_plugin.so", "{", "disable",
384 "}", "plugin", "unittest_plugin.so", "{", "enable",
385 "}"] + cls.extra_vpp_plugin_config + ["}", ]
386 if cls.extra_vpp_punt_config is not None:
387 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
388 if plugin_path is not None:
389 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
390 if cls.test_plugin_path is not None:
391 cls.vpp_cmdline.extend(["test_plugin_path", cls.test_plugin_path])
393 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
394 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
397 def wait_for_enter(cls):
398 if cls.debug_gdbserver:
399 print(double_line_delim)
400 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
402 print(double_line_delim)
403 print("Spawned VPP with PID: %d" % cls.vpp.pid)
405 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
407 print(single_line_delim)
408 print("You can debug VPP using:")
409 if cls.debug_gdbserver:
410 print("sudo gdb " + cls.vpp_bin +
411 " -ex 'target remote localhost:{port}'"
412 .format(port=cls.gdbserver_port))
413 print("Now is the time to attach gdb by running the above "
414 "command, set up breakpoints etc., then resume VPP from "
415 "within gdb by issuing the 'continue' command")
416 cls.gdbserver_port += 1
418 print("sudo gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
419 print("Now is the time to attach gdb by running the above "
420 "command and set up breakpoints etc., then resume VPP from"
421 " within gdb by issuing the 'continue' command")
422 print(single_line_delim)
423 input("Press ENTER to continue running the testcase...")
427 cmdline = cls.vpp_cmdline
429 if cls.debug_gdbserver:
430 gdbserver = '/usr/bin/gdbserver'
431 if not os.path.isfile(gdbserver) or \
432 not os.access(gdbserver, os.X_OK):
433 raise Exception("gdbserver binary '%s' does not exist or is "
434 "not executable" % gdbserver)
436 cmdline = [gdbserver, 'localhost:{port}'
437 .format(port=cls.gdbserver_port)] + cls.vpp_cmdline
438 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
441 cls.vpp = subprocess.Popen(cmdline,
442 stdout=subprocess.PIPE,
443 stderr=subprocess.PIPE,
445 except subprocess.CalledProcessError as e:
446 cls.logger.critical("Subprocess returned with non-0 return code: ("
450 cls.logger.critical("Subprocess returned with OS error: "
451 "(%s) %s", e.errno, e.strerror)
453 except Exception as e:
454 cls.logger.exception("Subprocess returned unexpected from "
461 def wait_for_stats_socket(cls):
462 deadline = time.time() + 300
464 while time.time() < deadline or \
465 cls.debug_gdb or cls.debug_gdbserver:
466 if os.path.exists(cls.stats_sock):
471 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
474 def wait_for_coredump(cls):
475 corefile = cls.tempdir + "/core"
476 if os.path.isfile(corefile):
477 cls.logger.error("Waiting for coredump to complete: %s", corefile)
478 curr_size = os.path.getsize(corefile)
479 deadline = time.time() + 60
481 while time.time() < deadline:
484 curr_size = os.path.getsize(corefile)
485 if size == curr_size:
489 cls.logger.error("Timed out waiting for coredump to complete:"
492 cls.logger.error("Coredump complete: %s, size %d",
498 Perform class setup before running the testcase
499 Remove shared memory files, start vpp and connect the vpp-api
501 super(VppTestCase, cls).setUpClass()
502 gc.collect() # run garbage collection first
503 cls.logger = get_logger(cls.__name__)
504 seed = os.environ["RND_SEED"]
506 if hasattr(cls, 'parallel_handler'):
507 cls.logger.addHandler(cls.parallel_handler)
508 cls.logger.propagate = False
510 cls.tempdir = tempfile.mkdtemp(
511 prefix='vpp-unittest-%s-' % cls.__name__)
512 cls.stats_sock = "%s/stats.sock" % cls.tempdir
513 cls.api_sock = "%s/api.sock" % cls.tempdir
514 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
515 cls.file_handler.setFormatter(
516 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
518 cls.file_handler.setLevel(DEBUG)
519 cls.logger.addHandler(cls.file_handler)
520 cls.logger.debug("--- setUpClass() for %s called ---" %
522 cls.shm_prefix = os.path.basename(cls.tempdir)
523 os.chdir(cls.tempdir)
524 cls.logger.info("Temporary dir is %s, shm prefix is %s",
525 cls.tempdir, cls.shm_prefix)
526 cls.logger.debug("Random seed is %s" % seed)
528 cls.reset_packet_infos()
532 cls.registry = VppObjectRegistry()
533 cls.vpp_startup_failed = False
534 cls.reporter = KeepAliveReporter()
535 # need to catch exceptions here because if we raise, then the cleanup
536 # doesn't get called and we might end with a zombie vpp
539 cls.reporter.send_keep_alive(cls, 'setUpClass')
540 VppTestResult.current_test_case_info = TestCaseInfo(
541 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
542 cls.vpp_stdout_deque = deque()
543 cls.vpp_stderr_deque = deque()
544 cls.pump_thread_stop_flag = Event()
545 cls.pump_thread_wakeup_pipe = os.pipe()
546 cls.pump_thread = Thread(target=pump_output, args=(cls,))
547 cls.pump_thread.daemon = True
548 cls.pump_thread.start()
549 if cls.debug_gdb or cls.debug_gdbserver:
553 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
556 hook = hookmodule.StepHook(cls)
558 hook = hookmodule.PollHook(cls)
559 cls.vapi.register_hook(hook)
560 cls.wait_for_stats_socket()
561 cls.statistics = VPPStats(socketname=cls.stats_sock)
565 cls.vpp_startup_failed = True
567 "VPP died shortly after startup, check the"
568 " output to standard error for possible cause")
574 cls.vapi.disconnect()
577 if cls.debug_gdbserver:
578 print(colorize("You're running VPP inside gdbserver but "
579 "VPP-API connection failed, did you forget "
580 "to 'continue' VPP from within gdb?", RED))
582 except Exception as e:
583 cls.logger.debug("Exception connecting to VPP: %s" % e)
591 Disconnect vpp-api, kill vpp and cleanup shared memory files
593 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
595 if cls.vpp.returncode is None:
597 print(double_line_delim)
598 print("VPP or GDB server is still running")
599 print(single_line_delim)
600 input("When done debugging, press ENTER to kill the "
601 "process and finish running the testcase...")
603 # first signal that we want to stop the pump thread, then wake it up
604 if hasattr(cls, 'pump_thread_stop_flag'):
605 cls.pump_thread_stop_flag.set()
606 if hasattr(cls, 'pump_thread_wakeup_pipe'):
607 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
608 if hasattr(cls, 'pump_thread'):
609 cls.logger.debug("Waiting for pump thread to stop")
610 cls.pump_thread.join()
611 if hasattr(cls, 'vpp_stderr_reader_thread'):
612 cls.logger.debug("Waiting for stdderr pump to stop")
613 cls.vpp_stderr_reader_thread.join()
615 if hasattr(cls, 'vpp'):
616 if hasattr(cls, 'vapi'):
617 cls.logger.debug("Disconnecting class vapi client on %s",
619 cls.vapi.disconnect()
620 cls.logger.debug("Deleting class vapi attribute on %s",
624 if cls.vpp.returncode is None:
625 cls.wait_for_coredump()
626 cls.logger.debug("Sending TERM to vpp")
628 cls.logger.debug("Waiting for vpp to die")
629 cls.vpp.communicate()
630 cls.logger.debug("Deleting class vpp attribute on %s",
634 if cls.vpp_startup_failed:
635 stdout_log = cls.logger.info
636 stderr_log = cls.logger.critical
638 stdout_log = cls.logger.info
639 stderr_log = cls.logger.info
641 if hasattr(cls, 'vpp_stdout_deque'):
642 stdout_log(single_line_delim)
643 stdout_log('VPP output to stdout while running %s:', cls.__name__)
644 stdout_log(single_line_delim)
645 vpp_output = "".join(cls.vpp_stdout_deque)
646 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
648 stdout_log('\n%s', vpp_output)
649 stdout_log(single_line_delim)
651 if hasattr(cls, 'vpp_stderr_deque'):
652 stderr_log(single_line_delim)
653 stderr_log('VPP output to stderr while running %s:', cls.__name__)
654 stderr_log(single_line_delim)
655 vpp_output = "".join(cls.vpp_stderr_deque)
656 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
658 stderr_log('\n%s', vpp_output)
659 stderr_log(single_line_delim)
662 def tearDownClass(cls):
663 """ Perform final cleanup after running all tests in this test-case """
664 cls.logger.debug("--- tearDownClass() for %s called ---" %
666 cls.reporter.send_keep_alive(cls, 'tearDownClass')
668 cls.file_handler.close()
669 cls.reset_packet_infos()
671 debug_internal.on_tear_down_class(cls)
def show_commands_at_teardown(self):
    """ Hook for subclasses to log test specific show commands.

    This base implementation only logs that nothing was provided.
    """
    log_info = self.logger.info
    log_info("--- No test specific show commands provided. ---")
678 """ Show various debug prints after each test """
679 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
680 (self.__class__.__name__, self._testMethodName,
681 self._testMethodDoc))
684 if not self.vpp_dead:
685 self.logger.debug(self.vapi.cli("show trace max 1000"))
686 self.logger.info(self.vapi.ppcli("show interface"))
687 self.logger.info(self.vapi.ppcli("show hardware"))
688 self.logger.info(self.statistics.set_errors_str())
689 self.logger.info(self.vapi.ppcli("show run"))
690 self.logger.info(self.vapi.ppcli("show log"))
691 self.logger.info(self.vapi.ppcli("show bihash"))
692 self.logger.info("Logging testcase specific show commands.")
693 self.show_commands_at_teardown()
694 self.registry.remove_vpp_config(self.logger)
695 # Save/Dump VPP api trace log
696 m = self._testMethodName
697 api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
698 tmp_api_trace = "/tmp/%s" % api_trace
699 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
700 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
701 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
703 os.rename(tmp_api_trace, vpp_api_trace_log)
704 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
706 except VppTransportShmemIOError:
707 self.logger.debug("VppTransportShmemIOError: Vpp dead. "
708 "Cannot log show commands.")
711 self.registry.unregister_all(self.logger)
714 """ Clear trace before running each test"""
715 super(VppTestCase, self).setUp()
716 self.reporter.send_keep_alive(self)
719 raise VppDiedError(rv=None, testcase=self.__class__.__name__,
720 method_name=self._testMethodName)
721 self.sleep(.1, "during setUp")
722 self.vpp_stdout_deque.append(
723 "--- test setUp() for %s.%s(%s) starts here ---\n" %
724 (self.__class__.__name__, self._testMethodName,
725 self._testMethodDoc))
726 self.vpp_stderr_deque.append(
727 "--- test setUp() for %s.%s(%s) starts here ---\n" %
728 (self.__class__.__name__, self._testMethodName,
729 self._testMethodDoc))
730 self.vapi.cli("clear trace")
731 # store the test instance inside the test class - so that objects
732 # holding the class can access instance methods (like assertEqual)
733 type(self).test_instance = self
736 def pg_enable_capture(cls, interfaces=None):
738 Enable capture on packet-generator interfaces
740 :param interfaces: iterable interface indexes (if None,
741 use self.pg_interfaces)
744 if interfaces is None:
745 interfaces = cls.pg_interfaces
def register_capture(cls, cap_name):
    """ Remember a capture name in the testclass

    :param cap_name: name of the capture to remember
    """
    # every entry pairs the registration timestamp with the capture name
    stamped_capture = (time.time(), cap_name)
    cls._captures.append(stamped_capture)
def get_vpp_time(cls):
    """ Return the clock reported by VPP as a float

    Runs the 'show clock' CLI command, removes the "Time now " label
    from the reply and converts what is left.
    """
    reply = cls.vapi.cli('show clock')
    return float(reply.replace("Time now ", ""))
760 def sleep_on_vpp_time(cls, sec):
761 """ Sleep according to time in VPP world """
762 # On a busy system with many processes
763 # we might end up with VPP time being slower than real world
764 # So take that into account when waiting for VPP to do something
765 start_time = cls.get_vpp_time()
766 while cls.get_vpp_time() - start_time < sec:
771 """ Enable the PG, wait till it is done, then clean up """
772 cls.vapi.cli("trace add pg-input 1000")
773 cls.vapi.cli('packet-generator enable')
774 # PG, when starts, runs to completion -
775 # so let's avoid a race condition,
776 # and wait a little till it's done.
777 # Then clean it up - and then be gone.
778 deadline = time.time() + 300
779 while cls.vapi.cli('show packet-generator').find("Yes") != -1:
780 cls.sleep(0.01) # yield
781 if time.time() > deadline:
782 cls.logger.error("Timeout waiting for pg to stop")
784 for stamp, cap_name in cls._captures:
785 cls.vapi.cli('packet-generator delete %s' % cap_name)
789 def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
791 Create packet-generator interfaces.
793 :param interfaces: iterable indexes of the interfaces.
794 :returns: List of created interfaces.
799 intf = VppPGInterface(cls, i, gso, gso_size)
800 setattr(cls, intf.name, intf)
802 cls.pg_interfaces = result
806 def create_loopback_interfaces(cls, count):
808 Create loopback interfaces.
810 :param count: number of interfaces created.
811 :returns: List of created interfaces.
813 result = [VppLoInterface(cls) for i in range(count)]
815 setattr(cls, intf.name, intf)
816 cls.lo_interfaces = result
820 def create_bvi_interfaces(cls, count):
822 Create BVI interfaces.
824 :param count: number of interfaces created.
825 :returns: List of created interfaces.
827 result = [VppBviInterface(cls) for i in range(count)]
829 setattr(cls, intf.name, intf)
830 cls.bvi_interfaces = result
834 def extend_packet(packet, size, padding=' '):
836 Extend packet to given size by padding with spaces or custom padding
837 NOTE: Currently works only when Raw layer is present.
839 :param packet: packet
840 :param size: target size
841 :param padding: padding used to extend the payload
844 packet_len = len(packet) + 4
845 extend = size - packet_len
847 num = (extend // len(padding)) + 1
848 packet[Raw].load += (padding * num)[:extend].encode("ascii")
def reset_packet_infos(cls):
    """ Forget all packet info objects and all per-interface counts """
    # two independent, freshly created dicts
    cls._packet_count_for_dst_if_idx = {}
    cls._packet_infos = {}
857 def create_packet_info(cls, src_if, dst_if):
859 Create packet info object containing the source and destination indexes
860 and add it to the testcase's packet info list
862 :param VppInterface src_if: source interface
863 :param VppInterface dst_if: destination interface
865 :returns: _PacketInfo object
869 info.index = len(cls._packet_infos)
870 info.src = src_if.sw_if_index
871 info.dst = dst_if.sw_if_index
872 if isinstance(dst_if, VppSubInterface):
873 dst_idx = dst_if.parent.sw_if_index
875 dst_idx = dst_if.sw_if_index
876 if dst_idx in cls._packet_count_for_dst_if_idx:
877 cls._packet_count_for_dst_if_idx[dst_idx] += 1
879 cls._packet_count_for_dst_if_idx[dst_idx] = 1
880 cls._packet_infos[info.index] = info
884 def info_to_payload(info):
886 Convert _PacketInfo object to packet payload
888 :param info: _PacketInfo object
890 :returns: string containing serialized data from packet info
892 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
896 def payload_to_info(payload, payload_field='load'):
898 Convert packet payload to _PacketInfo object
900 :param payload: packet payload
901 :type payload: <class 'scapy.packet.Raw'>
902 :param payload_field: packet fieldname of payload "load" for
903 <class 'scapy.packet.Raw'>
904 :type payload_field: str
905 :returns: _PacketInfo object containing de-serialized data from payload
908 numbers = getattr(payload, payload_field).split()
910 info.index = int(numbers[0])
911 info.src = int(numbers[1])
912 info.dst = int(numbers[2])
913 info.ip = int(numbers[3])
914 info.proto = int(numbers[4])
917 def get_next_packet_info(self, info):
919 Iterate over the packet info list stored in the testcase
920 Start iteration with first element if info is None
921 Continue based on index in info if info is specified
923 :param info: info or None
924 :returns: next info in list or None if no more infos
929 next_index = info.index + 1
930 if next_index == len(self._packet_infos):
933 return self._packet_infos[next_index]
935 def get_next_packet_info_for_interface(self, src_index, info):
937 Search the packet info list for the next packet info with same source
940 :param src_index: source interface index to search for
941 :param info: packet info - where to start the search
942 :returns: packet info or None
946 info = self.get_next_packet_info(info)
949 if info.src == src_index:
952 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
954 Search the packet info list for the next packet info with same source
955 and destination interface indexes
957 :param src_index: source interface index to search for
958 :param dst_index: destination interface index to search for
959 :param info: packet info - where to start the search
960 :returns: packet info or None
964 info = self.get_next_packet_info_for_interface(src_index, info)
967 if info.dst == dst_index:
970 def assert_equal(self, real_value, expected_value, name_or_class=None):
971 if name_or_class is None:
972 self.assertEqual(real_value, expected_value)
975 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
976 msg = msg % (getdoc(name_or_class).strip(),
977 real_value, str(name_or_class(real_value)),
978 expected_value, str(name_or_class(expected_value)))
980 msg = "Invalid %s: %s does not match expected value %s" % (
981 name_or_class, real_value, expected_value)
983 self.assertEqual(real_value, expected_value, msg)
985 def assert_in_range(self,
993 msg = "Invalid %s: %s out of range <%s,%s>" % (
994 name, real_value, expected_min, expected_max)
995 self.assertTrue(expected_min <= real_value <= expected_max, msg)
997 def assert_packet_checksums_valid(self, packet,
998 ignore_zero_udp_checksums=True):
999 received = packet.__class__(scapy.compat.raw(packet))
1000 udp_layers = ['UDP', 'UDPerror']
1001 checksum_fields = ['cksum', 'chksum']
1004 temp = received.__class__(scapy.compat.raw(received))
1006 layer = temp.getlayer(counter)
1008 for cf in checksum_fields:
1009 if hasattr(layer, cf):
1010 if ignore_zero_udp_checksums and \
1011 0 == getattr(layer, cf) and \
1012 layer.name in udp_layers:
1015 checksums.append((counter, cf))
1018 counter = counter + 1
1019 if 0 == len(checksums):
1021 temp = temp.__class__(scapy.compat.raw(temp))
1022 for layer, cf in checksums:
1023 calc_sum = getattr(temp[layer], cf)
1025 getattr(received[layer], cf), calc_sum,
1026 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
1028 "Checksum field `%s` on `%s` layer has correct value `%s`" %
1029 (cf, temp[layer].name, calc_sum))
1031 def assert_checksum_valid(self, received_packet, layer,
1032 field_name='chksum',
1033 ignore_zero_checksum=False):
1034 """ Check checksum of received packet on given layer """
1035 received_packet_checksum = getattr(received_packet[layer], field_name)
1036 if ignore_zero_checksum and 0 == received_packet_checksum:
1038 recalculated = received_packet.__class__(
1039 scapy.compat.raw(received_packet))
1040 delattr(recalculated[layer], field_name)
1041 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
1042 self.assert_equal(received_packet_checksum,
1043 getattr(recalculated[layer], field_name),
1044 "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """ Check checksum of the 'IP' layer of the received packet

    :param received_packet: packet to verify
    :param ignore_zero_checksum: handed over to assert_checksum_valid
    """
    self.assert_checksum_valid(
        received_packet, 'IP', ignore_zero_checksum=ignore_zero_checksum)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """ Check checksum of the 'TCP' layer of the received packet

    :param received_packet: packet to verify
    :param ignore_zero_checksum: handed over to assert_checksum_valid
    """
    self.assert_checksum_valid(
        received_packet, 'TCP', ignore_zero_checksum=ignore_zero_checksum)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """ Check checksum of the 'UDP' layer of the received packet

    Unlike the IP and TCP variants, ignore_zero_checksum defaults to True.

    :param received_packet: packet to verify
    :param ignore_zero_checksum: handed over to assert_checksum_valid
    """
    self.assert_checksum_valid(
        received_packet, 'UDP', ignore_zero_checksum=ignore_zero_checksum)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """ Check checksums of the error layers embedded in a packet

    Each of the IPerror, TCPerror, UDPerror and ICMPerror layers is
    verified only when the packet has it. The UDPerror check is made
    with ignore_zero_checksum=True, the others use the defaults.

    :param received_packet: packet to verify
    """
    # (scapy layer class, layer name, extra keyword arguments), in the
    # order in which the layers are verified
    embedded_layers = (
        (IPerror, 'IPerror', {}),
        (TCPerror, 'TCPerror', {}),
        (UDPerror, 'UDPerror', {'ignore_zero_checksum': True}),
        (ICMPerror, 'ICMPerror', {}),
    )
    for layer_class, layer_name, extra in embedded_layers:
        if received_packet.haslayer(layer_class):
            self.assert_checksum_valid(received_packet, layer_name, **extra)
def assert_icmp_checksum_valid(self, received_packet):
    """ Check the 'ICMP' layer checksum, then the embedded error layers

    :param received_packet: packet to verify
    """
    packet = received_packet
    self.assert_checksum_valid(packet, 'ICMP')
    self.assert_embedded_icmp_checksum_valid(packet)
1076 def assert_icmpv6_checksum_valid(self, pkt):
1077 if pkt.haslayer(ICMPv6DestUnreach):
1078 self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
1079 self.assert_embedded_icmp_checksum_valid(pkt)
1080 if pkt.haslayer(ICMPv6EchoRequest):
1081 self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
1082 if pkt.haslayer(ICMPv6EchoReply):
1083 self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
1085 def get_packet_counter(self, counter):
1086 if counter.startswith("/"):
1087 counter_value = self.statistics.get_counter(counter)
1089 counters = self.vapi.cli("sh errors").split('\n')
1091 for i in range(1, len(counters) - 1):
1092 results = counters[i].split()
1093 if results[1] == counter:
1094 counter_value = int(results[0])
1096 return counter_value
def assert_packet_counter_equal(self, counter, expected_value):
    """ Check that a packet counter holds the expected value

    :param counter: counter name, resolved through get_packet_counter
    :param expected_value: value the counter is expected to hold
    """
    actual = self.get_packet_counter(counter)
    description = "packet counter `%s'" % counter
    self.assert_equal(actual, expected_value, description)
def assert_error_counter_equal(self, counter, expected_value):
    """ Check that an error counter holds the expected value

    :param counter: counter name, looked up with
        self.statistics.get_err_counter
    :param expected_value: value the counter is expected to hold
    """
    actual = self.statistics.get_err_counter(counter)
    description = "error counter `%s'" % counter
    self.assert_equal(actual, expected_value, description)
1109 def sleep(cls, timeout, remark=None):
1111 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1112 # * by Guido, only the main thread can be interrupted.
1114 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
1117 if hasattr(os, 'sched_yield'):
1123 if hasattr(cls, 'logger'):
1124 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1125 before = time.time()
1128 if hasattr(cls, 'logger') and after - before > 2 * timeout:
1129 cls.logger.error("unexpected self.sleep() result - "
1130 "slept for %es instead of ~%es!",
1131 after - before, timeout)
1132 if hasattr(cls, 'logger'):
1134 "Finished sleep (%s) - slept %es (wanted %es)",
1135 remark, after - before, timeout)
1137 def pg_send(self, intf, pkts):
1138 self.vapi.cli("clear trace")
1139 intf.add_stream(pkts)
1140 self.pg_enable_capture(self.pg_interfaces)
def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
    """Send `pkts` on `intf` and assert that no interface captures anything.

    :param intf: interface to send the packets on
    :param pkts: packets to send
    :param remark: text forwarded to assert_nothing_captured()
    :param timeout: capture timeout for the first interface checked;
        a falsy value means 1
    """
    self.pg_send(intf, pkts)
    wait = timeout if timeout else 1
    for pg_if in self.pg_interfaces:
        pg_if.get_capture(0, timeout=wait)
        pg_if.assert_nothing_captured(remark=remark)
        # Only the first interface gets the full timeout.
        wait = 0.1
def send_and_expect(self, intf, pkts, output, n_rx=None):
    """Send `pkts` on `intf` and return what `output` captured.

    :param intf: interface to send the packets on
    :param pkts: packets to send
    :param output: interface whose get_capture() provides the result
    :param n_rx: number of packets to capture; a falsy value means
        len(pkts)
    :returns: the value returned by output.get_capture()
    """
    expected = n_rx if n_rx else len(pkts)
    self.pg_send(intf, pkts)
    return output.get_capture(expected)
def send_and_expect_only(self, intf, pkts, output, timeout=None):
    """Send `pkts`, capture them on `output`, and assert silence elsewhere.

    :param intf: interface to send the packets on
    :param pkts: packets to send; len(pkts) packets are captured on output
    :param output: the only interface allowed to capture packets
    :param timeout: capture timeout for the first other interface checked;
        a falsy value means 1
    :returns: the value returned by output.get_capture()
    """
    self.pg_send(intf, pkts)
    rx = output.get_capture(len(pkts))
    wait = timeout if timeout else 1
    for pg_if in self.pg_interfaces:
        if pg_if in (output,):
            continue
        pg_if.get_capture(0, timeout=wait)
        pg_if.assert_nothing_captured()
        # Only the first checked interface gets the full timeout.
        wait = 0.1

    return rx
def runTest(self):
    """ unittest calls runTest when TestCase is instantiated without a
    test case.  Use case: Writing unittests against VppTestCase"""
    return None
def get_testcase_doc_name(test):
    """Return the first docstring line of the class of `test`.

    :param test: test case instance
    :returns: first line of the class docstring; the class name when the
        class has no (or an empty) docstring
    """
    doc = getdoc(test.__class__)
    # getdoc() yields None for an undocumented class and "" for an empty
    # docstring; neither has a first line to return.
    if not doc:
        return test.__class__.__name__
    return doc.splitlines()[0]
def get_test_description(descriptions, test):
    """Return the text used to identify `test` in reports.

    :param descriptions: truthy to prefer the test's short description
    :param test: test case instance
    :returns: test.shortDescription() when `descriptions` is truthy and a
        short description exists, otherwise str(test)
    """
    summary = test.shortDescription()
    use_summary = descriptions and summary
    return summary if use_summary else str(test)
class TestCaseInfo(object):
    """Plain record describing one test case run.

    Stores the logger, temporary directory, VPP pid and VPP binary path it
    is constructed with; core_crash_test starts as None.
    """

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        self.core_crash_test = None
        self.vpp_bin_path = vpp_bin_path
        self.vpp_pid = vpp_pid
        self.tempdir = tempdir
        self.logger = logger
class VppTestResult(unittest.TestResult):
    """
    @property result_string
     String variable to store the test case result string.
    @property errors
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test which
     raised an unexpected exception.
    @property failures
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test where
     a failure was explicitly signalled using the TestCase.assert*()
     methods.
    """

    # Class-level attributes: the two sets are shared by all instances.
    failed_test_cases_info = set()
    core_crash_test_cases_info = set()
    current_test_case_info = None

    def __init__(self, stream=None, descriptions=None, verbosity=None,
                 runner=None):
        """
        :param stream File descriptor to store where to report test results.
            Set to the standard error stream by default.
        :param descriptions Boolean variable to store information if to use
            test case descriptions.
        :param verbosity Integer variable to store required verbosity level.
        :param runner Runner object; printErrors() reads its print_summary
            attribute and replaces its stream.
        """
        super(VppTestResult, self).__init__(stream, descriptions, verbosity)
        self.stream = stream
        self.descriptions = descriptions
        self.verbosity = verbosity
        self.result_string = None
        self.runner = runner

    def addSuccess(self, test):
        """
        Record a test succeeded result

        :param test: test case that passed

        """
        if self.current_test_case_info:
            self.current_test_case_info.logger.debug(
                "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
                                                       test._testMethodName,
                                                       test._testMethodDoc))
        unittest.TestResult.addSuccess(self, test)
        self.result_string = colorize("OK", GREEN)

        self.send_result_through_pipe(test, PASS)

    def addSkip(self, test, reason):
        """
        Record a test skipped.

        :param test: test case that was skipped
        :param reason: why the test was skipped

        """
        if self.current_test_case_info:
            self.current_test_case_info.logger.debug(
                "--- addSkip() %s.%s(%s) called, reason is %s" %
                (test.__class__.__name__, test._testMethodName,
                 test._testMethodDoc, reason))
        unittest.TestResult.addSkip(self, test, reason)
        self.result_string = colorize("SKIP", YELLOW)

        self.send_result_through_pipe(test, SKIP)

    def symlink_failed(self):
        """
        Link the current test case's temp dir into $FAILED_DIR.

        Best effort: nothing is raised. Does nothing when there is no
        current test case info or FAILED_DIR is not set.
        """
        info = self.current_test_case_info
        if not info:
            return
        logger = info.logger
        try:
            failed_dir = os.getenv('FAILED_DIR')
            if not failed_dir:
                # os.path.join(None, ...) would only raise a TypeError;
                # say what is actually wrong instead.
                if logger:
                    logger.debug(
                        "FAILED_DIR is not set, not creating a link to "
                        "the failed test")
                return
            link_path = os.path.join(
                failed_dir,
                '%s-FAILED' % os.path.basename(info.tempdir))
            if logger:
                logger.debug("creating a link to the failed test")
                logger.debug(
                    "os.symlink(%s, %s)" % (info.tempdir, link_path))
            # lexists() also reports a dangling symlink, for which
            # exists() is False and os.symlink() would fail.
            if os.path.lexists(link_path):
                if logger:
                    logger.debug('symlink already exists')
            else:
                os.symlink(info.tempdir, link_path)
        except Exception as e:
            # Linking is a convenience; a failure here must not mask the
            # test result being recorded.
            if logger:
                logger.error(e)

    def send_result_through_pipe(self, test, result):
        """
        Send (test id, result) through the result pipe, if one is set.

        :param test: test whose id() is sent
        :param result: result value to send
        """
        pipe = getattr(self, 'test_framework_result_pipe', None)
        if pipe:
            pipe.send((test.id(), result))

    def log_error(self, test, err, fn_name):
        """
        Write an error or failure to the current test case's logger.

        :param test: test case or unittest _ErrorHolder
        :param err: exc_info-style tuple, expanded into format_exception()
        :param fn_name: name of the reporting function, used in the message
        """
        if not self.current_test_case_info:
            return
        if isinstance(test, unittest.suite._ErrorHolder):
            test_name = test.description
        else:
            test_name = '%s.%s(%s)' % (test.__class__.__name__,
                                       test._testMethodName,
                                       test._testMethodDoc)
        logger = self.current_test_case_info.logger
        logger.debug(
            "--- %s() %s called, err is %s" % (fn_name, test_name, err))
        logger.debug(
            "formatted exception is:\n%s" % "".join(format_exception(*err)))

    def add_error(self, test, err, unittest_fn, error_type):
        """
        Record an error or a failure and the bookkeeping that goes with it.

        :param test: test case or unittest _ErrorHolder
        :param err: exc_info-style tuple
        :param unittest_fn: unbound unittest.TestResult method that stores
            the result (addFailure or addError)
        :param error_type: FAIL or ERROR
        :raises Exception: if error_type is neither FAIL nor ERROR
        """
        if error_type == FAIL:
            self.log_error(test, err, 'addFailure')
            error_type_str = colorize("FAIL", RED)
        elif error_type == ERROR:
            self.log_error(test, err, 'addError')
            error_type_str = colorize("ERROR", RED)
        else:
            raise Exception('Error type %s cannot be used to record an '
                            'error or a failure' % error_type)

        unittest_fn(self, test, err)
        info = self.current_test_case_info
        if info:
            self.result_string = "%s [ temp dir used by test case: %s ]" % \
                                 (error_type_str, info.tempdir)
            self.symlink_failed()
            self.failed_test_cases_info.add(info)
            if is_core_present(info.tempdir):
                # Only the first test seen with a core is remembered.
                if not info.core_crash_test:
                    if isinstance(test, unittest.suite._ErrorHolder):
                        test_name = str(test)
                    else:
                        test_name = "'{!s}' ({!s})".format(
                            get_testcase_doc_name(test), test.id())
                    info.core_crash_test = test_name
                self.core_crash_test_cases_info.add(info)
        else:
            self.result_string = '%s [no temp dir]' % error_type_str

        self.send_result_through_pipe(test, error_type)

    def addFailure(self, test, err):
        """
        Record a test failed result

        :param test: test case that failed
        :param err: error message

        """
        self.add_error(test, err, unittest.TestResult.addFailure, FAIL)

    def addError(self, test, err):
        """
        Record a test error result

        :param test: test case that raised
        :param err: error message

        """
        self.add_error(test, err, unittest.TestResult.addError, ERROR)

    def getDescription(self, test):
        """
        Get test description

        :param test: test case
        :returns: test description

        """
        return get_test_description(self.descriptions, test)

    def startTest(self, test):
        """
        Start a test

        :param test: test case about to run

        """

        def print_header(test):
            # Print the header once per test class.
            if not hasattr(test.__class__, '_header_printed'):
                doc = getdoc(test)
                # A test without a docstring has no first line to show;
                # fall back to the class name instead of crashing.
                title = doc.splitlines()[0] if doc \
                    else test.__class__.__name__
                print(double_line_delim)
                print(colorize(title, GREEN))
                print(double_line_delim)
            test.__class__._header_printed = True

        print_header(test)

        unittest.TestResult.startTest(self, test)
        if self.verbosity > 0:
            self.stream.writeln(
                "Starting " + self.getDescription(test) + " ...")
            self.stream.writeln(single_line_delim)

    def stopTest(self, test):
        """
        Called when the given test has been run

        :param test: test case that has finished

        """
        unittest.TestResult.stopTest(self, test)
        summary = "%-73s%s" % (self.getDescription(test),
                               self.result_string)
        if self.verbosity > 0:
            self.stream.writeln(single_line_delim)
            self.stream.writeln(summary)
            self.stream.writeln(single_line_delim)
        else:
            self.stream.writeln(summary)

        self.send_result_through_pipe(test, TEST_RUN)

    def printErrors(self):
        """
        Print errors from running the test case
        """
        if self.errors or self.failures:
            self.stream.writeln()
            self.printErrorList('ERROR', self.errors)
            self.printErrorList('FAIL', self.failures)

        # ^^ that is the last output from unittest before summary
        if not self.runner.print_summary:
            # NOTE(review): this devnull handle is never closed explicitly.
            devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
            self.stream = devnull
            self.runner.stream = devnull

    def printErrorList(self, flavour, errors):
        """
        Print error list to the output stream together with error type
        and test case description.

        :param flavour: error type
        :param errors: iterable errors

        """
        for test, err in errors:
            self.stream.writeln(double_line_delim)
            self.stream.writeln("%s: %s" %
                                (flavour, self.getDescription(test)))
            self.stream.writeln(single_line_delim)
            self.stream.writeln("%s" % err)
class VppTestRunner(unittest.TextTestRunner):
    """
    A basic test runner implementation which prints results to standard error.
    """

    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
                 result_pipe=None, failfast=False, buffer=False,
                 resultclass=None, print_summary=True, **kwargs):
        """
        :param keep_alive_pipe: stored on KeepAliveReporter.pipe
        :param descriptions: forwarded to unittest.TextTestRunner
        :param verbosity: forwarded to unittest.TextTestRunner
        :param result_pipe: stored on the result class as
            test_framework_result_pipe
        :param failfast: forwarded to unittest.TextTestRunner
        :param buffer: forwarded to unittest.TextTestRunner
        :param resultclass: forwarded to unittest.TextTestRunner
        :param print_summary: when false, run() puts the original stream
            back on the runner and on the result
        """
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        super(VppTestRunner, self).__init__(
            sys.stdout, descriptions, verbosity, failfast, buffer,
            resultclass, **kwargs)
        KeepAliveReporter.pipe = keep_alive_pipe

        self.orig_stream = self.stream
        self.resultclass.test_framework_result_pipe = result_pipe

        self.print_summary = print_summary

    def _makeResult(self):
        """Create the result object, handing it this runner."""
        result_cls = self.resultclass
        return result_cls(self.stream, self.descriptions, self.verbosity,
                          self)

    def run(self, test):
        """
        Run the tests

        :param test: test case or suite to run
        :returns: the result object produced by the base runner

        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        result = super(VppTestRunner, self).run(test)
        if not self.print_summary:
            original = self.orig_stream
            self.stream = original
            result.stream = original
        return result
class Worker(Thread):
    """Thread that runs an external executable and records its outcome.

    Subclasses may define `testcase`, `role` and `wait_for_gdb`; they are
    only used when present. After run(), `result` holds the process return
    code (or os.EX_OSFILE when the executable could not be used) and
    `process` holds the Popen object.
    """

    def __init__(self, args, logger, env=None):
        """
        :param args: command line as a list; args[0] is the executable
        :param logger: logger used to report the run
        :param env: extra environment variables for the child process
        """
        self.logger = logger
        # Private copy: the gdb branch below appends to self.args and must
        # not modify the list owned by the caller.
        self.args = list(args)
        if hasattr(self, 'testcase') and self.testcase.debug_all:
            if self.testcase.debug_gdbserver:
                gdbserver = [
                    '/usr/bin/gdbserver',
                    'localhost:{port}'.format(
                        port=self.testcase.gdbserver_port)]
                self.args = gdbserver + list(args)
            elif self.testcase.debug_gdb and hasattr(self, 'wait_for_gdb'):
                self.args.append(self.wait_for_gdb)
        self.app_bin = args[0]
        self.app_name = os.path.basename(self.app_bin)
        if hasattr(self, 'role'):
            self.app_name += ' {role}'.format(role=self.role)
        self.process = None
        self.result = None
        env = {} if env is None else env
        self.env = copy.deepcopy(env)
        super(Worker, self).__init__()

    def wait_for_enter(self):
        """Print gdb attach instructions and wait for ENTER.

        Returns immediately unless a `testcase` attribute exists with
        debug_all set together with debug_gdbserver or debug_gdb.
        """
        if not hasattr(self, 'testcase'):
            return
        if self.testcase.debug_all and self.testcase.debug_gdbserver:
            print()
            print(double_line_delim)
            print("Spawned GDB Server for '{app}' with PID: {pid}"
                  .format(app=self.app_name, pid=self.process.pid))
        elif self.testcase.debug_all and self.testcase.debug_gdb:
            print()
            print(double_line_delim)
            print("Spawned '{app}' with PID: {pid}"
                  .format(app=self.app_name, pid=self.process.pid))
        else:
            return
        print(single_line_delim)
        print("You can debug '{app}' using:".format(app=self.app_name))
        if self.testcase.debug_gdbserver:
            print("sudo gdb " + self.app_bin +
                  " -ex 'target remote localhost:{port}'"
                  .format(port=self.testcase.gdbserver_port))
            print("Now is the time to attach gdb by running the above "
                  "command, set up breakpoints etc., then resume from "
                  "within gdb by issuing the 'continue' command")
            # The next gdbserver spawned gets the following port.
            self.testcase.gdbserver_port += 1
        elif self.testcase.debug_gdb:
            print("sudo gdb " + self.app_bin +
                  " -ex 'attach {pid}'".format(pid=self.process.pid))
            print("Now is the time to attach gdb by running the above "
                  "command and set up breakpoints etc., then resume from"
                  " within gdb by issuing the 'continue' command")
        print(single_line_delim)
        input("Press ENTER to continue running the testcase...")

    def run(self):
        """Run the executable, log its output and store its return code.

        :raises EnvironmentError: if self.args[0] does not exist or is not
            executable; self.result is set to os.EX_OSFILE first.
        """
        executable = self.args[0]
        if not os.path.exists(executable) or not os.access(
                executable, os.F_OK | os.X_OK):
            # Exit code that means some system file did not exist,
            # could not be opened, or had some other kind of error.
            self.result = os.EX_OSFILE
            raise EnvironmentError(
                "executable '%s' is not found or not executable." %
                executable)
        self.logger.debug("Running executable: '{app}'"
                          .format(app=' '.join(self.args)))
        env = os.environ.copy()
        env.update(self.env)
        env["CK_LOG_FILE_NAME"] = "-"
        self.process = subprocess.Popen(
            self.args, shell=False, env=env, preexec_fn=os.setpgrp,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self.wait_for_enter()
        out, err = self.process.communicate()
        self.logger.debug("Finished running `{app}'".format(app=self.app_name))
        self.logger.info("Return code is `%s'" % self.process.returncode)
        # 'replace' keeps non-UTF-8 child output from raising here, which
        # would leave self.result unset.
        for stream_name, data in (("stdout", out), ("stderr", err)):
            self.logger.info(single_line_delim)
            self.logger.info("Executable `{app}' wrote to {stream}:"
                             .format(app=self.app_name, stream=stream_name))
            self.logger.info(single_line_delim)
            self.logger.info(data.decode('utf-8', 'replace'))
        self.logger.info(single_line_delim)
        self.result = self.process.returncode
1589 if __name__ == '__main__':