3 from __future__ import print_function
16 from collections import deque
17 from threading import Thread, Event
18 from inspect import getdoc, isclass
19 from traceback import format_exception
20 from logging import FileHandler, DEBUG, Formatter
23 from scapy.packet import Raw
24 from hook import StepHook, PollHook, VppDiedError
25 from vpp_pg_interface import VppPGInterface
26 from vpp_sub_interface import VppSubInterface
27 from vpp_lo_interface import VppLoInterface
28 from vpp_bvi_interface import VppBviInterface
29 from vpp_papi_provider import VppPapiProvider
30 from vpp_papi.vpp_stats import VPPStats
31 from vpp_papi.vpp_transport_shmem import VppTransportShmemIOError
32 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
34 from vpp_object import VppObjectRegistry
35 from util import ppp, is_core_present
36 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
37 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
38 from scapy.layers.inet6 import ICMPv6EchoReply
40 if os.name == 'posix' and sys.version_info[0] < 3:
41 # using subprocess32 is recommended by python official documentation
42 # @ https://docs.python.org/2/library/subprocess.html
43 import subprocess32 as subprocess
47 # Python2/3 compatible
59 debug_framework = False
60 if os.getenv('TEST_DEBUG', "0") == "1":
61 debug_framework = True
65 Test framework module.
67 The module provides a set of tools for constructing and running tests and
68 representing the results.
72 class _PacketInfo(object):
73 """Private class to create packet info object.
75 Help process information about the next packet.
76 Set variables to default values.
78 #: Store the index of the packet.
80 #: Store the index of the source packet generator interface of the packet.
82 #: Store the index of the destination packet generator interface
85 #: Store expected ip version
87 #: Store expected upper protocol
89 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare two packet infos by index, source, destination and data.

    :param other: object to compare against
    :returns: True when index, src, dst and data are all equal, False when
        any of them differs, NotImplemented when `other` is not a packet
        info object
    """
    # Returning NotImplemented (instead of touching attributes that an
    # unrelated object such as None does not have) lets Python fall back to
    # its default comparison rather than raising AttributeError.
    if not isinstance(other, type(self)):
        return NotImplemented
    return (self.index == other.index and
            self.src == other.src and
            self.dst == other.dst and
            self.data == other.data)
