3 from __future__ import print_function
16 from collections import deque
17 from threading import Thread, Event
18 from inspect import getdoc, isclass
19 from traceback import format_exception
20 from logging import FileHandler, DEBUG, Formatter
23 from scapy.packet import Raw
24 from hook import StepHook, PollHook, VppDiedError
25 from vpp_pg_interface import VppPGInterface
26 from vpp_sub_interface import VppSubInterface
27 from vpp_lo_interface import VppLoInterface
28 from vpp_bvi_interface import VppBviInterface
29 from vpp_papi_provider import VppPapiProvider
30 from vpp_papi.vpp_stats import VPPStats
31 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
33 from vpp_object import VppObjectRegistry
34 from util import ppp, is_core_present
35 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
36 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
37 from scapy.layers.inet6 import ICMPv6EchoReply
39 if os.name == 'posix' and sys.version_info[0] < 3:
40 # using subprocess32 is recommended by python official documentation
41 # @ https://docs.python.org/2/library/subprocess.html
42 import subprocess32 as subprocess
46 # Python2/3 compatible
58 debug_framework = False
59 if os.getenv('TEST_DEBUG', "0") == "1":
60 debug_framework = True
64 Test framework module.
66 The module provides a set of tools for constructing and running tests and
67 representing the results.
71 class _PacketInfo(object):
72 """Private class to create packet info object.
74 Help process information about the next packet.
75 Set variables to default values.
77 #: Store the index of the packet.
79 #: Store the index of the source packet generator interface of the packet.
81 #: Store the index of the destination packet generator interface
84 #: Store expected ip version
86 #: Store expected upper protocol
88 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare two packet info objects field by field.

    Two objects are equal when their ``index``, ``src``, ``dst`` and
    ``data`` attributes all compare equal.

    :param other: object to compare with
    :returns: True/False for objects exposing all four fields,
              NotImplemented for anything else (so that e.g.
              ``info == None`` evaluates to False instead of raising
              AttributeError)
    """
    fields = ('index', 'src', 'dst', 'data')
    # private sentinel - None may be a legitimate field value
    absent = object()
    theirs = [getattr(other, name, absent) for name in fields]
    if any(value is absent for value in theirs):
        # not something we know how to compare with - let python decide
        return NotImplemented
    return all(getattr(self, name) == value
               for name, value in zip(fields, theirs))
