3 from __future__ import print_function
16 from collections import deque
17 from threading import Thread, Event
18 from inspect import getdoc, isclass
19 from traceback import format_exception
20 from logging import FileHandler, DEBUG, Formatter
23 from scapy.packet import Raw
24 from hook import StepHook, PollHook, VppDiedError
25 from vpp_pg_interface import VppPGInterface
26 from vpp_sub_interface import VppSubInterface
27 from vpp_lo_interface import VppLoInterface
28 from vpp_bvi_interface import VppBviInterface
29 from vpp_papi_provider import VppPapiProvider
30 from vpp_papi.vpp_stats import VPPStats
31 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
33 from vpp_object import VppObjectRegistry
34 from util import ppp, is_core_present
35 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
36 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
37 from scapy.layers.inet6 import ICMPv6EchoReply
39 if os.name == 'posix' and sys.version_info[0] < 3:
40 # using subprocess32 is recommended by python official documentation
41 # @ https://docs.python.org/2/library/subprocess.html
42 import subprocess32 as subprocess
46 # Python2/3 compatible
58 debug_framework = False
59 if os.getenv('TEST_DEBUG', "0") == "1":
60 debug_framework = True
64 Test framework module.
66 The module provides a set of tools for constructing and running tests and
67 representing the results.
71 class _PacketInfo(object):
72 """Private class to create packet info object.
74 Help process information about the next packet.
75 Set variables to default values.
77 #: Store the index of the packet.
79 #: Store the index of the source packet generator interface of the packet.
81 #: Store the index of the destination packet generator interface
84 #: Store expected ip version
86 #: Store expected upper protocol
88 #: Store the copy of the former packet.
91 def __eq__(self, other):
92 index = self.index == other.index
93 src = self.src == other.src
94 dst = self.dst == other.dst
95 data = self.data == other.data
96 return index and src and dst and data
99 def pump_output(testclass):
100 """ pump output from vpp stdout/stderr to proper queues """
103 while not testclass.pump_thread_stop_flag.is_set():
104 readable = select.select([testclass.vpp.stdout.fileno(),
105 testclass.vpp.stderr.fileno(),
106 testclass.pump_thread_wakeup_pipe[0]],
108 if testclass.vpp.stdout.fileno() in readable:
109 read = os.read(testclass.vpp.stdout.fileno(), 102400)
111 split = read.splitlines(True)
112 if len(stdout_fragment) > 0:
113 split[0] = "%s%s" % (stdout_fragment, split[0])
114 if len(split) > 0 and split[-1].endswith("\n"):
118 stdout_fragment = split[-1]
119 testclass.vpp_stdout_deque.extend(split[:limit])
120 if not testclass.cache_vpp_output:
121 for line in split[:limit]:
122 testclass.logger.debug(
123 "VPP STDOUT: %s" % line.rstrip("\n"))
124 if testclass.vpp.stderr.fileno() in readable:
125 read = os.read(testclass.vpp.stderr.fileno(), 102400)
127 split = read.splitlines(True)
128 if len(stderr_fragment) > 0:
129 split[0] = "%s%s" % (stderr_fragment, split[0])
130 if len(split) > 0 and split[-1].endswith(b"\n"):
134 stderr_fragment = split[-1]
135 testclass.vpp_stderr_deque.extend(split[:limit])
136 if not testclass.cache_vpp_output:
137 for line in split[:limit]:
138 testclass.logger.debug(
139 "VPP STDERR: %s" % line.rstrip("\n"))
140 # ignoring the dummy pipe here intentionally - the
141 # flag will take care of properly terminating the loop
144 def _is_skip_aarch64_set():
145 return os.getenv('SKIP_AARCH64', 'n').lower() in ('yes', 'y', '1')
147 is_skip_aarch64_set = _is_skip_aarch64_set()
150 def _is_platform_aarch64():
151 return platform.machine() == 'aarch64'
153 is_platform_aarch64 = _is_platform_aarch64()
156 def _running_extended_tests():
157 s = os.getenv("EXTENDED_TESTS", "n")
158 return True if s.lower() in ("y", "yes", "1") else False
160 running_extended_tests = _running_extended_tests()
163 def _running_on_centos():
164 os_id = os.getenv("OS_ID", "")
165 return True if "centos" in os_id.lower() else False
167 running_on_centos = _running_on_centos
170 class KeepAliveReporter(object):
172 Singleton object which reports test start to parent process
177 self.__dict__ = self._shared_state
185 def pipe(self, pipe):
186 if self._pipe is not None:
187 raise Exception("Internal error - pipe should only be set once.")
190 def send_keep_alive(self, test, desc=None):
192 Write current test tmpdir & desc to keep-alive pipe to signal liveness
194 if self.pipe is None:
195 # if not running forked..
199 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
203 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
206 class VppTestCase(unittest.TestCase):
207 """This subclass is a base class for VPP test cases that are implemented as
208 classes. It provides methods to create and run test case.
211 extra_vpp_punt_config = []
212 extra_vpp_plugin_config = []
215 def packet_infos(self):
216 """List of packet infos"""
217 return self._packet_infos
220 def get_packet_count_for_if_idx(cls, dst_if_index):
221 """Get the number of packet info for specified destination if index"""
222 if dst_if_index in cls._packet_count_for_dst_if_idx:
223 return cls._packet_count_for_dst_if_idx[dst_if_index]
229 """Return the instance of this testcase"""
230 return cls.test_instance
233 def set_debug_flags(cls, d):
234 cls.debug_core = False
235 cls.debug_gdb = False
236 cls.debug_gdbserver = False
241 cls.debug_core = True
244 elif dl == "gdbserver":
245 cls.debug_gdbserver = True
247 raise Exception("Unrecognized DEBUG option: '%s'" % d)
250 def get_least_used_cpu():
251 cpu_usage_list = [set(range(psutil.cpu_count()))]
252 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
253 if 'vpp_main' == p.info['name']]
254 for vpp_process in vpp_processes:
255 for cpu_usage_set in cpu_usage_list:
257 cpu_num = vpp_process.cpu_num()
258 if cpu_num in cpu_usage_set:
259 cpu_usage_set_index = cpu_usage_list.index(
261 if cpu_usage_set_index == len(cpu_usage_list) - 1:
262 cpu_usage_list.append({cpu_num})
264 cpu_usage_list[cpu_usage_set_index + 1].add(
266 cpu_usage_set.remove(cpu_num)
268 except psutil.NoSuchProcess:
271 for cpu_usage_set in cpu_usage_list:
272 if len(cpu_usage_set) > 0:
273 min_usage_set = cpu_usage_set
276 return random.choice(tuple(min_usage_set))
279 def setUpConstants(cls):
280 """ Set-up the test case class based on environment variables """
281 s = os.getenv("STEP", "n")
282 cls.step = True if s.lower() in ("y", "yes", "1") else False
283 d = os.getenv("DEBUG", None)
284 c = os.getenv("CACHE_OUTPUT", "1")
285 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
286 cls.set_debug_flags(d)
287 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
288 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
289 cls.test_plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
290 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
292 if cls.plugin_path is not None:
293 if cls.extern_plugin_path is not None:
294 plugin_path = "%s:%s" % (
295 cls.plugin_path, cls.extern_plugin_path)
297 plugin_path = cls.plugin_path
298 elif cls.extern_plugin_path is not None:
299 plugin_path = cls.extern_plugin_path
301 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
302 debug_cli = "cli-listen localhost:5002"
304 size = os.getenv("COREDUMP_SIZE")
306 coredump_size = "coredump-size %s" % size
307 if coredump_size is None:
308 coredump_size = "coredump-size unlimited"
310 cpu_core_number = cls.get_least_used_cpu()
312 cls.vpp_cmdline = [cls.vpp_bin, "unix",
313 "{", "nodaemon", debug_cli, "full-coredump",
314 coredump_size, "runtime-dir", cls.tempdir, "}",
315 "api-trace", "{", "on", "}", "api-segment", "{",
316 "prefix", cls.shm_prefix, "}", "cpu", "{",
317 "main-core", str(cpu_core_number), "}",
318 "statseg", "{", "socket-name", cls.stats_sock, "}",
319 "socksvr", "{", "socket-name", cls.api_sock, "}",
321 "{", "plugin", "dpdk_plugin.so", "{", "disable",
322 "}", "plugin", "rdma_plugin.so", "{", "disable",
323 "}", "plugin", "unittest_plugin.so", "{", "enable",
324 "}"] + cls.extra_vpp_plugin_config + ["}", ]
325 if cls.extra_vpp_punt_config is not None:
326 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
327 if plugin_path is not None:
328 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
329 if cls.test_plugin_path is not None:
330 cls.vpp_cmdline.extend(["test_plugin_path", cls.test_plugin_path])
332 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
333 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
336 def wait_for_enter(cls):
337 if cls.debug_gdbserver:
338 print(double_line_delim)
339 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
341 print(double_line_delim)
342 print("Spawned VPP with PID: %d" % cls.vpp.pid)
344 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
346 print(single_line_delim)
347 print("You can debug the VPP using e.g.:")
348 if cls.debug_gdbserver:
349 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
350 print("Now is the time to attach a gdb by running the above "
351 "command, set up breakpoints etc. and then resume VPP from "
352 "within gdb by issuing the 'continue' command")
354 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
355 print("Now is the time to attach a gdb by running the above "
356 "command and set up breakpoints etc.")
357 print(single_line_delim)
358 input("Press ENTER to continue running the testcase...")
362 cmdline = cls.vpp_cmdline
364 if cls.debug_gdbserver:
365 gdbserver = '/usr/bin/gdbserver'
366 if not os.path.isfile(gdbserver) or \
367 not os.access(gdbserver, os.X_OK):
368 raise Exception("gdbserver binary '%s' does not exist or is "
369 "not executable" % gdbserver)
371 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
372 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
375 cls.vpp = subprocess.Popen(cmdline,
376 stdout=subprocess.PIPE,
377 stderr=subprocess.PIPE,
379 except subprocess.CalledProcessError as e:
380 cls.logger.critical("Subprocess returned with non-0 return code: ("
384 cls.logger.critical("Subprocess returned with OS error: "
385 "(%s) %s", e.errno, e.strerror)
387 except Exception as e:
388 cls.logger.exception("Subprocess returned unexpected from "
395 def wait_for_stats_socket(cls):
396 deadline = time.time() + 3
398 while time.time() < deadline or \
399 cls.debug_gdb or cls.debug_gdbserver:
400 if os.path.exists(cls.stats_sock):
405 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
408 def wait_for_coredump(cls):
409 corefile = cls.tempdir + "/core"
410 if os.path.isfile(corefile):
411 cls.logger.error("Waiting for coredump to complete: %s", corefile)
412 curr_size = os.path.getsize(corefile)
413 deadline = time.time() + 60
415 while time.time() < deadline:
418 curr_size = os.path.getsize(corefile)
419 if size == curr_size:
423 cls.logger.error("Timed out waiting for coredump to complete:"
426 cls.logger.error("Coredump complete: %s, size %d",
432 Perform class setup before running the testcase
433 Remove shared memory files, start vpp and connect the vpp-api
435 super(VppTestCase, cls).setUpClass()
436 gc.collect() # run garbage collection first
438 cls.logger = get_logger(cls.__name__)
439 if hasattr(cls, 'parallel_handler'):
440 cls.logger.addHandler(cls.parallel_handler)
441 cls.logger.propagate = False
443 cls.tempdir = tempfile.mkdtemp(
444 prefix='vpp-unittest-%s-' % cls.__name__)
445 cls.stats_sock = "%s/stats.sock" % cls.tempdir
446 cls.api_sock = "%s/api.sock" % cls.tempdir
447 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
448 cls.file_handler.setFormatter(
449 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
451 cls.file_handler.setLevel(DEBUG)
452 cls.logger.addHandler(cls.file_handler)
453 cls.logger.debug("--- setUpClass() for %s called ---" %
455 cls.shm_prefix = os.path.basename(cls.tempdir)
456 os.chdir(cls.tempdir)
457 cls.logger.info("Temporary dir is %s, shm prefix is %s",
458 cls.tempdir, cls.shm_prefix)
460 cls.reset_packet_infos()
462 cls._zombie_captures = []
465 cls.registry = VppObjectRegistry()
466 cls.vpp_startup_failed = False
467 cls.reporter = KeepAliveReporter()
468 # need to catch exceptions here because if we raise, then the cleanup
469 # doesn't get called and we might end with a zombie vpp
472 cls.reporter.send_keep_alive(cls, 'setUpClass')
473 VppTestResult.current_test_case_info = TestCaseInfo(
474 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
475 cls.vpp_stdout_deque = deque()
476 cls.vpp_stderr_deque = deque()
477 cls.pump_thread_stop_flag = Event()
478 cls.pump_thread_wakeup_pipe = os.pipe()
479 cls.pump_thread = Thread(target=pump_output, args=(cls,))
480 cls.pump_thread.daemon = True
481 cls.pump_thread.start()
482 if cls.debug_gdb or cls.debug_gdbserver:
486 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
492 cls.vapi.register_hook(hook)
493 cls.wait_for_stats_socket()
494 cls.statistics = VPPStats(socketname=cls.stats_sock)
498 cls.vpp_startup_failed = True
500 "VPP died shortly after startup, check the"
501 " output to standard error for possible cause")
507 cls.vapi.disconnect()
510 if cls.debug_gdbserver:
511 print(colorize("You're running VPP inside gdbserver but "
512 "VPP-API connection failed, did you forget "
513 "to 'continue' VPP from within gdb?", RED))
525 Disconnect vpp-api, kill vpp and cleanup shared memory files
527 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
529 if cls.vpp.returncode is None:
530 print(double_line_delim)
531 print("VPP or GDB server is still running")
532 print(single_line_delim)
533 input("When done debugging, press ENTER to kill the "
534 "process and finish running the testcase...")
536 # first signal that we want to stop the pump thread, then wake it up
537 if hasattr(cls, 'pump_thread_stop_flag'):
538 cls.pump_thread_stop_flag.set()
539 if hasattr(cls, 'pump_thread_wakeup_pipe'):
540 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
541 if hasattr(cls, 'pump_thread'):
542 cls.logger.debug("Waiting for pump thread to stop")
543 cls.pump_thread.join()
544 if hasattr(cls, 'vpp_stderr_reader_thread'):
545 cls.logger.debug("Waiting for stdderr pump to stop")
546 cls.vpp_stderr_reader_thread.join()
548 if hasattr(cls, 'vpp'):
549 if hasattr(cls, 'vapi'):
550 cls.logger.debug("Disconnecting class vapi client on %s",
552 cls.vapi.disconnect()
553 cls.logger.debug("Deleting class vapi attribute on %s",
557 if cls.vpp.returncode is None:
558 cls.wait_for_coredump()
559 cls.logger.debug("Sending TERM to vpp")
561 cls.logger.debug("Waiting for vpp to die")
562 cls.vpp.communicate()
563 cls.logger.debug("Deleting class vpp attribute on %s",
567 if cls.vpp_startup_failed:
568 stdout_log = cls.logger.info
569 stderr_log = cls.logger.critical
571 stdout_log = cls.logger.info
572 stderr_log = cls.logger.info
574 if hasattr(cls, 'vpp_stdout_deque'):
575 stdout_log(single_line_delim)
576 stdout_log('VPP output to stdout while running %s:', cls.__name__)
577 stdout_log(single_line_delim)
578 vpp_output = "".join(cls.vpp_stdout_deque)
579 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
581 stdout_log('\n%s', vpp_output)
582 stdout_log(single_line_delim)
584 if hasattr(cls, 'vpp_stderr_deque'):
585 stderr_log(single_line_delim)
586 stderr_log('VPP output to stderr while running %s:', cls.__name__)
587 stderr_log(single_line_delim)
588 vpp_output = "".join(cls.vpp_stderr_deque)
589 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
591 stderr_log('\n%s', vpp_output)
592 stderr_log(single_line_delim)
595 def tearDownClass(cls):
596 """ Perform final cleanup after running all tests in this test-case """
597 cls.logger.debug("--- tearDownClass() for %s called ---" %
599 cls.reporter.send_keep_alive(cls, 'tearDownClass')
601 cls.file_handler.close()
602 cls.reset_packet_infos()
604 debug_internal.on_tear_down_class(cls)
606 def show_commands_at_teardown(self):
607 """ Allow subclass specific teardown logging additions."""
608 self.logger.info("--- No test specific show commands provided. ---")
611 """ Show various debug prints after each test """
612 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
613 (self.__class__.__name__, self._testMethodName,
614 self._testMethodDoc))
615 if not self.vpp_dead:
617 "--- Logging show commands common to all testcases. ---")
618 self.logger.debug(self.vapi.cli("show trace max 1000"))
619 self.logger.info(self.vapi.ppcli("show interface"))
620 self.logger.info(self.vapi.ppcli("show hardware"))
621 self.logger.info(self.statistics.set_errors_str())
622 self.logger.info(self.vapi.ppcli("show run"))
623 self.logger.info(self.vapi.ppcli("show log"))
624 self.logger.info("Logging testcase specific show commands.")
625 self.show_commands_at_teardown()
626 self.registry.remove_vpp_config(self.logger)
627 # Save/Dump VPP api trace log
628 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
629 tmp_api_trace = "/tmp/%s" % api_trace
630 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
631 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
632 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
634 os.rename(tmp_api_trace, vpp_api_trace_log)
635 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
638 self.registry.unregister_all(self.logger)
641 """ Clear trace before running each test"""
642 super(VppTestCase, self).setUp()
643 self.reporter.send_keep_alive(self)
645 raise Exception("VPP is dead when setting up the test")
646 self.sleep(.1, "during setUp")
647 self.vpp_stdout_deque.append(
648 "--- test setUp() for %s.%s(%s) starts here ---\n" %
649 (self.__class__.__name__, self._testMethodName,
650 self._testMethodDoc))
651 self.vpp_stderr_deque.append(
652 "--- test setUp() for %s.%s(%s) starts here ---\n" %
653 (self.__class__.__name__, self._testMethodName,
654 self._testMethodDoc))
655 self.vapi.cli("clear trace")
656 # store the test instance inside the test class - so that objects
657 # holding the class can access instance methods (like assertEqual)
658 type(self).test_instance = self
661 def pg_enable_capture(cls, interfaces=None):
663 Enable capture on packet-generator interfaces
665 :param interfaces: iterable interface indexes (if None,
666 use self.pg_interfaces)
669 if interfaces is None:
670 interfaces = cls.pg_interfaces
675 def register_capture(cls, cap_name):
676 """ Register a capture in the testclass """
677 # add to the list of captures with current timestamp
678 cls._captures.append((time.time(), cap_name))
679 # filter out from zombies
680 cls._zombie_captures = [(stamp, name)
681 for (stamp, name) in cls._zombie_captures
686 """ Remove any zombie captures and enable the packet generator """
687 # how long before capture is allowed to be deleted - otherwise vpp
688 # crashes - 100ms seems enough (this shouldn't be needed at all)
691 for stamp, cap_name in cls._zombie_captures:
692 wait = stamp + capture_ttl - now
694 cls.sleep(wait, "before deleting capture %s" % cap_name)
696 cls.logger.debug("Removing zombie capture %s" % cap_name)
697 cls.vapi.cli('packet-generator delete %s' % cap_name)
699 cls.vapi.cli("trace add pg-input 1000")
700 cls.vapi.cli('packet-generator enable')
701 cls._zombie_captures = cls._captures
705 def create_pg_interfaces(cls, interfaces):
707 Create packet-generator interfaces.
709 :param interfaces: iterable indexes of the interfaces.
710 :returns: List of created interfaces.
715 intf = VppPGInterface(cls, i)
716 setattr(cls, intf.name, intf)
718 cls.pg_interfaces = result
722 def create_loopback_interfaces(cls, count):
724 Create loopback interfaces.
726 :param count: number of interfaces created.
727 :returns: List of created interfaces.
729 result = [VppLoInterface(cls) for i in range(count)]
731 setattr(cls, intf.name, intf)
732 cls.lo_interfaces = result
736 def create_bvi_interfaces(cls, count):
738 Create BVI interfaces.
740 :param count: number of interfaces created.
741 :returns: List of created interfaces.
743 result = [VppBviInterface(cls) for i in range(count)]
745 setattr(cls, intf.name, intf)
746 cls.bvi_interfaces = result
750 def extend_packet(packet, size, padding=' '):
752 Extend packet to given size by padding with spaces or custom padding
753 NOTE: Currently works only when Raw layer is present.
755 :param packet: packet
756 :param size: target size
757 :param padding: padding used to extend the payload
760 packet_len = len(packet) + 4
761 extend = size - packet_len
763 num = (extend // len(padding)) + 1
764 packet[Raw].load += (padding * num)[:extend].encode("ascii")
767 def reset_packet_infos(cls):
768 """ Reset the list of packet info objects and packet counts to zero """
769 cls._packet_infos = {}
770 cls._packet_count_for_dst_if_idx = {}
773 def create_packet_info(cls, src_if, dst_if):
775 Create packet info object containing the source and destination indexes
776 and add it to the testcase's packet info list
778 :param VppInterface src_if: source interface
779 :param VppInterface dst_if: destination interface
781 :returns: _PacketInfo object
785 info.index = len(cls._packet_infos)
786 info.src = src_if.sw_if_index
787 info.dst = dst_if.sw_if_index
788 if isinstance(dst_if, VppSubInterface):
789 dst_idx = dst_if.parent.sw_if_index
791 dst_idx = dst_if.sw_if_index
792 if dst_idx in cls._packet_count_for_dst_if_idx:
793 cls._packet_count_for_dst_if_idx[dst_idx] += 1
795 cls._packet_count_for_dst_if_idx[dst_idx] = 1
796 cls._packet_infos[info.index] = info
800 def info_to_payload(info):
802 Convert _PacketInfo object to packet payload
804 :param info: _PacketInfo object
806 :returns: string containing serialized data from packet info
808 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
812 def payload_to_info(payload, payload_field='load'):
814 Convert packet payload to _PacketInfo object
816 :param payload: packet payload
817 :type payload: <class 'scapy.packet.Raw'>
818 :param payload_field: packet fieldname of payload "load" for
819 <class 'scapy.packet.Raw'>
820 :type payload_field: str
821 :returns: _PacketInfo object containing de-serialized data from payload
824 numbers = getattr(payload, payload_field).split()
826 info.index = int(numbers[0])
827 info.src = int(numbers[1])
828 info.dst = int(numbers[2])
829 info.ip = int(numbers[3])
830 info.proto = int(numbers[4])
833 def get_next_packet_info(self, info):
835 Iterate over the packet info list stored in the testcase
836 Start iteration with first element if info is None
837 Continue based on index in info if info is specified
839 :param info: info or None
840 :returns: next info in list or None if no more infos
845 next_index = info.index + 1
846 if next_index == len(self._packet_infos):
849 return self._packet_infos[next_index]
851 def get_next_packet_info_for_interface(self, src_index, info):
853 Search the packet info list for the next packet info with same source
856 :param src_index: source interface index to search for
857 :param info: packet info - where to start the search
858 :returns: packet info or None
862 info = self.get_next_packet_info(info)
865 if info.src == src_index:
868 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
870 Search the packet info list for the next packet info with same source
871 and destination interface indexes
873 :param src_index: source interface index to search for
874 :param dst_index: destination interface index to search for
875 :param info: packet info - where to start the search
876 :returns: packet info or None
880 info = self.get_next_packet_info_for_interface(src_index, info)
883 if info.dst == dst_index:
886 def assert_equal(self, real_value, expected_value, name_or_class=None):
887 if name_or_class is None:
888 self.assertEqual(real_value, expected_value)
891 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
892 msg = msg % (getdoc(name_or_class).strip(),
893 real_value, str(name_or_class(real_value)),
894 expected_value, str(name_or_class(expected_value)))
896 msg = "Invalid %s: %s does not match expected value %s" % (
897 name_or_class, real_value, expected_value)
899 self.assertEqual(real_value, expected_value, msg)
901 def assert_in_range(self,
909 msg = "Invalid %s: %s out of range <%s,%s>" % (
910 name, real_value, expected_min, expected_max)
911 self.assertTrue(expected_min <= real_value <= expected_max, msg)
913 def assert_packet_checksums_valid(self, packet,
914 ignore_zero_udp_checksums=True):
915 received = packet.__class__(scapy.compat.raw(packet))
917 ppp("Verifying packet checksums for packet:", received))
918 udp_layers = ['UDP', 'UDPerror']
919 checksum_fields = ['cksum', 'chksum']
922 temp = received.__class__(scapy.compat.raw(received))
924 layer = temp.getlayer(counter)
926 for cf in checksum_fields:
927 if hasattr(layer, cf):
928 if ignore_zero_udp_checksums and \
929 0 == getattr(layer, cf) and \
930 layer.name in udp_layers:
933 checksums.append((counter, cf))
936 counter = counter + 1
937 if 0 == len(checksums):
939 temp = temp.__class__(scapy.compat.raw(temp))
940 for layer, cf in checksums:
941 calc_sum = getattr(temp[layer], cf)
943 getattr(received[layer], cf), calc_sum,
944 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
946 "Checksum field `%s` on `%s` layer has correct value `%s`" %
947 (cf, temp[layer].name, calc_sum))
949 def assert_checksum_valid(self, received_packet, layer,
951 ignore_zero_checksum=False):
952 """ Check checksum of received packet on given layer """
953 received_packet_checksum = getattr(received_packet[layer], field_name)
954 if ignore_zero_checksum and 0 == received_packet_checksum:
956 recalculated = received_packet.__class__(
957 scapy.compat.raw(received_packet))
958 delattr(recalculated[layer], field_name)
959 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
960 self.assert_equal(received_packet_checksum,
961 getattr(recalculated[layer], field_name),
962 "packet checksum on layer: %s" % layer)
964 def assert_ip_checksum_valid(self, received_packet,
965 ignore_zero_checksum=False):
966 self.assert_checksum_valid(received_packet, 'IP',
967 ignore_zero_checksum=ignore_zero_checksum)
969 def assert_tcp_checksum_valid(self, received_packet,
970 ignore_zero_checksum=False):
971 self.assert_checksum_valid(received_packet, 'TCP',
972 ignore_zero_checksum=ignore_zero_checksum)
974 def assert_udp_checksum_valid(self, received_packet,
975 ignore_zero_checksum=True):
976 self.assert_checksum_valid(received_packet, 'UDP',
977 ignore_zero_checksum=ignore_zero_checksum)
979 def assert_embedded_icmp_checksum_valid(self, received_packet):
980 if received_packet.haslayer(IPerror):
981 self.assert_checksum_valid(received_packet, 'IPerror')
982 if received_packet.haslayer(TCPerror):
983 self.assert_checksum_valid(received_packet, 'TCPerror')
984 if received_packet.haslayer(UDPerror):
985 self.assert_checksum_valid(received_packet, 'UDPerror',
986 ignore_zero_checksum=True)
987 if received_packet.haslayer(ICMPerror):
988 self.assert_checksum_valid(received_packet, 'ICMPerror')
990 def assert_icmp_checksum_valid(self, received_packet):
991 self.assert_checksum_valid(received_packet, 'ICMP')
992 self.assert_embedded_icmp_checksum_valid(received_packet)
994 def assert_icmpv6_checksum_valid(self, pkt):
995 if pkt.haslayer(ICMPv6DestUnreach):
996 self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
997 self.assert_embedded_icmp_checksum_valid(pkt)
998 if pkt.haslayer(ICMPv6EchoRequest):
999 self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
1000 if pkt.haslayer(ICMPv6EchoReply):
1001 self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
1003 def assert_packet_counter_equal(self, counter, expected_value):
1004 if counter.startswith("/"):
1005 counter_value = self.statistics.get_counter(counter)
1006 self.assert_equal(counter_value, expected_value,
1007 "packet counter `%s'" % counter)
1009 counters = self.vapi.cli("sh errors").split('\n')
1011 for i in range(1, len(counters) - 1):
1012 results = counters[i].split()
1013 if results[1] == counter:
1014 counter_value = int(results[0])
1018 def sleep(cls, timeout, remark=None):
1020 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1021 # * by Guido, only the main thread can be interrupted.
1023 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
1026 if hasattr(os, 'sched_yield'):
1032 if hasattr(cls, 'logger'):
1033 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1034 before = time.time()
1037 if hasattr(cls, 'logger') and after - before > 2 * timeout:
1038 cls.logger.error("unexpected self.sleep() result - "
1039 "slept for %es instead of ~%es!",
1040 after - before, timeout)
1041 if hasattr(cls, 'logger'):
1043 "Finished sleep (%s) - slept %es (wanted %es)",
1044 remark, after - before, timeout)
1046 def pg_send(self, intf, pkts):
1047 self.vapi.cli("clear trace")
1048 intf.add_stream(pkts)
1049 self.pg_enable_capture(self.pg_interfaces)
1052 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
1053 self.pg_send(intf, pkts)
1056 for i in self.pg_interfaces:
1057 i.get_capture(0, timeout=timeout)
1058 i.assert_nothing_captured(remark=remark)
1061 def send_and_expect(self, intf, pkts, output, n_rx=None):
1064 self.pg_send(intf, pkts)
1065 rx = output.get_capture(n_rx)
1068 def send_and_expect_only(self, intf, pkts, output, timeout=None):
1069 self.pg_send(intf, pkts)
1070 rx = output.get_capture(len(pkts))
1074 for i in self.pg_interfaces:
1075 if i not in outputs:
1076 i.get_capture(0, timeout=timeout)
1077 i.assert_nothing_captured()
1083 """ unittest calls runTest when TestCase is instantiated without a
1084 test case. Use case: Writing unittests against VppTestCase"""
1088 def get_testcase_doc_name(test):
1089 return getdoc(test.__class__).splitlines()[0]
1092 def get_test_description(descriptions, test):
1093 short_description = test.shortDescription()
1094 if descriptions and short_description:
1095 return short_description
1100 class TestCaseInfo(object):
1101 def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
1102 self.logger = logger
1103 self.tempdir = tempdir
1104 self.vpp_pid = vpp_pid
1105 self.vpp_bin_path = vpp_bin_path
1106 self.core_crash_test = None
1109 class VppTestResult(unittest.TestResult):
1111 @property result_string
1112 String variable to store the test case result string.
1114 List variable containing 2-tuples of TestCase instances and strings
1115 holding formatted tracebacks. Each tuple represents a test which
1116 raised an unexpected exception.
1118 List variable containing 2-tuples of TestCase instances and strings
1119 holding formatted tracebacks. Each tuple represents a test where
1120 a failure was explicitly signalled using the TestCase.assert*()
1124 failed_test_cases_info = set()
1125 core_crash_test_cases_info = set()
1126 current_test_case_info = None
1128 def __init__(self, stream=None, descriptions=None, verbosity=None,
1131 :param stream File descriptor to store where to report test results.
1132 Set to the standard error stream by default.
1133 :param descriptions Boolean variable to store information if to use
1134 test case descriptions.
1135 :param verbosity Integer variable to store required verbosity level.
1137 super(VppTestResult, self).__init__(stream, descriptions, verbosity)
1138 self.stream = stream
1139 self.descriptions = descriptions
1140 self.verbosity = verbosity
1141 self.result_string = None
1142 self.runner = runner
1144 def addSuccess(self, test):
1146 Record a test succeeded result
1151 if self.current_test_case_info:
1152 self.current_test_case_info.logger.debug(
1153 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1154 test._testMethodName,
1155 test._testMethodDoc))
1156 unittest.TestResult.addSuccess(self, test)
1157 self.result_string = colorize("OK", GREEN)
1159 self.send_result_through_pipe(test, PASS)
1161 def addSkip(self, test, reason):
1163 Record a test skipped.
1169 if self.current_test_case_info:
1170 self.current_test_case_info.logger.debug(
1171 "--- addSkip() %s.%s(%s) called, reason is %s" %
1172 (test.__class__.__name__, test._testMethodName,
1173 test._testMethodDoc, reason))
1174 unittest.TestResult.addSkip(self, test, reason)
1175 self.result_string = colorize("SKIP", YELLOW)
1177 self.send_result_through_pipe(test, SKIP)
1179 def symlink_failed(self):
1180 if self.current_test_case_info:
1182 failed_dir = os.getenv('FAILED_DIR')
1183 link_path = os.path.join(
1186 os.path.basename(self.current_test_case_info.tempdir))
1187 if self.current_test_case_info.logger:
1188 self.current_test_case_info.logger.debug(
1189 "creating a link to the failed test")
1190 self.current_test_case_info.logger.debug(
1191 "os.symlink(%s, %s)" %
1192 (self.current_test_case_info.tempdir, link_path))
1193 if os.path.exists(link_path):
1194 if self.current_test_case_info.logger:
1195 self.current_test_case_info.logger.debug(
1196 'symlink already exists')
1198 os.symlink(self.current_test_case_info.tempdir, link_path)
1200 except Exception as e:
1201 if self.current_test_case_info.logger:
1202 self.current_test_case_info.logger.error(e)
1204 def send_result_through_pipe(self, test, result):
1205 if hasattr(self, 'test_framework_result_pipe'):
1206 pipe = self.test_framework_result_pipe
1208 pipe.send((test.id(), result))
1210 def log_error(self, test, err, fn_name):
1211 if self.current_test_case_info:
1212 if isinstance(test, unittest.suite._ErrorHolder):
1213 test_name = test.description
1215 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1216 test._testMethodName,
1217 test._testMethodDoc)
1218 self.current_test_case_info.logger.debug(
1219 "--- %s() %s called, err is %s" %
1220 (fn_name, test_name, err))
1221 self.current_test_case_info.logger.debug(
1222 "formatted exception is:\n%s" %
1223 "".join(format_exception(*err)))
1225 def add_error(self, test, err, unittest_fn, error_type):
1226 if error_type == FAIL:
1227 self.log_error(test, err, 'addFailure')
1228 error_type_str = colorize("FAIL", RED)
1229 elif error_type == ERROR:
1230 self.log_error(test, err, 'addError')
1231 error_type_str = colorize("ERROR", RED)
1233 raise Exception('Error type %s cannot be used to record an '
1234 'error or a failure' % error_type)
1236 unittest_fn(self, test, err)
1237 if self.current_test_case_info:
1238 self.result_string = "%s [ temp dir used by test case: %s ]" % \
1240 self.current_test_case_info.tempdir)
1241 self.symlink_failed()
1242 self.failed_test_cases_info.add(self.current_test_case_info)
1243 if is_core_present(self.current_test_case_info.tempdir):
1244 if not self.current_test_case_info.core_crash_test:
1245 if isinstance(test, unittest.suite._ErrorHolder):
1246 test_name = str(test)
1248 test_name = "'{!s}' ({!s})".format(
1249 get_testcase_doc_name(test), test.id())
1250 self.current_test_case_info.core_crash_test = test_name
1251 self.core_crash_test_cases_info.add(
1252 self.current_test_case_info)
1254 self.result_string = '%s [no temp dir]' % error_type_str
1256 self.send_result_through_pipe(test, error_type)
1258 def addFailure(self, test, err):
1260 Record a test failed result
1263 :param err: error message
1266 self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1268 def addError(self, test, err):
1270 Record a test error result
1273 :param err: error message
1276 self.add_error(test, err, unittest.TestResult.addError, ERROR)
1278 def getDescription(self, test):
1280 Get test description
1283 :returns: test description
1286 return get_test_description(self.descriptions, test)
1288 def startTest(self, test):
1296 def print_header(test):
1297 if not hasattr(test.__class__, '_header_printed'):
1298 print(double_line_delim)
1299 print(colorize(getdoc(test).splitlines()[0], GREEN))
1300 print(double_line_delim)
1301 test.__class__._header_printed = True
1305 unittest.TestResult.startTest(self, test)
1306 if self.verbosity > 0:
1307 self.stream.writeln(
1308 "Starting " + self.getDescription(test) + " ...")
1309 self.stream.writeln(single_line_delim)
1311 def stopTest(self, test):
1313 Called when the given test has been run
1318 unittest.TestResult.stopTest(self, test)
1319 if self.verbosity > 0:
1320 self.stream.writeln(single_line_delim)
1321 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1322 self.result_string))
1323 self.stream.writeln(single_line_delim)
1325 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1326 self.result_string))
1328 self.send_result_through_pipe(test, TEST_RUN)
1330 def printErrors(self):
1332 Print errors from running the test case
1334 if len(self.errors) > 0 or len(self.failures) > 0:
1335 self.stream.writeln()
1336 self.printErrorList('ERROR', self.errors)
1337 self.printErrorList('FAIL', self.failures)
1339 # ^^ that is the last output from unittest before summary
1340 if not self.runner.print_summary:
1341 devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
1342 self.stream = devnull
1343 self.runner.stream = devnull
1345 def printErrorList(self, flavour, errors):
1347 Print error list to the output stream together with error type
1348 and test case description.
1350 :param flavour: error type
1351 :param errors: iterable errors
1354 for test, err in errors:
1355 self.stream.writeln(double_line_delim)
1356 self.stream.writeln("%s: %s" %
1357 (flavour, self.getDescription(test)))
1358 self.stream.writeln(single_line_delim)
1359 self.stream.writeln("%s" % err)
1362 class VppTestRunner(unittest.TextTestRunner):
1364 A basic test runner implementation which prints results to standard error.
1368 def resultclass(self):
1369 """Class maintaining the results of the tests"""
1370 return VppTestResult
1372 def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1373 result_pipe=None, failfast=False, buffer=False,
1374 resultclass=None, print_summary=True, **kwargs):
1375 # ignore stream setting here, use hard-coded stdout to be in sync
1376 # with prints from VppTestCase methods ...
1377 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1378 verbosity, failfast, buffer,
1379 resultclass, **kwargs)
1380 KeepAliveReporter.pipe = keep_alive_pipe
1382 self.orig_stream = self.stream
1383 self.resultclass.test_framework_result_pipe = result_pipe
1385 self.print_summary = print_summary
1387 def _makeResult(self):
1388 return self.resultclass(self.stream,
1393 def run(self, test):
1400 faulthandler.enable() # emit stack trace to stderr if killed by signal
1402 result = super(VppTestRunner, self).run(test)
1403 if not self.print_summary:
1404 self.stream = self.orig_stream
1405 result.stream = self.orig_stream
1409 class Worker(Thread):
1410 def __init__(self, args, logger, env={}):
1411 self.logger = logger
1414 self.env = copy.deepcopy(env)
1415 super(Worker, self).__init__()
1418 executable = self.args[0]
1419 self.logger.debug("Running executable w/args `%s'" % self.args)
1420 env = os.environ.copy()
1421 env.update(self.env)
1422 env["CK_LOG_FILE_NAME"] = "-"
1423 self.process = subprocess.Popen(
1424 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1425 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1426 out, err = self.process.communicate()
1427 self.logger.debug("Finished running `%s'" % executable)
1428 self.logger.info("Return code is `%s'" % self.process.returncode)
1429 self.logger.info(single_line_delim)
1430 self.logger.info("Executable `%s' wrote to stdout:" % executable)
1431 self.logger.info(single_line_delim)
1432 self.logger.info(out)
1433 self.logger.info(single_line_delim)
1434 self.logger.info("Executable `%s' wrote to stderr:" % executable)
1435 self.logger.info(single_line_delim)
1436 self.logger.info(err)
1437 self.logger.info(single_line_delim)
1438 self.result = self.process.returncode
1440 if __name__ == '__main__':