100 def pump_output(testclass):
101 """ pump output from vpp stdout/stderr to proper queues """
104 while not testclass.pump_thread_stop_flag.is_set():
105 readable = select.select([testclass.vpp.stdout.fileno(),
106 testclass.vpp.stderr.fileno(),
107 testclass.pump_thread_wakeup_pipe[0]],
109 if testclass.vpp.stdout.fileno() in readable:
110 read = os.read(testclass.vpp.stdout.fileno(), 102400)
112 split = read.splitlines(True)
113 if len(stdout_fragment) > 0:
114 split[0] = "%s%s" % (stdout_fragment, split[0])
115 if len(split) > 0 and split[-1].endswith("\n"):
119 stdout_fragment = split[-1]
120 testclass.vpp_stdout_deque.extend(split[:limit])
121 if not testclass.cache_vpp_output:
122 for line in split[:limit]:
123 testclass.logger.debug(
124 "VPP STDOUT: %s" % line.rstrip("\n"))
125 if testclass.vpp.stderr.fileno() in readable:
126 read = os.read(testclass.vpp.stderr.fileno(), 102400)
128 split = read.splitlines(True)
129 if len(stderr_fragment) > 0:
130 split[0] = "%s%s" % (stderr_fragment, split[0])
131 if len(split) > 0 and split[-1].endswith(b"\n"):
135 stderr_fragment = split[-1]
136 testclass.vpp_stderr_deque.extend(split[:limit])
137 if not testclass.cache_vpp_output:
138 for line in split[:limit]:
139 testclass.logger.debug(
140 "VPP STDERR: %s" % line.rstrip("\n"))
141 # ignoring the dummy pipe here intentionally - the
142 # flag will take care of properly terminating the loop
def _is_skip_aarch64_set():
    """Tell whether the SKIP_AARCH64 environment variable is switched on.

    :returns: True if SKIP_AARCH64 is 'yes', 'y' or '1' (compared
        case-insensitively), False otherwise - including when it is unset
    """
    accepted = ('yes', 'y', '1')
    requested = os.getenv('SKIP_AARCH64', 'n')
    return requested.lower() in accepted


# evaluated once, when the module is imported
is_skip_aarch64_set = _is_skip_aarch64_set()
def _is_platform_aarch64():
    """Tell whether platform.machine() reports 'aarch64'.

    :returns: True on an aarch64 machine, False on anything else
    """
    machine = platform.machine()
    return machine == 'aarch64'


# evaluated once, when the module is imported
is_platform_aarch64 = _is_platform_aarch64()
def _running_extended_tests():
    """Tell whether the EXTENDED_TESTS environment variable is switched on.

    :returns: True if EXTENDED_TESTS is 'y', 'yes' or '1' (compared
        case-insensitively), False otherwise - including when it is unset
    """
    flag = os.getenv("EXTENDED_TESTS", "n").lower()
    if flag in ("y", "yes", "1"):
        return True
    return False


# evaluated once, when the module is imported
running_extended_tests = _running_extended_tests()
def _running_on_centos():
    """Tell whether the OS_ID environment variable names a CentOS system.

    :returns: True if OS_ID contains 'centos' (compared case-insensitively),
        False otherwise - including when OS_ID is unset
    """
    os_id = os.getenv("OS_ID", "")
    return "centos" in os_id.lower()


# Call the helper, as the other module-level flags in this file do. Binding
# the bare function object made the flag truthy regardless of OS_ID.
running_on_centos = _running_on_centos()
175 class KeepAliveReporter(object):
177 Singleton object which reports test start to parent process
182 self.__dict__ = self._shared_state
190 def pipe(self, pipe):
191 if self._pipe is not None:
192 raise Exception("Internal error - pipe should only be set once.")
195 def send_keep_alive(self, test, desc=None):
197 Write current test tmpdir & desc to keep-alive pipe to signal liveness
199 if self.pipe is None:
200 # if not running forked..
204 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
208 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
211 class VppTestCase(unittest.TestCase):
212 """This subclass is a base class for VPP test cases that are implemented as
213 classes. It provides methods to create and run test case.
216 extra_vpp_punt_config = []
217 extra_vpp_plugin_config = []
def packet_infos(self):
    """Give access to the packet info store kept in `_packet_infos`.

    :returns: the very object stored in `self._packet_infos` (no copy)
    """
    infos = self._packet_infos
    return infos
225 def get_packet_count_for_if_idx(cls, dst_if_index):
226 """Get the number of packet info for specified destination if index"""
227 if dst_if_index in cls._packet_count_for_dst_if_idx:
228 return cls._packet_count_for_dst_if_idx[dst_if_index]
234 """Return the instance of this testcase"""
235 return cls.test_instance
238 def set_debug_flags(cls, d):
239 cls.debug_core = False
240 cls.debug_gdb = False
241 cls.debug_gdbserver = False
246 cls.debug_core = True
249 elif dl == "gdbserver":
250 cls.debug_gdbserver = True
252 raise Exception("Unrecognized DEBUG option: '%s'" % d)
255 def get_least_used_cpu():
256 cpu_usage_list = [set(range(psutil.cpu_count()))]
257 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
258 if 'vpp_main' == p.info['name']]
259 for vpp_process in vpp_processes:
260 for cpu_usage_set in cpu_usage_list:
262 cpu_num = vpp_process.cpu_num()
263 if cpu_num in cpu_usage_set:
264 cpu_usage_set_index = cpu_usage_list.index(
266 if cpu_usage_set_index == len(cpu_usage_list) - 1:
267 cpu_usage_list.append({cpu_num})
269 cpu_usage_list[cpu_usage_set_index + 1].add(
271 cpu_usage_set.remove(cpu_num)
273 except psutil.NoSuchProcess:
276 for cpu_usage_set in cpu_usage_list:
277 if len(cpu_usage_set) > 0:
278 min_usage_set = cpu_usage_set
281 return random.choice(tuple(min_usage_set))
284 def setUpConstants(cls):
285 """ Set-up the test case class based on environment variables """
286 s = os.getenv("STEP", "n")
287 cls.step = True if s.lower() in ("y", "yes", "1") else False
288 d = os.getenv("DEBUG", None)
289 c = os.getenv("CACHE_OUTPUT", "1")
290 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
291 cls.set_debug_flags(d)
292 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
293 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
294 cls.test_plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
295 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
297 if cls.plugin_path is not None:
298 if cls.extern_plugin_path is not None:
299 plugin_path = "%s:%s" % (
300 cls.plugin_path, cls.extern_plugin_path)
302 plugin_path = cls.plugin_path
303 elif cls.extern_plugin_path is not None:
304 plugin_path = cls.extern_plugin_path
306 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
307 debug_cli = "cli-listen localhost:5002"
309 size = os.getenv("COREDUMP_SIZE")
311 coredump_size = "coredump-size %s" % size
312 if coredump_size is None:
313 coredump_size = "coredump-size unlimited"
315 cpu_core_number = cls.get_least_used_cpu()
317 cls.vpp_cmdline = [cls.vpp_bin, "unix",
318 "{", "nodaemon", debug_cli, "full-coredump",
319 coredump_size, "runtime-dir", cls.tempdir, "}",
320 "api-trace", "{", "on", "}", "api-segment", "{",
321 "prefix", cls.shm_prefix, "}", "cpu", "{",
322 "main-core", str(cpu_core_number), "}",
323 "statseg", "{", "socket-name", cls.stats_sock, "}",
324 "socksvr", "{", "socket-name", cls.api_sock, "}",
326 "{", "plugin", "dpdk_plugin.so", "{", "disable",
327 "}", "plugin", "rdma_plugin.so", "{", "disable",
328 "}", "plugin", "unittest_plugin.so", "{", "enable",
329 "}"] + cls.extra_vpp_plugin_config + ["}", ]
330 if cls.extra_vpp_punt_config is not None:
331 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
332 if plugin_path is not None:
333 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
334 if cls.test_plugin_path is not None:
335 cls.vpp_cmdline.extend(["test_plugin_path", cls.test_plugin_path])
337 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
338 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
341 def wait_for_enter(cls):
342 if cls.debug_gdbserver:
343 print(double_line_delim)
344 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
346 print(double_line_delim)
347 print("Spawned VPP with PID: %d" % cls.vpp.pid)
349 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
351 print(single_line_delim)
352 print("You can debug the VPP using e.g.:")
353 if cls.debug_gdbserver:
354 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
355 print("Now is the time to attach a gdb by running the above "
356 "command, set up breakpoints etc. and then resume VPP from "
357 "within gdb by issuing the 'continue' command")
359 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
360 print("Now is the time to attach a gdb by running the above "
361 "command and set up breakpoints etc.")
362 print(single_line_delim)
363 input("Press ENTER to continue running the testcase...")
367 cmdline = cls.vpp_cmdline
369 if cls.debug_gdbserver:
370 gdbserver = '/usr/bin/gdbserver'
371 if not os.path.isfile(gdbserver) or \
372 not os.access(gdbserver, os.X_OK):
373 raise Exception("gdbserver binary '%s' does not exist or is "
374 "not executable" % gdbserver)
376 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
377 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
380 cls.vpp = subprocess.Popen(cmdline,
381 stdout=subprocess.PIPE,
382 stderr=subprocess.PIPE,
384 except subprocess.CalledProcessError as e:
385 cls.logger.critical("Subprocess returned with non-0 return code: ("
389 cls.logger.critical("Subprocess returned with OS error: "
390 "(%s) %s", e.errno, e.strerror)
392 except Exception as e:
393 cls.logger.exception("Subprocess returned unexpected from "
400 def wait_for_stats_socket(cls):
401 deadline = time.time() + 3
403 while time.time() < deadline or \
404 cls.debug_gdb or cls.debug_gdbserver:
405 if os.path.exists(cls.stats_sock):
410 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
413 def wait_for_coredump(cls):
414 corefile = cls.tempdir + "/core"
415 if os.path.isfile(corefile):
416 cls.logger.error("Waiting for coredump to complete: %s", corefile)
417 curr_size = os.path.getsize(corefile)
418 deadline = time.time() + 60
420 while time.time() < deadline:
423 curr_size = os.path.getsize(corefile)
424 if size == curr_size:
428 cls.logger.error("Timed out waiting for coredump to complete:"
431 cls.logger.error("Coredump complete: %s, size %d",
437 Perform class setup before running the testcase
438 Remove shared memory files, start vpp and connect the vpp-api
440 super(VppTestCase, cls).setUpClass()
441 gc.collect() # run garbage collection first
443 cls.logger = get_logger(cls.__name__)
444 if hasattr(cls, 'parallel_handler'):
445 cls.logger.addHandler(cls.parallel_handler)
446 cls.logger.propagate = False
448 cls.tempdir = tempfile.mkdtemp(
449 prefix='vpp-unittest-%s-' % cls.__name__)
450 cls.stats_sock = "%s/stats.sock" % cls.tempdir
451 cls.api_sock = "%s/api.sock" % cls.tempdir
452 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
453 cls.file_handler.setFormatter(
454 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
456 cls.file_handler.setLevel(DEBUG)
457 cls.logger.addHandler(cls.file_handler)
458 cls.logger.debug("--- setUpClass() for %s called ---" %
460 cls.shm_prefix = os.path.basename(cls.tempdir)
461 os.chdir(cls.tempdir)
462 cls.logger.info("Temporary dir is %s, shm prefix is %s",
463 cls.tempdir, cls.shm_prefix)
465 cls.reset_packet_infos()
467 cls._zombie_captures = []
470 cls.registry = VppObjectRegistry()
471 cls.vpp_startup_failed = False
472 cls.reporter = KeepAliveReporter()
473 # need to catch exceptions here because if we raise, then the cleanup
474 # doesn't get called and we might end with a zombie vpp
477 cls.reporter.send_keep_alive(cls, 'setUpClass')
478 VppTestResult.current_test_case_info = TestCaseInfo(
479 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
480 cls.vpp_stdout_deque = deque()
481 cls.vpp_stderr_deque = deque()
482 cls.pump_thread_stop_flag = Event()
483 cls.pump_thread_wakeup_pipe = os.pipe()
484 cls.pump_thread = Thread(target=pump_output, args=(cls,))
485 cls.pump_thread.daemon = True
486 cls.pump_thread.start()
487 if cls.debug_gdb or cls.debug_gdbserver:
491 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
497 cls.vapi.register_hook(hook)
498 cls.wait_for_stats_socket()
499 cls.statistics = VPPStats(socketname=cls.stats_sock)
503 cls.vpp_startup_failed = True
505 "VPP died shortly after startup, check the"
506 " output to standard error for possible cause")
512 cls.vapi.disconnect()
515 if cls.debug_gdbserver:
516 print(colorize("You're running VPP inside gdbserver but "
517 "VPP-API connection failed, did you forget "
518 "to 'continue' VPP from within gdb?", RED))
530 Disconnect vpp-api, kill vpp and cleanup shared memory files
532 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
534 if cls.vpp.returncode is None:
535 print(double_line_delim)
536 print("VPP or GDB server is still running")
537 print(single_line_delim)
538 input("When done debugging, press ENTER to kill the "
539 "process and finish running the testcase...")
541 # first signal that we want to stop the pump thread, then wake it up
542 if hasattr(cls, 'pump_thread_stop_flag'):
543 cls.pump_thread_stop_flag.set()
544 if hasattr(cls, 'pump_thread_wakeup_pipe'):
545 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
546 if hasattr(cls, 'pump_thread'):
547 cls.logger.debug("Waiting for pump thread to stop")
548 cls.pump_thread.join()
549 if hasattr(cls, 'vpp_stderr_reader_thread'):
550 cls.logger.debug("Waiting for stdderr pump to stop")
551 cls.vpp_stderr_reader_thread.join()
553 if hasattr(cls, 'vpp'):
554 if hasattr(cls, 'vapi'):
555 cls.logger.debug("Disconnecting class vapi client on %s",
557 cls.vapi.disconnect()
558 cls.logger.debug("Deleting class vapi attribute on %s",
562 if cls.vpp.returncode is None:
563 cls.wait_for_coredump()
564 cls.logger.debug("Sending TERM to vpp")
566 cls.logger.debug("Waiting for vpp to die")
567 cls.vpp.communicate()
568 cls.logger.debug("Deleting class vpp attribute on %s",
572 if cls.vpp_startup_failed:
573 stdout_log = cls.logger.info
574 stderr_log = cls.logger.critical
576 stdout_log = cls.logger.info
577 stderr_log = cls.logger.info
579 if hasattr(cls, 'vpp_stdout_deque'):
580 stdout_log(single_line_delim)
581 stdout_log('VPP output to stdout while running %s:', cls.__name__)
582 stdout_log(single_line_delim)
583 vpp_output = "".join(cls.vpp_stdout_deque)
584 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
586 stdout_log('\n%s', vpp_output)
587 stdout_log(single_line_delim)
589 if hasattr(cls, 'vpp_stderr_deque'):
590 stderr_log(single_line_delim)
591 stderr_log('VPP output to stderr while running %s:', cls.__name__)
592 stderr_log(single_line_delim)
593 vpp_output = "".join(cls.vpp_stderr_deque)
594 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
596 stderr_log('\n%s', vpp_output)
597 stderr_log(single_line_delim)
600 def tearDownClass(cls):
601 """ Perform final cleanup after running all tests in this test-case """
602 cls.logger.debug("--- tearDownClass() for %s called ---" %
604 cls.reporter.send_keep_alive(cls, 'tearDownClass')
606 cls.file_handler.close()
607 cls.reset_packet_infos()
609 debug_internal.on_tear_down_class(cls)
def show_commands_at_teardown(self):
    """Hook for subclasses to log additional show commands at teardown.

    This default implementation only logs, at info level, that no test
    specific commands were provided.
    """
    message = "--- No test specific show commands provided. ---"
    self.logger.info(message)
616 """ Show various debug prints after each test """
617 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
618 (self.__class__.__name__, self._testMethodName,
619 self._testMethodDoc))
622 if not self.vpp_dead:
623 self.logger.debug(self.vapi.cli("show trace max 1000"))
624 self.logger.info(self.vapi.ppcli("show interface"))
625 self.logger.info(self.vapi.ppcli("show hardware"))
626 self.logger.info(self.statistics.set_errors_str())
627 self.logger.info(self.vapi.ppcli("show run"))
628 self.logger.info(self.vapi.ppcli("show log"))
629 self.logger.info("Logging testcase specific show commands.")
630 self.show_commands_at_teardown()
631 self.registry.remove_vpp_config(self.logger)
632 # Save/Dump VPP api trace log
633 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
634 tmp_api_trace = "/tmp/%s" % api_trace
635 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
636 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
637 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
639 os.rename(tmp_api_trace, vpp_api_trace_log)
640 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
642 except VppTransportShmemIOError:
643 self.logger.debug("VppTransportShmemIOError: Vpp dead. "
644 "Cannot log show commands.")
647 self.registry.unregister_all(self.logger)
650 """ Clear trace before running each test"""
651 super(VppTestCase, self).setUp()
652 self.reporter.send_keep_alive(self)
654 raise Exception("VPP is dead when setting up the test")
655 self.sleep(.1, "during setUp")
656 self.vpp_stdout_deque.append(
657 "--- test setUp() for %s.%s(%s) starts here ---\n" %
658 (self.__class__.__name__, self._testMethodName,
659 self._testMethodDoc))
660 self.vpp_stderr_deque.append(
661 "--- test setUp() for %s.%s(%s) starts here ---\n" %
662 (self.__class__.__name__, self._testMethodName,
663 self._testMethodDoc))
664 self.vapi.cli("clear trace")
665 # store the test instance inside the test class - so that objects
666 # holding the class can access instance methods (like assertEqual)
667 type(self).test_instance = self
670 def pg_enable_capture(cls, interfaces=None):
672 Enable capture on packet-generator interfaces
674 :param interfaces: iterable interface indexes (if None,
675 use self.pg_interfaces)
678 if interfaces is None:
679 interfaces = cls.pg_interfaces
684 def register_capture(cls, cap_name):
685 """ Register a capture in the testclass """
686 # add to the list of captures with current timestamp
687 cls._captures.append((time.time(), cap_name))
688 # filter out from zombies
689 cls._zombie_captures = [(stamp, name)
690 for (stamp, name) in cls._zombie_captures
695 """ Remove any zombie captures and enable the packet generator """
696 # how long before capture is allowed to be deleted - otherwise vpp
697 # crashes - 100ms seems enough (this shouldn't be needed at all)
700 for stamp, cap_name in cls._zombie_captures:
701 wait = stamp + capture_ttl - now
703 cls.sleep(wait, "before deleting capture %s" % cap_name)
705 cls.logger.debug("Removing zombie capture %s" % cap_name)
706 cls.vapi.cli('packet-generator delete %s' % cap_name)
708 cls.vapi.cli("trace add pg-input 1000")
709 cls.vapi.cli('packet-generator enable')
710 cls._zombie_captures = cls._captures
714 def create_pg_interfaces(cls, interfaces):
716 Create packet-generator interfaces.
718 :param interfaces: iterable indexes of the interfaces.
719 :returns: List of created interfaces.
724 intf = VppPGInterface(cls, i)
725 setattr(cls, intf.name, intf)
727 cls.pg_interfaces = result
731 def create_loopback_interfaces(cls, count):
733 Create loopback interfaces.
735 :param count: number of interfaces created.
736 :returns: List of created interfaces.
738 result = [VppLoInterface(cls) for i in range(count)]
740 setattr(cls, intf.name, intf)
741 cls.lo_interfaces = result
745 def create_bvi_interfaces(cls, count):
747 Create BVI interfaces.
749 :param count: number of interfaces created.
750 :returns: List of created interfaces.
752 result = [VppBviInterface(cls) for i in range(count)]
754 setattr(cls, intf.name, intf)
755 cls.bvi_interfaces = result
759 def extend_packet(packet, size, padding=' '):
761 Extend packet to given size by padding with spaces or custom padding
762 NOTE: Currently works only when Raw layer is present.
764 :param packet: packet
765 :param size: target size
766 :param padding: padding used to extend the payload
769 packet_len = len(packet) + 4
770 extend = size - packet_len
772 num = (extend // len(padding)) + 1
773 packet[Raw].load += (padding * num)[:extend].encode("ascii")
def reset_packet_infos(cls):
    """Start over with an empty packet info store and empty packet counts.

    Rebinds both `_packet_infos` and `_packet_count_for_dst_if_idx` on the
    class to fresh, empty dictionaries.
    """
    cls._packet_count_for_dst_if_idx = dict()
    cls._packet_infos = dict()
782 def create_packet_info(cls, src_if, dst_if):
784 Create packet info object containing the source and destination indexes
785 and add it to the testcase's packet info list
787 :param VppInterface src_if: source interface
788 :param VppInterface dst_if: destination interface
790 :returns: _PacketInfo object
794 info.index = len(cls._packet_infos)
795 info.src = src_if.sw_if_index
796 info.dst = dst_if.sw_if_index
797 if isinstance(dst_if, VppSubInterface):
798 dst_idx = dst_if.parent.sw_if_index
800 dst_idx = dst_if.sw_if_index
801 if dst_idx in cls._packet_count_for_dst_if_idx:
802 cls._packet_count_for_dst_if_idx[dst_idx] += 1
804 cls._packet_count_for_dst_if_idx[dst_idx] = 1
805 cls._packet_infos[info.index] = info
809 def info_to_payload(info):
811 Convert _PacketInfo object to packet payload
813 :param info: _PacketInfo object
815 :returns: string containing serialized data from packet info
817 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
821 def payload_to_info(payload, payload_field='load'):
823 Convert packet payload to _PacketInfo object
825 :param payload: packet payload
826 :type payload: <class 'scapy.packet.Raw'>
827 :param payload_field: packet fieldname of payload "load" for
828 <class 'scapy.packet.Raw'>
829 :type payload_field: str
830 :returns: _PacketInfo object containing de-serialized data from payload
833 numbers = getattr(payload, payload_field).split()
835 info.index = int(numbers[0])
836 info.src = int(numbers[1])
837 info.dst = int(numbers[2])
838 info.ip = int(numbers[3])
839 info.proto = int(numbers[4])
842 def get_next_packet_info(self, info):
844 Iterate over the packet info list stored in the testcase
845 Start iteration with first element if info is None
846 Continue based on index in info if info is specified
848 :param info: info or None
849 :returns: next info in list or None if no more infos
854 next_index = info.index + 1
855 if next_index == len(self._packet_infos):
858 return self._packet_infos[next_index]
860 def get_next_packet_info_for_interface(self, src_index, info):
862 Search the packet info list for the next packet info with same source
865 :param src_index: source interface index to search for
866 :param info: packet info - where to start the search
867 :returns: packet info or None
871 info = self.get_next_packet_info(info)
874 if info.src == src_index:
877 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
879 Search the packet info list for the next packet info with same source
880 and destination interface indexes
882 :param src_index: source interface index to search for
883 :param dst_index: destination interface index to search for
884 :param info: packet info - where to start the search
885 :returns: packet info or None
889 info = self.get_next_packet_info_for_interface(src_index, info)
892 if info.dst == dst_index:
895 def assert_equal(self, real_value, expected_value, name_or_class=None):
896 if name_or_class is None:
897 self.assertEqual(real_value, expected_value)
900 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
901 msg = msg % (getdoc(name_or_class).strip(),
902 real_value, str(name_or_class(real_value)),
903 expected_value, str(name_or_class(expected_value)))
905 msg = "Invalid %s: %s does not match expected value %s" % (
906 name_or_class, real_value, expected_value)
908 self.assertEqual(real_value, expected_value, msg)
910 def assert_in_range(self,
918 msg = "Invalid %s: %s out of range <%s,%s>" % (
919 name, real_value, expected_min, expected_max)
920 self.assertTrue(expected_min <= real_value <= expected_max, msg)
922 def assert_packet_checksums_valid(self, packet,
923 ignore_zero_udp_checksums=True):
924 received = packet.__class__(scapy.compat.raw(packet))
926 ppp("Verifying packet checksums for packet:", received))
927 udp_layers = ['UDP', 'UDPerror']
928 checksum_fields = ['cksum', 'chksum']
931 temp = received.__class__(scapy.compat.raw(received))
933 layer = temp.getlayer(counter)
935 for cf in checksum_fields:
936 if hasattr(layer, cf):
937 if ignore_zero_udp_checksums and \
938 0 == getattr(layer, cf) and \
939 layer.name in udp_layers:
942 checksums.append((counter, cf))
945 counter = counter + 1
946 if 0 == len(checksums):
948 temp = temp.__class__(scapy.compat.raw(temp))
949 for layer, cf in checksums:
950 calc_sum = getattr(temp[layer], cf)
952 getattr(received[layer], cf), calc_sum,
953 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
955 "Checksum field `%s` on `%s` layer has correct value `%s`" %
956 (cf, temp[layer].name, calc_sum))
958 def assert_checksum_valid(self, received_packet, layer,
960 ignore_zero_checksum=False):
961 """ Check checksum of received packet on given layer """
962 received_packet_checksum = getattr(received_packet[layer], field_name)
963 if ignore_zero_checksum and 0 == received_packet_checksum:
965 recalculated = received_packet.__class__(
966 scapy.compat.raw(received_packet))
967 delattr(recalculated[layer], field_name)
968 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
969 self.assert_equal(received_packet_checksum,
970 getattr(recalculated[layer], field_name),
971 "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """Check the checksum of the 'IP' layer of a received packet.

    :param received_packet: packet to check
    :param ignore_zero_checksum: passed through unchanged to
        assert_checksum_valid (default False)
    """
    layer_name = 'IP'
    self.assert_checksum_valid(
        received_packet, layer_name,
        ignore_zero_checksum=ignore_zero_checksum)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """Check the checksum of the 'TCP' layer of a received packet.

    :param received_packet: packet to check
    :param ignore_zero_checksum: passed through unchanged to
        assert_checksum_valid (default False)
    """
    layer_name = 'TCP'
    self.assert_checksum_valid(
        received_packet, layer_name,
        ignore_zero_checksum=ignore_zero_checksum)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """Check the checksum of the 'UDP' layer of a received packet.

    :param received_packet: packet to check
    :param ignore_zero_checksum: passed through unchanged to
        assert_checksum_valid; unlike the IP and TCP variants it defaults
        to True here
    """
    layer_name = 'UDP'
    self.assert_checksum_valid(
        received_packet, layer_name,
        ignore_zero_checksum=ignore_zero_checksum)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """Check checksums of the *error layers found in a received packet.

    For each of IPerror, TCPerror, UDPerror and ICMPerror (in that order)
    that the packet has, assert_checksum_valid is called for the layer of
    the same name.

    :param received_packet: packet to check
    """
    # (layer class, layer name, extra keyword arguments); only the UDPerror
    # check is asked to ignore a zero checksum
    layer_checks = (
        (IPerror, 'IPerror', {}),
        (TCPerror, 'TCPerror', {}),
        (UDPerror, 'UDPerror', {'ignore_zero_checksum': True}),
        (ICMPerror, 'ICMPerror', {}),
    )
    for layer_cls, layer_name, extra in layer_checks:
        if received_packet.haslayer(layer_cls):
            self.assert_checksum_valid(received_packet, layer_name, **extra)
def assert_icmp_checksum_valid(self, received_packet):
    """Check the 'ICMP' layer checksum, then any embedded layer checksums.

    :param received_packet: packet to check
    """
    pkt = received_packet
    self.assert_checksum_valid(pkt, 'ICMP')
    self.assert_embedded_icmp_checksum_valid(pkt)
def assert_icmpv6_checksum_valid(self, pkt):
    """Check the 'cksum' field of the ICMPv6 layers found in a packet.

    Handles ICMPv6DestUnreach (followed by a check of the embedded layers),
    ICMPv6EchoRequest and ICMPv6EchoReply, each only if the packet has it.

    :param pkt: packet to check
    """
    checksum_field = 'cksum'
    verify = self.assert_checksum_valid

    if pkt.haslayer(ICMPv6DestUnreach):
        verify(pkt, 'ICMPv6DestUnreach', checksum_field)
        # a destination-unreachable message may carry embedded layers
        self.assert_embedded_icmp_checksum_valid(pkt)

    echo_layers = ((ICMPv6EchoRequest, 'ICMPv6EchoRequest'),
                   (ICMPv6EchoReply, 'ICMPv6EchoReply'))
    for layer_cls, layer_name in echo_layers:
        if pkt.haslayer(layer_cls):
            verify(pkt, layer_name, checksum_field)
1012 def get_packet_counter(self, counter):
1013 if counter.startswith("/"):
1014 counter_value = self.statistics.get_counter(counter)
1016 counters = self.vapi.cli("sh errors").split('\n')
1018 for i in range(1, len(counters) - 1):
1019 results = counters[i].split()
1020 if results[1] == counter:
1021 counter_value = int(results[0])
1023 return counter_value
def assert_packet_counter_equal(self, counter, expected_value):
    """Assert that a packet counter holds the expected value.

    :param counter: counter name, looked up through get_packet_counter
    :param expected_value: value the counter is expected to have
    """
    actual = self.get_packet_counter(counter)
    description = "packet counter `%s'" % counter
    self.assert_equal(actual, expected_value, description)
def assert_error_counter_equal(self, counter, expected_value):
    """Assert that an error counter holds the expected value.

    :param counter: counter name, looked up through
        self.statistics.get_err_counter
    :param expected_value: value the counter is expected to have
    """
    actual = self.statistics.get_err_counter(counter)
    description = "error counter `%s'" % counter
    self.assert_equal(actual, expected_value, description)
1036 def sleep(cls, timeout, remark=None):
1038 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1039 # * by Guido, only the main thread can be interrupted.
1041 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
1044 if hasattr(os, 'sched_yield'):
1050 if hasattr(cls, 'logger'):
1051 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1052 before = time.time()
1055 if hasattr(cls, 'logger') and after - before > 2 * timeout:
1056 cls.logger.error("unexpected self.sleep() result - "
1057 "slept for %es instead of ~%es!",
1058 after - before, timeout)
1059 if hasattr(cls, 'logger'):
1061 "Finished sleep (%s) - slept %es (wanted %es)",
1062 remark, after - before, timeout)
1064 def pg_send(self, intf, pkts):
1065 self.vapi.cli("clear trace")
1066 intf.add_stream(pkts)
1067 self.pg_enable_capture(self.pg_interfaces)
1070 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
1071 self.pg_send(intf, pkts)
1074 for i in self.pg_interfaces:
1075 i.get_capture(0, timeout=timeout)
1076 i.assert_nothing_captured(remark=remark)
1079 def send_and_expect(self, intf, pkts, output, n_rx=None):
1082 self.pg_send(intf, pkts)
1083 rx = output.get_capture(n_rx)
1086 def send_and_expect_only(self, intf, pkts, output, timeout=None):
1087 self.pg_send(intf, pkts)
1088 rx = output.get_capture(len(pkts))
1092 for i in self.pg_interfaces:
1093 if i not in outputs:
1094 i.get_capture(0, timeout=timeout)
1095 i.assert_nothing_captured()
1101 """ unittest calls runTest when TestCase is instantiated without a
1102 test case. Use case: Writing unittests against VppTestCase"""
def get_testcase_doc_name(test):
    """Return the first line of the docstring of the test's class.

    :param test: test case instance
    :returns: first line of the (cleaned-up) class docstring; the class name
        when the class has no docstring or an empty one
    """
    test_class = test.__class__
    # getdoc() yields None for an undocumented class and '' for an empty
    # docstring - either would make a bare .splitlines()[0] blow up
    doc = getdoc(test_class)
    doc_lines = doc.splitlines() if doc else []
    if doc_lines:
        return doc_lines[0]
    return test_class.__name__
1110 def get_test_description(descriptions, test):
1111 short_description = test.shortDescription()
1112 if descriptions and short_description:
1113 return short_description
class TestCaseInfo(object):
    """Plain record of facts about a test case.

    Holds the logger, the temporary directory, the VPP process id and the
    path of the VPP binary it was created with; `core_crash_test` starts
    out as None.
    """

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        (self.logger,
         self.tempdir,
         self.vpp_pid,
         self.vpp_bin_path) = (logger, tempdir, vpp_pid, vpp_bin_path)
        # nothing known about a core crash at construction time
        self.core_crash_test = None
class VppTestResult(unittest.TestResult):
    """
    Test result collector which reports progress to a stream and,
    optionally, through a result pipe.

    @property result_string
     String variable to store the test case result string.
    @property errors
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test which
     raised an unexpected exception.
    @property failures
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test where
     a failure was explicitly signalled using the TestCase.assert*()
     methods.
    """

    # class-level state: shared by every instance of this class
    failed_test_cases_info = set()
    core_crash_test_cases_info = set()
    current_test_case_info = None

    def __init__(self, stream=None, descriptions=None, verbosity=None,
                 runner=None):
        """
        :param stream File descriptor to store where to report test results.
            Set to the standard error stream by default.
        :param descriptions Boolean variable to store information if to use
            test case descriptions.
        :param verbosity Integer variable to store required verbosity level.
        :param runner Runner object; its print_summary attribute is read
            by printErrors().
        """
        super(VppTestResult, self).__init__(stream, descriptions, verbosity)
        self.stream = stream
        self.descriptions = descriptions
        self.verbosity = verbosity
        self.result_string = None
        self.runner = runner

    def addSuccess(self, test):
        """
        Record a test succeeded result

        :param test: the test which succeeded

        """
        info = self.current_test_case_info
        if info:
            info.logger.debug(
                "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
                                                       test._testMethodName,
                                                       test._testMethodDoc))
        unittest.TestResult.addSuccess(self, test)
        self.result_string = colorize("OK", GREEN)

        self.send_result_through_pipe(test, PASS)

    def addSkip(self, test, reason):
        """
        Record a test skipped.

        :param test: the test which was skipped
        :param reason: why the test was skipped

        """
        info = self.current_test_case_info
        if info:
            info.logger.debug(
                "--- addSkip() %s.%s(%s) called, reason is %s" %
                (test.__class__.__name__, test._testMethodName,
                 test._testMethodDoc, reason))
        unittest.TestResult.addSkip(self, test, reason)
        self.result_string = colorize("SKIP", YELLOW)

        self.send_result_through_pipe(test, SKIP)

    def symlink_failed(self):
        """
        Link the temp dir of the current test case into $FAILED_DIR.

        Best effort only: any problem is logged, never raised.
        """
        info = self.current_test_case_info
        if not info:
            return
        logger = info.logger
        try:
            failed_dir = os.getenv('FAILED_DIR')
            if failed_dir is None:
                # report the real cause instead of the TypeError which
                # os.path.join() would raise for a None component
                if logger:
                    logger.error("FAILED_DIR is not set, cannot create "
                                 "a link to the failed test")
                return
            link_path = os.path.join(
                failed_dir,
                '%s-FAILED' % os.path.basename(info.tempdir))
            if logger:
                logger.debug("creating a link to the failed test")
                logger.debug("os.symlink(%s, %s)" %
                             (info.tempdir, link_path))
            # lexists() is also true for a dangling link, for which
            # exists() says False and os.symlink() would then fail
            if os.path.lexists(link_path):
                if logger:
                    logger.debug('symlink already exists')
            else:
                os.symlink(info.tempdir, link_path)
        except Exception as e:
            # deliberately broad: failing to create the convenience link
            # must not affect the recorded test result
            if logger:
                logger.error(e)

    def send_result_through_pipe(self, test, result):
        """
        Send (test id, result) through the result pipe, if one is set.

        :param test: the test whose id() is reported
        :param result: result value to report

        """
        pipe = getattr(self, 'test_framework_result_pipe', None)
        if pipe:
            pipe.send((test.id(), result))

    def log_error(self, test, err, fn_name):
        """
        Log an error or a failure to the logger of the current test case.

        :param test: the test (or unittest _ErrorHolder) which failed
        :param err: exception info tuple accepted by format_exception()
        :param fn_name: name of the reporting function used in the log

        """
        info = self.current_test_case_info
        if not info:
            return
        if isinstance(test, unittest.suite._ErrorHolder):
            test_name = test.description
        else:
            test_name = '%s.%s(%s)' % (test.__class__.__name__,
                                       test._testMethodName,
                                       test._testMethodDoc)
        info.logger.debug(
            "--- %s() %s called, err is %s" %
            (fn_name, test_name, err))
        info.logger.debug(
            "formatted exception is:\n%s" %
            "".join(format_exception(*err)))

    def add_error(self, test, err, unittest_fn, error_type):
        """
        Common implementation of addFailure() and addError().

        :param test: the test (or unittest _ErrorHolder) which failed
        :param err: exception info tuple
        :param unittest_fn: unbound unittest.TestResult method which
            records the problem
        :param error_type: FAIL or ERROR
        :raises Exception: if error_type is neither FAIL nor ERROR

        """
        if error_type == FAIL:
            self.log_error(test, err, 'addFailure')
            error_type_str = colorize("FAIL", RED)
        elif error_type == ERROR:
            self.log_error(test, err, 'addError')
            error_type_str = colorize("ERROR", RED)
        else:
            raise Exception('Error type %s cannot be used to record an '
                            'error or a failure' % error_type)

        unittest_fn(self, test, err)
        info = self.current_test_case_info
        if info:
            self.result_string = "%s [ temp dir used by test case: %s ]" % \
                                 (error_type_str, info.tempdir)
            self.symlink_failed()
            self.failed_test_cases_info.add(info)
            if is_core_present(info.tempdir):
                # only the first test seen with a core present is recorded
                if not info.core_crash_test:
                    if isinstance(test, unittest.suite._ErrorHolder):
                        test_name = str(test)
                    else:
                        test_name = "'{!s}' ({!s})".format(
                            get_testcase_doc_name(test), test.id())
                    info.core_crash_test = test_name
                self.core_crash_test_cases_info.add(info)
        else:
            self.result_string = '%s [no temp dir]' % error_type_str

        self.send_result_through_pipe(test, error_type)

    def addFailure(self, test, err):
        """
        Record a test failed result

        :param test: the test which failed
        :param err: error message

        """
        self.add_error(test, err, unittest.TestResult.addFailure, FAIL)

    def addError(self, test, err):
        """
        Record a test error result

        :param test: the test which raised the error
        :param err: error message

        """
        self.add_error(test, err, unittest.TestResult.addError, ERROR)

    def getDescription(self, test):
        """
        Get test description

        :param test: the test to describe
        :returns: test description

        """
        return get_test_description(self.descriptions, test)

    def startTest(self, test):
        """
        Start a test

        :param test: the test about to be run

        """

        def print_header(test):
            test_class = test.__class__
            if not hasattr(test_class, '_header_printed'):
                doc = getdoc(test)
                # an undocumented test class has no first line to show
                title = doc.splitlines()[0] if doc else test_class.__name__
                print(double_line_delim)
                print(colorize(title, GREEN))
                print(double_line_delim)
            test_class._header_printed = True

        print_header(test)

        unittest.TestResult.startTest(self, test)
        if self.verbosity > 0:
            self.stream.writeln(
                "Starting " + self.getDescription(test) + " ...")
            self.stream.writeln(single_line_delim)

    def stopTest(self, test):
        """
        Called when the given test has been run

        :param test: the test which has finished

        """
        unittest.TestResult.stopTest(self, test)
        summary = "%-73s%s" % (self.getDescription(test),
                               self.result_string)
        if self.verbosity > 0:
            self.stream.writeln(single_line_delim)
            self.stream.writeln(summary)
            self.stream.writeln(single_line_delim)
        else:
            self.stream.writeln(summary)

        self.send_result_through_pipe(test, TEST_RUN)

    def printErrors(self):
        """
        Print errors from running the test case
        """
        if self.errors or self.failures:
            self.stream.writeln()
            self.printErrorList('ERROR', self.errors)
            self.printErrorList('FAIL', self.failures)

        # ^^ that is the last output from unittest before summary
        if self.runner is not None and not self.runner.print_summary:
            # the handle is left open on purpose: both streams are still
            # written to after this method returns
            devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
            self.stream = devnull
            self.runner.stream = devnull

    def printErrorList(self, flavour, errors):
        """
        Print error list to the output stream together with error type
        and test case description.

        :param flavour: error type
        :param errors: iterable errors

        """
        for test, err in errors:
            self.stream.writeln(double_line_delim)
            self.stream.writeln("%s: %s" %
                                (flavour, self.getDescription(test)))
            self.stream.writeln(single_line_delim)
            self.stream.writeln("%s" % err)
class VppTestRunner(unittest.TextTestRunner):
    """
    A basic test runner implementation which prints results to standard
    output.
    """

    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        # NOTE(review): this property has no setter, so a non-None
        # ``resultclass`` constructor argument cannot be stored by
        # TextTestRunner.__init__ - confirm no caller passes one
        return VppTestResult

    def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
                 result_pipe=None, failfast=False, buffer=False,
                 resultclass=None, print_summary=True, **kwargs):
        """
        :param keep_alive_pipe: pipe stored on KeepAliveReporter
        :param descriptions: passed on to unittest.TextTestRunner
        :param verbosity: passed on to unittest.TextTestRunner
        :param result_pipe: pipe stored on the result class; results
            are sent through it when it is set
        :param failfast: passed on to unittest.TextTestRunner
        :param buffer: passed on to unittest.TextTestRunner
        :param resultclass: passed on to unittest.TextTestRunner
        :param print_summary: when false, run() restores the original
            stream on the runner and on the result before returning
        :param kwargs: further arguments for unittest.TextTestRunner
        """
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        super(VppTestRunner, self).__init__(sys.stdout, descriptions,
                                            verbosity, failfast, buffer,
                                            resultclass, **kwargs)
        KeepAliveReporter.pipe = keep_alive_pipe

        # remembered so that run() can restore it
        self.orig_stream = self.stream
        # stored on the class, not on an instance
        self.resultclass.test_framework_result_pipe = result_pipe

        self.print_summary = print_summary

    def _makeResult(self):
        """Create the result object, handing it a reference to this runner."""
        return self.resultclass(self.stream,
                                self.descriptions,
                                self.verbosity,
                                self)

    def run(self, test):
        """
        Run the tests

        :param test: the test case or test suite to run
        :returns: the result object produced by the run

        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        result = super(VppTestRunner, self).run(test)
        if not self.print_summary:
            # put back the stream saved in __init__
            self.stream = self.orig_stream
            result.stream = self.orig_stream
        return result
class Worker(Thread):
    """Thread which runs an executable and records its return code."""

    def __init__(self, args, logger, env=None):
        """
        :param args: command line; args[0] is the executable
        :param logger: logger receiving the progress and output messages
        :param env: extra environment variables for the executable;
            deep-copied, so later changes by the caller have no effect
        """
        self.logger = logger
        self.args = args
        # stays None until run() has finished
        self.result = None
        # None instead of a mutable default; treated as "nothing extra"
        self.env = {} if env is None else copy.deepcopy(env)
        super(Worker, self).__init__()

    def run(self):
        """Run the executable, log its output and store the return code."""
        executable = self.args[0]
        # wrap in a tuple so that a tuple ``args`` is formatted as one value
        self.logger.debug("Running executable w/args `%s'" % (self.args,))
        env = os.environ.copy()
        env.update(self.env)
        env["CK_LOG_FILE_NAME"] = "-"
        self.process = subprocess.Popen(
            self.args, shell=False, env=env, preexec_fn=os.setpgrp,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = self.process.communicate()
        self.logger.debug("Finished running `%s'" % executable)
        self.logger.info("Return code is `%s'" % self.process.returncode)
        for stream_name, data in (("stdout", out), ("stderr", err)):
            self.logger.info(single_line_delim)
            self.logger.info("Executable `%s' wrote to %s:" %
                             (executable, stream_name))
            self.logger.info(single_line_delim)
            self.logger.info(data)
        self.logger.info(single_line_delim)
        self.result = self.process.returncode
1459 if __name__ == '__main__':