99 def pump_output(testclass):
100 """ pump output from vpp stdout/stderr to proper queues """
103 while not testclass.pump_thread_stop_flag.is_set():
104 readable = select.select([testclass.vpp.stdout.fileno(),
105 testclass.vpp.stderr.fileno(),
106 testclass.pump_thread_wakeup_pipe[0]],
108 if testclass.vpp.stdout.fileno() in readable:
109 read = os.read(testclass.vpp.stdout.fileno(), 102400)
111 split = read.splitlines(True)
112 if len(stdout_fragment) > 0:
113 split[0] = "%s%s" % (stdout_fragment, split[0])
114 if len(split) > 0 and split[-1].endswith("\n"):
118 stdout_fragment = split[-1]
119 testclass.vpp_stdout_deque.extend(split[:limit])
120 if not testclass.cache_vpp_output:
121 for line in split[:limit]:
122 testclass.logger.debug(
123 "VPP STDOUT: %s" % line.rstrip("\n"))
124 if testclass.vpp.stderr.fileno() in readable:
125 read = os.read(testclass.vpp.stderr.fileno(), 102400)
127 split = read.splitlines(True)
128 if len(stderr_fragment) > 0:
129 split[0] = "%s%s" % (stderr_fragment, split[0])
130 if len(split) > 0 and split[-1].endswith(b"\n"):
134 stderr_fragment = split[-1]
135 testclass.vpp_stderr_deque.extend(split[:limit])
136 if not testclass.cache_vpp_output:
137 for line in split[:limit]:
138 testclass.logger.debug(
139 "VPP STDERR: %s" % line.rstrip("\n"))
140 # ignoring the dummy pipe here intentionally - the
141 # flag will take care of properly terminating the loop
def _is_skip_aarch64_set():
    """Report whether the SKIP_AARCH64 environment variable is switched on.

    The variable defaults to 'n'; it counts as switched on when its
    lower-cased value is 'yes', 'y' or '1'.

    :returns: True if switched on, False otherwise
    """
    enabled_values = ('yes', 'y', '1')
    setting = os.getenv('SKIP_AARCH64', 'n')
    return setting.lower() in enabled_values


# evaluated once, at import time
is_skip_aarch64_set = _is_skip_aarch64_set()
def _is_platform_aarch64():
    """Tell whether ``platform.machine()`` reports 'aarch64'.

    :returns: True on aarch64, False otherwise
    """
    machine = platform.machine()
    return machine == 'aarch64'


# evaluated once, at import time
is_platform_aarch64 = _is_platform_aarch64()
def _running_extended_tests():
    """Report whether the EXTENDED_TESTS environment variable is switched on.

    The variable defaults to "n"; it counts as switched on when its
    lower-cased value is "y", "yes" or "1".

    :returns: True if switched on, False otherwise
    """
    setting = os.getenv("EXTENDED_TESTS", "n").lower()
    # membership test already yields a bool
    return setting in ("y", "yes", "1")


# evaluated once, at import time
running_extended_tests = _running_extended_tests()
def _running_on_centos():
    """Report whether the OS_ID environment variable mentions centos.

    The check is a case-insensitive substring match; an unset variable is
    treated as an empty string.

    :returns: True if "centos" occurs in OS_ID, False otherwise
    """
    os_id = os.getenv("OS_ID", "")
    return "centos" in os_id.lower()


# Call the helper (like the other module level flags do) - binding the
# function object itself would make this flag truthy on every platform.
running_on_centos = _running_on_centos()
170 class KeepAliveReporter(object):
172 Singleton object which reports test start to parent process
177 self.__dict__ = self._shared_state
185 def pipe(self, pipe):
186 if self._pipe is not None:
187 raise Exception("Internal error - pipe should only be set once.")
190 def send_keep_alive(self, test, desc=None):
192 Write current test tmpdir & desc to keep-alive pipe to signal liveness
194 if self.pipe is None:
195 # if not running forked..
199 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
203 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
206 class VppTestCase(unittest.TestCase):
207 """This subclass is a base class for VPP test cases that are implemented as
208 classes. It provides methods to create and run test case.
211 extra_vpp_punt_config = []
212 extra_vpp_plugin_config = []
def packet_infos(self):
    """Return the packet infos stored on this testcase.

    NOTE(review): the original summary called this a list, but the code
    visible in this file resets ``_packet_infos`` to an empty dict and
    stores each packet info under its ``index`` - treat the returned
    value as a mapping of index to packet info.
    """
    return self._packet_infos
220 def get_packet_count_for_if_idx(cls, dst_if_index):
221 """Get the number of packet info for specified destination if index"""
222 if dst_if_index in cls._packet_count_for_dst_if_idx:
223 return cls._packet_count_for_dst_if_idx[dst_if_index]
229 """Return the instance of this testcase"""
230 return cls.test_instance
233 def set_debug_flags(cls, d):
234 cls.debug_core = False
235 cls.debug_gdb = False
236 cls.debug_gdbserver = False
241 cls.debug_core = True
244 elif dl == "gdbserver":
245 cls.debug_gdbserver = True
247 raise Exception("Unrecognized DEBUG option: '%s'" % d)
250 def get_least_used_cpu():
251 cpu_usage_list = [set(range(psutil.cpu_count()))]
252 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
253 if 'vpp_main' == p.info['name']]
254 for vpp_process in vpp_processes:
255 for cpu_usage_set in cpu_usage_list:
257 cpu_num = vpp_process.cpu_num()
258 if cpu_num in cpu_usage_set:
259 cpu_usage_set_index = cpu_usage_list.index(
261 if cpu_usage_set_index == len(cpu_usage_list) - 1:
262 cpu_usage_list.append({cpu_num})
264 cpu_usage_list[cpu_usage_set_index + 1].add(
266 cpu_usage_set.remove(cpu_num)
268 except psutil.NoSuchProcess:
271 for cpu_usage_set in cpu_usage_list:
272 if len(cpu_usage_set) > 0:
273 min_usage_set = cpu_usage_set
276 return random.choice(tuple(min_usage_set))
279 def setUpConstants(cls):
280 """ Set-up the test case class based on environment variables """
281 s = os.getenv("STEP", "n")
282 cls.step = True if s.lower() in ("y", "yes", "1") else False
283 d = os.getenv("DEBUG", None)
284 c = os.getenv("CACHE_OUTPUT", "1")
285 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
286 cls.set_debug_flags(d)
287 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
288 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
289 cls.test_plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
290 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
292 if cls.plugin_path is not None:
293 if cls.extern_plugin_path is not None:
294 plugin_path = "%s:%s" % (
295 cls.plugin_path, cls.extern_plugin_path)
297 plugin_path = cls.plugin_path
298 elif cls.extern_plugin_path is not None:
299 plugin_path = cls.extern_plugin_path
301 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
302 debug_cli = "cli-listen localhost:5002"
304 size = os.getenv("COREDUMP_SIZE")
306 coredump_size = "coredump-size %s" % size
307 if coredump_size is None:
308 coredump_size = "coredump-size unlimited"
310 cpu_core_number = cls.get_least_used_cpu()
312 cls.vpp_cmdline = [cls.vpp_bin, "unix",
313 "{", "nodaemon", debug_cli, "full-coredump",
314 coredump_size, "runtime-dir", cls.tempdir, "}",
315 "api-trace", "{", "on", "}", "api-segment", "{",
316 "prefix", cls.shm_prefix, "}", "cpu", "{",
317 "main-core", str(cpu_core_number), "}",
318 "statseg", "{", "socket-name", cls.stats_sock, "}",
319 "socksvr", "{", "socket-name", cls.api_sock, "}",
321 "{", "plugin", "dpdk_plugin.so", "{", "disable",
322 "}", "plugin", "rdma_plugin.so", "{", "disable",
323 "}", "plugin", "unittest_plugin.so", "{", "enable",
324 "}"] + cls.extra_vpp_plugin_config + ["}", ]
325 if cls.extra_vpp_punt_config is not None:
326 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
327 if plugin_path is not None:
328 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
329 if cls.test_plugin_path is not None:
330 cls.vpp_cmdline.extend(["test_plugin_path", cls.test_plugin_path])
332 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
333 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
336 def wait_for_enter(cls):
337 if cls.debug_gdbserver:
338 print(double_line_delim)
339 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
341 print(double_line_delim)
342 print("Spawned VPP with PID: %d" % cls.vpp.pid)
344 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
346 print(single_line_delim)
347 print("You can debug the VPP using e.g.:")
348 if cls.debug_gdbserver:
349 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
350 print("Now is the time to attach a gdb by running the above "
351 "command, set up breakpoints etc. and then resume VPP from "
352 "within gdb by issuing the 'continue' command")
354 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
355 print("Now is the time to attach a gdb by running the above "
356 "command and set up breakpoints etc.")
357 print(single_line_delim)
358 input("Press ENTER to continue running the testcase...")
362 cmdline = cls.vpp_cmdline
364 if cls.debug_gdbserver:
365 gdbserver = '/usr/bin/gdbserver'
366 if not os.path.isfile(gdbserver) or \
367 not os.access(gdbserver, os.X_OK):
368 raise Exception("gdbserver binary '%s' does not exist or is "
369 "not executable" % gdbserver)
371 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
372 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
375 cls.vpp = subprocess.Popen(cmdline,
376 stdout=subprocess.PIPE,
377 stderr=subprocess.PIPE,
379 except subprocess.CalledProcessError as e:
380 cls.logger.critical("Subprocess returned with non-0 return code: ("
384 cls.logger.critical("Subprocess returned with OS error: "
385 "(%s) %s", e.errno, e.strerror)
387 except Exception as e:
388 cls.logger.exception("Subprocess returned unexpected from "
395 def wait_for_stats_socket(cls):
396 deadline = time.time() + 3
398 while time.time() < deadline or \
399 cls.debug_gdb or cls.debug_gdbserver:
400 if os.path.exists(cls.stats_sock):
405 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
408 def wait_for_coredump(cls):
409 corefile = cls.tempdir + "/core"
410 if os.path.isfile(corefile):
411 cls.logger.error("Waiting for coredump to complete: %s", corefile)
412 curr_size = os.path.getsize(corefile)
413 deadline = time.time() + 60
415 while time.time() < deadline:
418 curr_size = os.path.getsize(corefile)
419 if size == curr_size:
423 cls.logger.error("Timed out waiting for coredump to complete:"
426 cls.logger.error("Coredump complete: %s, size %d",
432 Perform class setup before running the testcase
433 Remove shared memory files, start vpp and connect the vpp-api
435 super(VppTestCase, cls).setUpClass()
436 gc.collect() # run garbage collection first
438 cls.logger = get_logger(cls.__name__)
439 if hasattr(cls, 'parallel_handler'):
440 cls.logger.addHandler(cls.parallel_handler)
441 cls.logger.propagate = False
443 cls.tempdir = tempfile.mkdtemp(
444 prefix='vpp-unittest-%s-' % cls.__name__)
445 cls.stats_sock = "%s/stats.sock" % cls.tempdir
446 cls.api_sock = "%s/api.sock" % cls.tempdir
447 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
448 cls.file_handler.setFormatter(
449 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
451 cls.file_handler.setLevel(DEBUG)
452 cls.logger.addHandler(cls.file_handler)
453 cls.logger.debug("--- setUpClass() for %s called ---" %
455 cls.shm_prefix = os.path.basename(cls.tempdir)
456 os.chdir(cls.tempdir)
457 cls.logger.info("Temporary dir is %s, shm prefix is %s",
458 cls.tempdir, cls.shm_prefix)
460 cls.reset_packet_infos()
462 cls._zombie_captures = []
465 cls.registry = VppObjectRegistry()
466 cls.vpp_startup_failed = False
467 cls.reporter = KeepAliveReporter()
468 # need to catch exceptions here because if we raise, then the cleanup
469 # doesn't get called and we might end with a zombie vpp
472 cls.reporter.send_keep_alive(cls, 'setUpClass')
473 VppTestResult.current_test_case_info = TestCaseInfo(
474 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
475 cls.vpp_stdout_deque = deque()
476 cls.vpp_stderr_deque = deque()
477 cls.pump_thread_stop_flag = Event()
478 cls.pump_thread_wakeup_pipe = os.pipe()
479 cls.pump_thread = Thread(target=pump_output, args=(cls,))
480 cls.pump_thread.daemon = True
481 cls.pump_thread.start()
482 if cls.debug_gdb or cls.debug_gdbserver:
486 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
492 cls.vapi.register_hook(hook)
493 cls.wait_for_stats_socket()
494 cls.statistics = VPPStats(socketname=cls.stats_sock)
498 cls.vpp_startup_failed = True
500 "VPP died shortly after startup, check the"
501 " output to standard error for possible cause")
507 cls.vapi.disconnect()
510 if cls.debug_gdbserver:
511 print(colorize("You're running VPP inside gdbserver but "
512 "VPP-API connection failed, did you forget "
513 "to 'continue' VPP from within gdb?", RED))
525 Disconnect vpp-api, kill vpp and cleanup shared memory files
527 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
529 if cls.vpp.returncode is None:
530 print(double_line_delim)
531 print("VPP or GDB server is still running")
532 print(single_line_delim)
533 input("When done debugging, press ENTER to kill the "
534 "process and finish running the testcase...")
536 # first signal that we want to stop the pump thread, then wake it up
537 if hasattr(cls, 'pump_thread_stop_flag'):
538 cls.pump_thread_stop_flag.set()
539 if hasattr(cls, 'pump_thread_wakeup_pipe'):
540 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
541 if hasattr(cls, 'pump_thread'):
542 cls.logger.debug("Waiting for pump thread to stop")
543 cls.pump_thread.join()
544 if hasattr(cls, 'vpp_stderr_reader_thread'):
545 cls.logger.debug("Waiting for stdderr pump to stop")
546 cls.vpp_stderr_reader_thread.join()
548 if hasattr(cls, 'vpp'):
549 if hasattr(cls, 'vapi'):
550 cls.logger.debug("Disconnecting class vapi client on %s",
552 cls.vapi.disconnect()
553 cls.logger.debug("Deleting class vapi attribute on %s",
557 if cls.vpp.returncode is None:
558 cls.wait_for_coredump()
559 cls.logger.debug("Sending TERM to vpp")
561 cls.logger.debug("Waiting for vpp to die")
562 cls.vpp.communicate()
563 cls.logger.debug("Deleting class vpp attribute on %s",
567 if cls.vpp_startup_failed:
568 stdout_log = cls.logger.info
569 stderr_log = cls.logger.critical
571 stdout_log = cls.logger.info
572 stderr_log = cls.logger.info
574 if hasattr(cls, 'vpp_stdout_deque'):
575 stdout_log(single_line_delim)
576 stdout_log('VPP output to stdout while running %s:', cls.__name__)
577 stdout_log(single_line_delim)
578 vpp_output = "".join(cls.vpp_stdout_deque)
579 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
581 stdout_log('\n%s', vpp_output)
582 stdout_log(single_line_delim)
584 if hasattr(cls, 'vpp_stderr_deque'):
585 stderr_log(single_line_delim)
586 stderr_log('VPP output to stderr while running %s:', cls.__name__)
587 stderr_log(single_line_delim)
588 vpp_output = "".join(cls.vpp_stderr_deque)
589 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
591 stderr_log('\n%s', vpp_output)
592 stderr_log(single_line_delim)
595 def tearDownClass(cls):
596 """ Perform final cleanup after running all tests in this test-case """
597 cls.logger.debug("--- tearDownClass() for %s called ---" %
599 cls.reporter.send_keep_alive(cls, 'tearDownClass')
601 cls.file_handler.close()
602 cls.reset_packet_infos()
604 debug_internal.on_tear_down_class(cls)
def show_commands_at_teardown(self):
    """Hook for subclass specific logging at teardown.

    This default implementation only logs (at info level) that no test
    specific show commands were provided.
    """
    notice = "--- No test specific show commands provided. ---"
    self.logger.info(notice)
611 """ Show various debug prints after each test """
612 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
613 (self.__class__.__name__, self._testMethodName,
614 self._testMethodDoc))
615 if not self.vpp_dead:
617 "--- Logging show commands common to all testcases. ---")
618 self.logger.debug(self.vapi.cli("show trace max 1000"))
619 self.logger.info(self.vapi.ppcli("show interface"))
620 self.logger.info(self.vapi.ppcli("show hardware"))
621 self.logger.info(self.statistics.set_errors_str())
622 self.logger.info(self.vapi.ppcli("show run"))
623 self.logger.info(self.vapi.ppcli("show log"))
624 self.logger.info("Logging testcase specific show commands.")
625 self.show_commands_at_teardown()
626 self.registry.remove_vpp_config(self.logger)
627 # Save/Dump VPP api trace log
628 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
629 tmp_api_trace = "/tmp/%s" % api_trace
630 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
631 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
632 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
634 os.rename(tmp_api_trace, vpp_api_trace_log)
635 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
638 self.registry.unregister_all(self.logger)
641 """ Clear trace before running each test"""
642 super(VppTestCase, self).setUp()
643 self.reporter.send_keep_alive(self)
645 raise Exception("VPP is dead when setting up the test")
646 self.sleep(.1, "during setUp")
647 self.vpp_stdout_deque.append(
648 "--- test setUp() for %s.%s(%s) starts here ---\n" %
649 (self.__class__.__name__, self._testMethodName,
650 self._testMethodDoc))
651 self.vpp_stderr_deque.append(
652 "--- test setUp() for %s.%s(%s) starts here ---\n" %
653 (self.__class__.__name__, self._testMethodName,
654 self._testMethodDoc))
655 self.vapi.cli("clear trace")
656 # store the test instance inside the test class - so that objects
657 # holding the class can access instance methods (like assertEqual)
658 type(self).test_instance = self
661 def pg_enable_capture(cls, interfaces=None):
663 Enable capture on packet-generator interfaces
665 :param interfaces: iterable interface indexes (if None,
666 use self.pg_interfaces)
669 if interfaces is None:
670 interfaces = cls.pg_interfaces
675 def register_capture(cls, cap_name):
676 """ Register a capture in the testclass """
677 # add to the list of captures with current timestamp
678 cls._captures.append((time.time(), cap_name))
679 # filter out from zombies
680 cls._zombie_captures = [(stamp, name)
681 for (stamp, name) in cls._zombie_captures
686 """ Remove any zombie captures and enable the packet generator """
687 # how long before capture is allowed to be deleted - otherwise vpp
688 # crashes - 100ms seems enough (this shouldn't be needed at all)
691 for stamp, cap_name in cls._zombie_captures:
692 wait = stamp + capture_ttl - now
694 cls.sleep(wait, "before deleting capture %s" % cap_name)
696 cls.logger.debug("Removing zombie capture %s" % cap_name)
697 cls.vapi.cli('packet-generator delete %s' % cap_name)
699 cls.vapi.cli("trace add pg-input 1000")
700 cls.vapi.cli('packet-generator enable')
701 cls._zombie_captures = cls._captures
705 def create_pg_interfaces(cls, interfaces):
707 Create packet-generator interfaces.
709 :param interfaces: iterable indexes of the interfaces.
710 :returns: List of created interfaces.
715 intf = VppPGInterface(cls, i)
716 setattr(cls, intf.name, intf)
718 cls.pg_interfaces = result
722 def create_loopback_interfaces(cls, count):
724 Create loopback interfaces.
726 :param count: number of interfaces created.
727 :returns: List of created interfaces.
729 result = [VppLoInterface(cls) for i in range(count)]
731 setattr(cls, intf.name, intf)
732 cls.lo_interfaces = result
736 def create_bvi_interfaces(cls, count):
738 Create BVI interfaces.
740 :param count: number of interfaces created.
741 :returns: List of created interfaces.
743 result = [VppBviInterface(cls) for i in range(count)]
745 setattr(cls, intf.name, intf)
746 cls.bvi_interfaces = result
750 def extend_packet(packet, size, padding=' '):
752 Extend packet to given size by padding with spaces or custom padding
753 NOTE: Currently works only when Raw layer is present.
755 :param packet: packet
756 :param size: target size
757 :param padding: padding used to extend the payload
760 packet_len = len(packet) + 4
761 extend = size - packet_len
763 num = (extend // len(padding)) + 1
764 packet[Raw].load += (padding * num)[:extend].encode("ascii")
def reset_packet_infos(cls):
    """Forget all packet infos and per-destination packet counts.

    Both ``_packet_infos`` and ``_packet_count_for_dst_if_idx`` are
    replaced with fresh (separate) empty dicts.
    """
    for attribute in ('_packet_infos', '_packet_count_for_dst_if_idx'):
        # a new dict per attribute - the two must never share storage
        setattr(cls, attribute, {})
773 def create_packet_info(cls, src_if, dst_if):
775 Create packet info object containing the source and destination indexes
776 and add it to the testcase's packet info list
778 :param VppInterface src_if: source interface
779 :param VppInterface dst_if: destination interface
781 :returns: _PacketInfo object
785 info.index = len(cls._packet_infos)
786 info.src = src_if.sw_if_index
787 info.dst = dst_if.sw_if_index
788 if isinstance(dst_if, VppSubInterface):
789 dst_idx = dst_if.parent.sw_if_index
791 dst_idx = dst_if.sw_if_index
792 if dst_idx in cls._packet_count_for_dst_if_idx:
793 cls._packet_count_for_dst_if_idx[dst_idx] += 1
795 cls._packet_count_for_dst_if_idx[dst_idx] = 1
796 cls._packet_infos[info.index] = info
800 def info_to_payload(info):
802 Convert _PacketInfo object to packet payload
804 :param info: _PacketInfo object
806 :returns: string containing serialized data from packet info
808 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
812 def payload_to_info(payload, payload_field='load'):
814 Convert packet payload to _PacketInfo object
816 :param payload: packet payload
817 :type payload: <class 'scapy.packet.Raw'>
818 :param payload_field: packet fieldname of payload "load" for
819 <class 'scapy.packet.Raw'>
820 :type payload_field: str
821 :returns: _PacketInfo object containing de-serialized data from payload
824 numbers = getattr(payload, payload_field).split()
826 info.index = int(numbers[0])
827 info.src = int(numbers[1])
828 info.dst = int(numbers[2])
829 info.ip = int(numbers[3])
830 info.proto = int(numbers[4])
833 def get_next_packet_info(self, info):
835 Iterate over the packet info list stored in the testcase
836 Start iteration with first element if info is None
837 Continue based on index in info if info is specified
839 :param info: info or None
840 :returns: next info in list or None if no more infos
845 next_index = info.index + 1
846 if next_index == len(self._packet_infos):
849 return self._packet_infos[next_index]
851 def get_next_packet_info_for_interface(self, src_index, info):
853 Search the packet info list for the next packet info with same source
856 :param src_index: source interface index to search for
857 :param info: packet info - where to start the search
858 :returns: packet info or None
862 info = self.get_next_packet_info(info)
865 if info.src == src_index:
868 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
870 Search the packet info list for the next packet info with same source
871 and destination interface indexes
873 :param src_index: source interface index to search for
874 :param dst_index: destination interface index to search for
875 :param info: packet info - where to start the search
876 :returns: packet info or None
880 info = self.get_next_packet_info_for_interface(src_index, info)
883 if info.dst == dst_index:
886 def assert_equal(self, real_value, expected_value, name_or_class=None):
887 if name_or_class is None:
888 self.assertEqual(real_value, expected_value)
891 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
892 msg = msg % (getdoc(name_or_class).strip(),
893 real_value, str(name_or_class(real_value)),
894 expected_value, str(name_or_class(expected_value)))
896 msg = "Invalid %s: %s does not match expected value %s" % (
897 name_or_class, real_value, expected_value)
899 self.assertEqual(real_value, expected_value, msg)
901 def assert_in_range(self,
909 msg = "Invalid %s: %s out of range <%s,%s>" % (
910 name, real_value, expected_min, expected_max)
911 self.assertTrue(expected_min <= real_value <= expected_max, msg)
913 def assert_packet_checksums_valid(self, packet,
914 ignore_zero_udp_checksums=True):
915 received = packet.__class__(scapy.compat.raw(packet))
917 ppp("Verifying packet checksums for packet:", received))
918 udp_layers = ['UDP', 'UDPerror']
919 checksum_fields = ['cksum', 'chksum']
922 temp = received.__class__(scapy.compat.raw(received))
924 layer = temp.getlayer(counter)
926 for cf in checksum_fields:
927 if hasattr(layer, cf):
928 if ignore_zero_udp_checksums and \
929 0 == getattr(layer, cf) and \
930 layer.name in udp_layers:
933 checksums.append((counter, cf))
936 counter = counter + 1
937 if 0 == len(checksums):
939 temp = temp.__class__(scapy.compat.raw(temp))
940 for layer, cf in checksums:
941 calc_sum = getattr(temp[layer], cf)
943 getattr(received[layer], cf), calc_sum,
944 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
946 "Checksum field `%s` on `%s` layer has correct value `%s`" %
947 (cf, temp[layer].name, calc_sum))
949 def assert_checksum_valid(self, received_packet, layer,
951 ignore_zero_checksum=False):
952 """ Check checksum of received packet on given layer """
953 received_packet_checksum = getattr(received_packet[layer], field_name)
954 if ignore_zero_checksum and 0 == received_packet_checksum:
956 recalculated = received_packet.__class__(
957 scapy.compat.raw(received_packet))
958 delattr(recalculated[layer], field_name)
959 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
960 self.assert_equal(received_packet_checksum,
961 getattr(recalculated[layer], field_name),
962 "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """Check the checksum of the 'IP' layer of a received packet.

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded unchanged to
                                 assert_checksum_valid
    """
    verify = self.assert_checksum_valid
    verify(received_packet, 'IP',
           ignore_zero_checksum=ignore_zero_checksum)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """Check the checksum of the 'TCP' layer of a received packet.

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded unchanged to
                                 assert_checksum_valid
    """
    verify = self.assert_checksum_valid
    verify(received_packet, 'TCP',
           ignore_zero_checksum=ignore_zero_checksum)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """Check the checksum of the 'UDP' layer of a received packet.

    Unlike the IP and TCP variants, ``ignore_zero_checksum`` defaults to
    True here.

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded unchanged to
                                 assert_checksum_valid
    """
    verify = self.assert_checksum_valid
    verify(received_packet, 'UDP',
           ignore_zero_checksum=ignore_zero_checksum)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """Check checksums of the error layers embedded in a received packet.

    Each of the IPerror, TCPerror, UDPerror and ICMPerror layers is
    verified only when the packet has it; for UDPerror the check is made
    with ``ignore_zero_checksum=True``.

    :param received_packet: packet to verify
    """
    # (layer class, layer name, extra keyword arguments) - in check order
    embedded_layers = (
        (IPerror, 'IPerror', {}),
        (TCPerror, 'TCPerror', {}),
        (UDPerror, 'UDPerror', {'ignore_zero_checksum': True}),
        (ICMPerror, 'ICMPerror', {}),
    )
    for layer_class, layer_name, extra in embedded_layers:
        if received_packet.haslayer(layer_class):
            self.assert_checksum_valid(received_packet, layer_name, **extra)
def assert_icmp_checksum_valid(self, received_packet):
    """Check the 'ICMP' layer checksum and any embedded error layers.

    :param received_packet: packet to verify
    """
    packet = received_packet
    # outer ICMP header first ...
    self.assert_checksum_valid(packet, 'ICMP')
    # ... then whatever error layers the packet carries
    self.assert_embedded_icmp_checksum_valid(packet)
def assert_icmpv6_checksum_valid(self, pkt):
    """Check the 'cksum' field of the ICMPv6 layers present in a packet.

    Handles ICMPv6DestUnreach (followed by a check of the embedded error
    layers), ICMPv6EchoRequest and ICMPv6EchoReply; layers the packet does
    not have are not checked.

    :param pkt: packet to verify
    """
    checksum_field = 'cksum'
    if pkt.haslayer(ICMPv6DestUnreach):
        self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', checksum_field)
        self.assert_embedded_icmp_checksum_valid(pkt)
    echo_layers = ((ICMPv6EchoRequest, 'ICMPv6EchoRequest'),
                   (ICMPv6EchoReply, 'ICMPv6EchoReply'))
    for layer_class, layer_name in echo_layers:
        if pkt.haslayer(layer_class):
            self.assert_checksum_valid(pkt, layer_name, checksum_field)
1003 def get_packet_counter(self, counter):
1004 if counter.startswith("/"):
1005 counter_value = self.statistics.get_counter(counter)
1007 counters = self.vapi.cli("sh errors").split('\n')
1009 for i in range(1, len(counters) - 1):
1010 results = counters[i].split()
1011 if results[1] == counter:
1012 counter_value = int(results[0])
1014 return counter_value
1016 def assert_packet_counter_equal(self, counter, expected_value):
1017 if counter.startswith("/"):
1018 counter_value = self.statistics.get_counter(counter)
1019 self.assert_equal(counter_value, expected_value,
1020 "packet counter `%s'" % counter)
1022 counters = self.vapi.cli("sh errors").split('\n')
1024 for i in range(1, len(counters) - 1):
1025 results = counters[i].split()
1026 if results[1] == counter:
1027 counter_value = int(results[0])
def assert_error_counter_equal(self, counter, expected_value):
    """Assert that an error counter holds the expected value.

    The current value is read with ``self.statistics.get_err_counter`` and
    compared using ``self.assert_equal``.

    :param counter: counter to read
    :param expected_value: value the counter is expected to have
    """
    actual = self.statistics.get_err_counter(counter)
    label = "error counter `%s'" % counter
    self.assert_equal(actual, expected_value, label)
1036 def sleep(cls, timeout, remark=None):
1038 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1039 # * by Guido, only the main thread can be interrupted.
1041 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
1044 if hasattr(os, 'sched_yield'):
1050 if hasattr(cls, 'logger'):
1051 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1052 before = time.time()
1055 if hasattr(cls, 'logger') and after - before > 2 * timeout:
1056 cls.logger.error("unexpected self.sleep() result - "
1057 "slept for %es instead of ~%es!",
1058 after - before, timeout)
1059 if hasattr(cls, 'logger'):
1061 "Finished sleep (%s) - slept %es (wanted %es)",
1062 remark, after - before, timeout)
1064 def pg_send(self, intf, pkts):
1065 self.vapi.cli("clear trace")
1066 intf.add_stream(pkts)
1067 self.pg_enable_capture(self.pg_interfaces)
1070 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
1071 self.pg_send(intf, pkts)
1074 for i in self.pg_interfaces:
1075 i.get_capture(0, timeout=timeout)
1076 i.assert_nothing_captured(remark=remark)
1079 def send_and_expect(self, intf, pkts, output, n_rx=None):
1082 self.pg_send(intf, pkts)
1083 rx = output.get_capture(n_rx)
1086 def send_and_expect_only(self, intf, pkts, output, timeout=None):
1087 self.pg_send(intf, pkts)
1088 rx = output.get_capture(len(pkts))
1092 for i in self.pg_interfaces:
1093 if i not in outputs:
1094 i.get_capture(0, timeout=timeout)
1095 i.assert_nothing_captured()
1101 """ unittest calls runTest when TestCase is instantiated without a
1102 test case. Use case: Writing unittests against VppTestCase"""
def get_testcase_doc_name(test):
    """Return the first line of the docstring of the test's class.

    :param test: test case instance
    :returns: first line of the docstring as cleaned up by
              ``inspect.getdoc``; the class name when the class has no
              docstring or an empty one (previously this raised
              AttributeError/IndexError)
    """
    test_class = test.__class__
    # getdoc() yields None for an undocumented class
    doc_lines = (getdoc(test_class) or "").splitlines()
    if doc_lines:
        return doc_lines[0]
    return test_class.__name__
1110 def get_test_description(descriptions, test):
1111 short_description = test.shortDescription()
1112 if descriptions and short_description:
1113 return short_description
class TestCaseInfo(object):
    """Plain record describing one test case run.

    Holds the logger, temporary directory, VPP process id and VPP binary
    path handed to the constructor, plus ``core_crash_test`` which starts
    out as None.
    """

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        """Store the given values on the instance.

        :param logger: logger of the test case
        :param tempdir: temporary directory of the test case
        :param vpp_pid: pid of the VPP process
        :param vpp_bin_path: path to the VPP binary
        """
        # nothing is known about a core crash yet
        self.core_crash_test = None
        self.vpp_bin_path = vpp_bin_path
        self.vpp_pid = vpp_pid
        self.tempdir = tempdir
        self.logger = logger
class VppTestResult(unittest.TestResult):
    """
    Test result which logs every outcome, prints a per-test result line
    and optionally reports results through a pipe.

    @property result_string
     String variable to store the test case result string.
    @property errors
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test which
     raised an unexpected exception.
    @property failures
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test where
     a failure was explicitly signalled using the TestCase.assert*()
     methods.
    """

    # class-level state, shared by every instance of the result class
    failed_test_cases_info = set()
    core_crash_test_cases_info = set()
    current_test_case_info = None

    def __init__(self, stream=None, descriptions=None, verbosity=None,
                 runner=None):
        """
        :param stream File descriptor to store where to report test results.
            Set to the standard error stream by default.
        :param descriptions Boolean variable to store information if to use
            test case descriptions.
        :param verbosity Integer variable to store required verbosity level.
        :param runner Runner which created this result; its print_summary
            attribute is read in printErrors().
        """
        super(VppTestResult, self).__init__(stream, descriptions, verbosity)
        self.stream = stream
        self.descriptions = descriptions
        self.verbosity = verbosity
        self.result_string = None
        self.runner = runner

    def addSuccess(self, test):
        """
        Record a test succeeded result

        :param test: test which succeeded

        """
        if self.current_test_case_info:
            self.current_test_case_info.logger.debug(
                "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
                                                       test._testMethodName,
                                                       test._testMethodDoc))
        unittest.TestResult.addSuccess(self, test)
        self.result_string = colorize("OK", GREEN)

        self.send_result_through_pipe(test, PASS)

    def addSkip(self, test, reason):
        """
        Record a test skipped.

        :param test: test which was skipped
        :param reason: why the test was skipped

        """
        if self.current_test_case_info:
            self.current_test_case_info.logger.debug(
                "--- addSkip() %s.%s(%s) called, reason is %s" %
                (test.__class__.__name__, test._testMethodName,
                 test._testMethodDoc, reason))
        unittest.TestResult.addSkip(self, test, reason)
        self.result_string = colorize("SKIP", YELLOW)

        self.send_result_through_pipe(test, SKIP)

    def symlink_failed(self):
        """
        Link the temp dir of the current test case into $FAILED_DIR.

        Best effort only: nothing is done without current test case info
        or when FAILED_DIR is not set, and any error raised while creating
        the link is logged instead of propagated.
        """
        info = self.current_test_case_info
        if not info:
            return
        try:
            failed_dir = os.getenv('FAILED_DIR')
            if failed_dir is None:
                if info.logger:
                    info.logger.debug(
                        "FAILED_DIR is not set, not creating a link to "
                        "the failed test")
                return
            link_path = os.path.join(
                failed_dir,
                '%s-FAILED' %
                os.path.basename(info.tempdir))
            if info.logger:
                info.logger.debug(
                    "creating a link to the failed test")
                info.logger.debug(
                    "os.symlink(%s, %s)" %
                    (info.tempdir, link_path))
            # lexists() also reports a link whose target has disappeared,
            # exists() would miss it and os.symlink() would then fail
            if os.path.lexists(link_path):
                if info.logger:
                    info.logger.debug(
                        'symlink already exists')
            else:
                os.symlink(info.tempdir, link_path)

        except Exception as e:
            if info.logger:
                info.logger.error(e)

    def send_result_through_pipe(self, test, result):
        """
        Send (test id, result) through the result pipe, if there is one.

        :param test: test whose id() is reported
        :param result: result code to report

        """
        if hasattr(self, 'test_framework_result_pipe'):
            pipe = self.test_framework_result_pipe
            if pipe:
                pipe.send((test.id(), result))

    def log_error(self, test, err, fn_name):
        """
        Log an error or failure to the logger of the current test case.

        :param test: test (or unittest _ErrorHolder) which failed
        :param err: exc_info style tuple describing the exception
        :param fn_name: name of the reporting function used in the message

        """
        if self.current_test_case_info:
            if isinstance(test, unittest.suite._ErrorHolder):
                test_name = test.description
            else:
                test_name = '%s.%s(%s)' % (test.__class__.__name__,
                                           test._testMethodName,
                                           test._testMethodDoc)
            self.current_test_case_info.logger.debug(
                "--- %s() %s called, err is %s" %
                (fn_name, test_name, err))
            self.current_test_case_info.logger.debug(
                "formatted exception is:\n%s" %
                "".join(format_exception(*err)))

    def add_error(self, test, err, unittest_fn, error_type):
        """
        Common implementation of addFailure() and addError().

        :param test: test (or unittest _ErrorHolder) which failed
        :param err: exc_info style tuple describing the exception
        :param unittest_fn: unbound unittest.TestResult method used to
            record the result
        :param error_type: FAIL or ERROR
        :raises Exception: if error_type is neither FAIL nor ERROR

        """
        if error_type == FAIL:
            self.log_error(test, err, 'addFailure')
            error_type_str = colorize("FAIL", RED)
        elif error_type == ERROR:
            self.log_error(test, err, 'addError')
            error_type_str = colorize("ERROR", RED)
        else:
            raise Exception('Error type %s cannot be used to record an '
                            'error or a failure' % error_type)

        unittest_fn(self, test, err)
        if self.current_test_case_info:
            self.result_string = "%s [ temp dir used by test case: %s ]" % \
                                 (error_type_str,
                                  self.current_test_case_info.tempdir)
            self.symlink_failed()
            self.failed_test_cases_info.add(self.current_test_case_info)
            if is_core_present(self.current_test_case_info.tempdir):
                # only the first failing test is recorded as the one
                # which produced the core
                if not self.current_test_case_info.core_crash_test:
                    if isinstance(test, unittest.suite._ErrorHolder):
                        test_name = str(test)
                    else:
                        test_name = "'{!s}' ({!s})".format(
                            get_testcase_doc_name(test), test.id())
                    self.current_test_case_info.core_crash_test = test_name
                self.core_crash_test_cases_info.add(
                    self.current_test_case_info)
        else:
            self.result_string = '%s [no temp dir]' % error_type_str

        self.send_result_through_pipe(test, error_type)

    def addFailure(self, test, err):
        """
        Record a test failed result

        :param test: test which failed
        :param err: error message

        """
        self.add_error(test, err, unittest.TestResult.addFailure, FAIL)

    def addError(self, test, err):
        """
        Record a test error result

        :param test: test which raised an error
        :param err: error message

        """
        self.add_error(test, err, unittest.TestResult.addError, ERROR)

    def getDescription(self, test):
        """
        Get test description

        :param test: test to describe
        :returns: test description

        """
        return get_test_description(self.descriptions, test)

    def startTest(self, test):
        """
        Start a test

        :param test: test about to be run

        """

        def print_header(test):
            # the header is printed once per test case class
            if not hasattr(test.__class__, '_header_printed'):
                doc = getdoc(test)
                # fall back to the class name for undocumented test classes
                title = doc.splitlines()[0] if doc \
                    else test.__class__.__name__
                print(double_line_delim)
                print(colorize(title, GREEN))
                print(double_line_delim)
                test.__class__._header_printed = True

        print_header(test)

        unittest.TestResult.startTest(self, test)
        if self.verbosity > 0:
            self.stream.writeln(
                "Starting " + self.getDescription(test) + " ...")
            self.stream.writeln(single_line_delim)

    def stopTest(self, test):
        """
        Called when the given test has been run

        :param test: test which has been run

        """
        unittest.TestResult.stopTest(self, test)
        if self.verbosity > 0:
            self.stream.writeln(single_line_delim)
            self.stream.writeln("%-73s%s" % (self.getDescription(test),
                                             self.result_string))
            self.stream.writeln(single_line_delim)
        else:
            self.stream.writeln("%-73s%s" % (self.getDescription(test),
                                             self.result_string))

        self.send_result_through_pipe(test, TEST_RUN)

    def printErrors(self):
        """
        Print errors from running the test case
        """
        if len(self.errors) > 0 or len(self.failures) > 0:
            self.stream.writeln()
            self.printErrorList('ERROR', self.errors)
            self.printErrorList('FAIL', self.failures)

        # ^^ that is the last output from unittest before summary
        if not self.runner.print_summary:
            devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
            self.stream = devnull
            self.runner.stream = devnull

    def printErrorList(self, flavour, errors):
        """
        Print error list to the output stream together with error type
        and test case description.

        :param flavour: error type
        :param errors: iterable errors

        """
        for test, err in errors:
            self.stream.writeln(double_line_delim)
            self.stream.writeln("%s: %s" %
                                (flavour, self.getDescription(test)))
            self.stream.writeln(single_line_delim)
            self.stream.writeln("%s" % err)
class VppTestRunner(unittest.TextTestRunner):
    """
    A basic test runner implementation which prints results to standard
    output.
    """

    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
                 result_pipe=None, failfast=False, buffer=False,
                 resultclass=None, print_summary=True, **kwargs):
        """
        :param keep_alive_pipe: pipe stored in KeepAliveReporter.pipe
        :param descriptions: passed to unittest.TextTestRunner
        :param verbosity: passed to unittest.TextTestRunner
        :param result_pipe: pipe stored on the result class as
            test_framework_result_pipe
        :param failfast: passed to unittest.TextTestRunner
        :param buffer: passed to unittest.TextTestRunner
        :param resultclass: passed to unittest.TextTestRunner
        :param print_summary: if False, output following the error lists
            is discarded (see run())
        :param kwargs: remaining unittest.TextTestRunner keyword arguments;
            a 'stream' entry is ignored
        """
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        kwargs.pop('stream', None)
        # NOTE(review): resultclass is a read-only property on this class,
        # so a non-None resultclass looks like it cannot be applied by
        # TextTestRunner.__init__ - confirm that callers always leave it
        # at None.
        super(VppTestRunner, self).__init__(sys.stdout, descriptions,
                                            verbosity, failfast, buffer,
                                            resultclass, **kwargs)
        KeepAliveReporter.pipe = keep_alive_pipe

        self.orig_stream = self.stream
        self.resultclass.test_framework_result_pipe = result_pipe

        self.print_summary = print_summary

    def _makeResult(self):
        """Create the result object, handing it a reference to this runner"""
        return self.resultclass(self.stream,
                                self.descriptions,
                                self.verbosity,
                                self)

    def run(self, test):
        """
        Run the tests

        :param test: test or test suite to run
        :returns: result object of the run

        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        result = super(VppTestRunner, self).run(test)
        if not self.print_summary:
            muted_stream = self.stream
            self.stream = self.orig_stream
            result.stream = self.orig_stream
            # the stream may have been swapped for a devnull writer while
            # the summary was printed - close it now that it is unused
            if muted_stream is not self.orig_stream:
                muted_stream.close()
        return result
class Worker(Thread):
    """Thread which runs an executable and logs its output.

    After the thread has finished, ``result`` holds the return code of
    the executable.
    """

    def __init__(self, args, logger, env=None):
        """
        :param args: command line to run; args[0] is the executable
        :param logger: logger receiving the progress and output messages
        :param env: optional dict of extra environment variables for the
            executable; a deep copy is stored
        """
        self.logger = logger
        self.args = args
        self.result = None
        # None instead of a mutable {} default; every worker gets its own
        # dict either way
        self.env = {} if env is None else copy.deepcopy(env)
        super(Worker, self).__init__()

    def run(self):
        """Run the executable, wait for it to finish and log its output."""
        executable = self.args[0]
        self.logger.debug("Running executable w/args `%s'" % self.args)
        env = os.environ.copy()
        env.update(self.env)
        env["CK_LOG_FILE_NAME"] = "-"
        # preexec_fn=os.setpgrp puts the child into its own process group
        self.process = subprocess.Popen(
            self.args, shell=False, env=env, preexec_fn=os.setpgrp,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = self.process.communicate()
        self.logger.debug("Finished running `%s'" % executable)
        self.logger.info("Return code is `%s'" % self.process.returncode)
        self.logger.info(single_line_delim)
        self.logger.info("Executable `%s' wrote to stdout:" % executable)
        self.logger.info(single_line_delim)
        self.logger.info(out)
        self.logger.info(single_line_delim)
        self.logger.info("Executable `%s' wrote to stderr:" % executable)
        self.logger.info(single_line_delim)
        self.logger.info(err)
        self.logger.info(single_line_delim)
        self.result = self.process.returncode
1458 if __name__ == '__main__':