3 from __future__ import print_function
16 from collections import deque
17 from threading import Thread, Event
18 from inspect import getdoc, isclass
19 from traceback import format_exception
20 from logging import FileHandler, DEBUG, Formatter
23 from scapy.packet import Raw
24 from hook import StepHook, PollHook, VppDiedError
25 from vpp_pg_interface import VppPGInterface
26 from vpp_sub_interface import VppSubInterface
27 from vpp_lo_interface import VppLoInterface
28 from vpp_bvi_interface import VppBviInterface
29 from vpp_papi_provider import VppPapiProvider
30 from vpp_papi.vpp_stats import VPPStats
31 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
33 from vpp_object import VppObjectRegistry
34 from util import ppp, is_core_present
35 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
36 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
37 from scapy.layers.inet6 import ICMPv6EchoReply
39 if os.name == 'posix' and sys.version_info[0] < 3:
40 # using subprocess32 is recommended by python official documentation
41 # @ https://docs.python.org/2/library/subprocess.html
42 import subprocess32 as subprocess
46 # Python2/3 compatible
58 debug_framework = False
59 if os.getenv('TEST_DEBUG', "0") == "1":
60 debug_framework = True
64 Test framework module.
66 The module provides a set of tools for constructing and running tests and
67 representing the results.
71 class _PacketInfo(object):
72 """Private class to create packet info object.
74 Help process information about the next packet.
75 Set variables to default values.
77 #: Store the index of the packet.
79 #: Store the index of the source packet generator interface of the packet.
81 #: Store the index of the destination packet generator interface
84 #: Store expected ip version
86 #: Store expected upper protocol
88 #: Store the copy of the former packet.
91 def __eq__(self, other):
92 index = self.index == other.index
93 src = self.src == other.src
94 dst = self.dst == other.dst
95 data = self.data == other.data
96 return index and src and dst and data
99 def pump_output(testclass):
100 """ pump output from vpp stdout/stderr to proper queues """
103 while not testclass.pump_thread_stop_flag.is_set():
104 readable = select.select([testclass.vpp.stdout.fileno(),
105 testclass.vpp.stderr.fileno(),
106 testclass.pump_thread_wakeup_pipe[0]],
108 if testclass.vpp.stdout.fileno() in readable:
109 read = os.read(testclass.vpp.stdout.fileno(), 102400)
111 split = read.splitlines(True)
112 if len(stdout_fragment) > 0:
113 split[0] = "%s%s" % (stdout_fragment, split[0])
114 if len(split) > 0 and split[-1].endswith("\n"):
118 stdout_fragment = split[-1]
119 testclass.vpp_stdout_deque.extend(split[:limit])
120 if not testclass.cache_vpp_output:
121 for line in split[:limit]:
122 testclass.logger.debug(
123 "VPP STDOUT: %s" % line.rstrip("\n"))
124 if testclass.vpp.stderr.fileno() in readable:
125 read = os.read(testclass.vpp.stderr.fileno(), 102400)
127 split = read.splitlines(True)
128 if len(stderr_fragment) > 0:
129 split[0] = "%s%s" % (stderr_fragment, split[0])
130 if len(split) > 0 and split[-1].endswith(b"\n"):
134 stderr_fragment = split[-1]
135 testclass.vpp_stderr_deque.extend(split[:limit])
136 if not testclass.cache_vpp_output:
137 for line in split[:limit]:
138 testclass.logger.debug(
139 "VPP STDERR: %s" % line.rstrip("\n"))
140 # ignoring the dummy pipe here intentionally - the
141 # flag will take care of properly terminating the loop
def _is_skip_aarch64_set():
    """Tell whether the SKIP_AARCH64 environment variable is switched on.

    :returns: True when SKIP_AARCH64 is 'yes', 'y' or '1' (compared
        case-insensitively); False otherwise, including when it is unset.
    """
    setting = os.getenv('SKIP_AARCH64', 'n')
    return setting.lower() in ('yes', 'y', '1')


# evaluated once, at module import time
is_skip_aarch64_set = _is_skip_aarch64_set()
def _is_platform_aarch64():
    """Tell whether platform.machine() reports 'aarch64'."""
    machine = platform.machine()
    return machine == 'aarch64'


# evaluated once, at module import time
is_platform_aarch64 = _is_platform_aarch64()
def _running_extended_tests():
    """Tell whether the EXTENDED_TESTS environment variable is switched on.

    :returns: True when EXTENDED_TESTS is 'y', 'yes' or '1' (compared
        case-insensitively); False otherwise, including when it is unset.
    """
    setting = os.getenv("EXTENDED_TESTS", "n").lower()
    return setting in ("y", "yes", "1")


# evaluated once, at module import time
running_extended_tests = _running_extended_tests()
def _running_on_centos():
    """Tell whether the OS_ID environment variable names a CentOS system.

    :returns: True when OS_ID contains 'centos' (compared
        case-insensitively); False otherwise, including when it is unset.
    """
    os_id = os.getenv("OS_ID", "")
    return "centos" in os_id.lower()


# Store the evaluated result, like the other module-level flags in this
# file do. Binding the function object itself would make the flag truthy
# regardless of what OS_ID contains.
running_on_centos = _running_on_centos()
174 class KeepAliveReporter(object):
176 Singleton object which reports test start to parent process
181 self.__dict__ = self._shared_state
189 def pipe(self, pipe):
190 if self._pipe is not None:
191 raise Exception("Internal error - pipe should only be set once.")
194 def send_keep_alive(self, test, desc=None):
196 Write current test tmpdir & desc to keep-alive pipe to signal liveness
198 if self.pipe is None:
199 # if not running forked..
203 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
207 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
210 class VppTestCase(unittest.TestCase):
211 """This subclass is a base class for VPP test cases that are implemented as
212 classes. It provides methods to create and run test case.
215 extra_vpp_punt_config = []
216 extra_vpp_plugin_config = []
def packet_infos(self):
    """Return the packet infos stored on the testcase.

    The container is the one filled by create_packet_info(), where each
    info is stored under its own index.
    """
    return self._packet_infos
224 def get_packet_count_for_if_idx(cls, dst_if_index):
225 """Get the number of packet info for specified destination if index"""
226 if dst_if_index in cls._packet_count_for_dst_if_idx:
227 return cls._packet_count_for_dst_if_idx[dst_if_index]
233 """Return the instance of this testcase"""
234 return cls.test_instance
237 def set_debug_flags(cls, d):
238 cls.debug_core = False
239 cls.debug_gdb = False
240 cls.debug_gdbserver = False
245 cls.debug_core = True
248 elif dl == "gdbserver":
249 cls.debug_gdbserver = True
251 raise Exception("Unrecognized DEBUG option: '%s'" % d)
254 def get_least_used_cpu():
255 cpu_usage_list = [set(range(psutil.cpu_count()))]
256 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
257 if 'vpp_main' == p.info['name']]
258 for vpp_process in vpp_processes:
259 for cpu_usage_set in cpu_usage_list:
261 cpu_num = vpp_process.cpu_num()
262 if cpu_num in cpu_usage_set:
263 cpu_usage_set_index = cpu_usage_list.index(
265 if cpu_usage_set_index == len(cpu_usage_list) - 1:
266 cpu_usage_list.append({cpu_num})
268 cpu_usage_list[cpu_usage_set_index + 1].add(
270 cpu_usage_set.remove(cpu_num)
272 except psutil.NoSuchProcess:
275 for cpu_usage_set in cpu_usage_list:
276 if len(cpu_usage_set) > 0:
277 min_usage_set = cpu_usage_set
280 return random.choice(tuple(min_usage_set))
283 def setUpConstants(cls):
284 """ Set-up the test case class based on environment variables """
285 s = os.getenv("STEP", "n")
286 cls.step = True if s.lower() in ("y", "yes", "1") else False
287 d = os.getenv("DEBUG", None)
288 c = os.getenv("CACHE_OUTPUT", "1")
289 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
290 cls.set_debug_flags(d)
291 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
292 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
293 cls.test_plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
294 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
296 if cls.plugin_path is not None:
297 if cls.extern_plugin_path is not None:
298 plugin_path = "%s:%s" % (
299 cls.plugin_path, cls.extern_plugin_path)
301 plugin_path = cls.plugin_path
302 elif cls.extern_plugin_path is not None:
303 plugin_path = cls.extern_plugin_path
305 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
306 debug_cli = "cli-listen localhost:5002"
308 size = os.getenv("COREDUMP_SIZE")
310 coredump_size = "coredump-size %s" % size
311 if coredump_size is None:
312 coredump_size = "coredump-size unlimited"
314 cpu_core_number = cls.get_least_used_cpu()
316 cls.vpp_cmdline = [cls.vpp_bin, "unix",
317 "{", "nodaemon", debug_cli, "full-coredump",
318 coredump_size, "runtime-dir", cls.tempdir, "}",
319 "api-trace", "{", "on", "}", "api-segment", "{",
320 "prefix", cls.shm_prefix, "}", "cpu", "{",
321 "main-core", str(cpu_core_number), "}",
322 "statseg", "{", "socket-name", cls.stats_sock, "}",
323 "socksvr", "{", "socket-name", cls.api_sock, "}",
325 "{", "plugin", "dpdk_plugin.so", "{", "disable",
326 "}", "plugin", "rdma_plugin.so", "{", "disable",
327 "}", "plugin", "unittest_plugin.so", "{", "enable",
328 "}"] + cls.extra_vpp_plugin_config + ["}", ]
329 if cls.extra_vpp_punt_config is not None:
330 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
331 if plugin_path is not None:
332 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
333 if cls.test_plugin_path is not None:
334 cls.vpp_cmdline.extend(["test_plugin_path", cls.test_plugin_path])
336 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
337 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
340 def wait_for_enter(cls):
341 if cls.debug_gdbserver:
342 print(double_line_delim)
343 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
345 print(double_line_delim)
346 print("Spawned VPP with PID: %d" % cls.vpp.pid)
348 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
350 print(single_line_delim)
351 print("You can debug the VPP using e.g.:")
352 if cls.debug_gdbserver:
353 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
354 print("Now is the time to attach a gdb by running the above "
355 "command, set up breakpoints etc. and then resume VPP from "
356 "within gdb by issuing the 'continue' command")
358 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
359 print("Now is the time to attach a gdb by running the above "
360 "command and set up breakpoints etc.")
361 print(single_line_delim)
362 input("Press ENTER to continue running the testcase...")
366 cmdline = cls.vpp_cmdline
368 if cls.debug_gdbserver:
369 gdbserver = '/usr/bin/gdbserver'
370 if not os.path.isfile(gdbserver) or \
371 not os.access(gdbserver, os.X_OK):
372 raise Exception("gdbserver binary '%s' does not exist or is "
373 "not executable" % gdbserver)
375 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
376 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
379 cls.vpp = subprocess.Popen(cmdline,
380 stdout=subprocess.PIPE,
381 stderr=subprocess.PIPE,
383 except subprocess.CalledProcessError as e:
384 cls.logger.critical("Subprocess returned with non-0 return code: ("
388 cls.logger.critical("Subprocess returned with OS error: "
389 "(%s) %s", e.errno, e.strerror)
391 except Exception as e:
392 cls.logger.exception("Subprocess returned unexpected from "
399 def wait_for_stats_socket(cls):
400 deadline = time.time() + 3
402 while time.time() < deadline or \
403 cls.debug_gdb or cls.debug_gdbserver:
404 if os.path.exists(cls.stats_sock):
409 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
412 def wait_for_coredump(cls):
413 corefile = cls.tempdir + "/core"
414 if os.path.isfile(corefile):
415 cls.logger.error("Waiting for coredump to complete: %s", corefile)
416 curr_size = os.path.getsize(corefile)
417 deadline = time.time() + 60
419 while time.time() < deadline:
422 curr_size = os.path.getsize(corefile)
423 if size == curr_size:
427 cls.logger.error("Timed out waiting for coredump to complete:"
430 cls.logger.error("Coredump complete: %s, size %d",
436 Perform class setup before running the testcase
437 Remove shared memory files, start vpp and connect the vpp-api
439 super(VppTestCase, cls).setUpClass()
440 gc.collect() # run garbage collection first
442 cls.logger = get_logger(cls.__name__)
443 if hasattr(cls, 'parallel_handler'):
444 cls.logger.addHandler(cls.parallel_handler)
445 cls.logger.propagate = False
447 cls.tempdir = tempfile.mkdtemp(
448 prefix='vpp-unittest-%s-' % cls.__name__)
449 cls.stats_sock = "%s/stats.sock" % cls.tempdir
450 cls.api_sock = "%s/api.sock" % cls.tempdir
451 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
452 cls.file_handler.setFormatter(
453 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
455 cls.file_handler.setLevel(DEBUG)
456 cls.logger.addHandler(cls.file_handler)
457 cls.logger.debug("--- setUpClass() for %s called ---" %
459 cls.shm_prefix = os.path.basename(cls.tempdir)
460 os.chdir(cls.tempdir)
461 cls.logger.info("Temporary dir is %s, shm prefix is %s",
462 cls.tempdir, cls.shm_prefix)
464 cls.reset_packet_infos()
466 cls._zombie_captures = []
469 cls.registry = VppObjectRegistry()
470 cls.vpp_startup_failed = False
471 cls.reporter = KeepAliveReporter()
472 # need to catch exceptions here because if we raise, then the cleanup
473 # doesn't get called and we might end with a zombie vpp
476 cls.reporter.send_keep_alive(cls, 'setUpClass')
477 VppTestResult.current_test_case_info = TestCaseInfo(
478 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
479 cls.vpp_stdout_deque = deque()
480 cls.vpp_stderr_deque = deque()
481 cls.pump_thread_stop_flag = Event()
482 cls.pump_thread_wakeup_pipe = os.pipe()
483 cls.pump_thread = Thread(target=pump_output, args=(cls,))
484 cls.pump_thread.daemon = True
485 cls.pump_thread.start()
486 if cls.debug_gdb or cls.debug_gdbserver:
490 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
496 cls.vapi.register_hook(hook)
497 cls.wait_for_stats_socket()
498 cls.statistics = VPPStats(socketname=cls.stats_sock)
502 cls.vpp_startup_failed = True
504 "VPP died shortly after startup, check the"
505 " output to standard error for possible cause")
511 cls.vapi.disconnect()
514 if cls.debug_gdbserver:
515 print(colorize("You're running VPP inside gdbserver but "
516 "VPP-API connection failed, did you forget "
517 "to 'continue' VPP from within gdb?", RED))
529 Disconnect vpp-api, kill vpp and cleanup shared memory files
531 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
533 if cls.vpp.returncode is None:
534 print(double_line_delim)
535 print("VPP or GDB server is still running")
536 print(single_line_delim)
537 input("When done debugging, press ENTER to kill the "
538 "process and finish running the testcase...")
540 # first signal that we want to stop the pump thread, then wake it up
541 if hasattr(cls, 'pump_thread_stop_flag'):
542 cls.pump_thread_stop_flag.set()
543 if hasattr(cls, 'pump_thread_wakeup_pipe'):
544 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
545 if hasattr(cls, 'pump_thread'):
546 cls.logger.debug("Waiting for pump thread to stop")
547 cls.pump_thread.join()
548 if hasattr(cls, 'vpp_stderr_reader_thread'):
549 cls.logger.debug("Waiting for stdderr pump to stop")
550 cls.vpp_stderr_reader_thread.join()
552 if hasattr(cls, 'vpp'):
553 if hasattr(cls, 'vapi'):
554 cls.logger.debug("Disconnecting class vapi client on %s",
556 cls.vapi.disconnect()
557 cls.logger.debug("Deleting class vapi attribute on %s",
561 if cls.vpp.returncode is None:
562 cls.wait_for_coredump()
563 cls.logger.debug("Sending TERM to vpp")
565 cls.logger.debug("Waiting for vpp to die")
566 cls.vpp.communicate()
567 cls.logger.debug("Deleting class vpp attribute on %s",
571 if cls.vpp_startup_failed:
572 stdout_log = cls.logger.info
573 stderr_log = cls.logger.critical
575 stdout_log = cls.logger.info
576 stderr_log = cls.logger.info
578 if hasattr(cls, 'vpp_stdout_deque'):
579 stdout_log(single_line_delim)
580 stdout_log('VPP output to stdout while running %s:', cls.__name__)
581 stdout_log(single_line_delim)
582 vpp_output = "".join(cls.vpp_stdout_deque)
583 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
585 stdout_log('\n%s', vpp_output)
586 stdout_log(single_line_delim)
588 if hasattr(cls, 'vpp_stderr_deque'):
589 stderr_log(single_line_delim)
590 stderr_log('VPP output to stderr while running %s:', cls.__name__)
591 stderr_log(single_line_delim)
592 vpp_output = "".join(cls.vpp_stderr_deque)
593 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
595 stderr_log('\n%s', vpp_output)
596 stderr_log(single_line_delim)
599 def tearDownClass(cls):
600 """ Perform final cleanup after running all tests in this test-case """
601 cls.logger.debug("--- tearDownClass() for %s called ---" %
603 cls.reporter.send_keep_alive(cls, 'tearDownClass')
605 cls.file_handler.close()
606 cls.reset_packet_infos()
608 debug_internal.on_tear_down_class(cls)
def show_commands_at_teardown(self):
    """Allow subclass specific teardown logging additions.

    This default implementation only logs that no test specific show
    commands were provided.
    """
    self.logger.info("--- No test specific show commands provided. ---")
615 """ Show various debug prints after each test """
616 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
617 (self.__class__.__name__, self._testMethodName,
618 self._testMethodDoc))
619 if not self.vpp_dead:
621 "--- Logging show commands common to all testcases. ---")
622 self.logger.debug(self.vapi.cli("show trace max 1000"))
623 self.logger.info(self.vapi.ppcli("show interface"))
624 self.logger.info(self.vapi.ppcli("show hardware"))
625 self.logger.info(self.statistics.set_errors_str())
626 self.logger.info(self.vapi.ppcli("show run"))
627 self.logger.info(self.vapi.ppcli("show log"))
628 self.logger.info("Logging testcase specific show commands.")
629 self.show_commands_at_teardown()
630 self.registry.remove_vpp_config(self.logger)
631 # Save/Dump VPP api trace log
632 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
633 tmp_api_trace = "/tmp/%s" % api_trace
634 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
635 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
636 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
638 os.rename(tmp_api_trace, vpp_api_trace_log)
639 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
642 self.registry.unregister_all(self.logger)
645 """ Clear trace before running each test"""
646 super(VppTestCase, self).setUp()
647 self.reporter.send_keep_alive(self)
649 raise Exception("VPP is dead when setting up the test")
650 self.sleep(.1, "during setUp")
651 self.vpp_stdout_deque.append(
652 "--- test setUp() for %s.%s(%s) starts here ---\n" %
653 (self.__class__.__name__, self._testMethodName,
654 self._testMethodDoc))
655 self.vpp_stderr_deque.append(
656 "--- test setUp() for %s.%s(%s) starts here ---\n" %
657 (self.__class__.__name__, self._testMethodName,
658 self._testMethodDoc))
659 self.vapi.cli("clear trace")
660 # store the test instance inside the test class - so that objects
661 # holding the class can access instance methods (like assertEqual)
662 type(self).test_instance = self
665 def pg_enable_capture(cls, interfaces=None):
667 Enable capture on packet-generator interfaces
669 :param interfaces: iterable interface indexes (if None,
670 use self.pg_interfaces)
673 if interfaces is None:
674 interfaces = cls.pg_interfaces
679 def register_capture(cls, cap_name):
680 """ Register a capture in the testclass """
681 # add to the list of captures with current timestamp
682 cls._captures.append((time.time(), cap_name))
683 # filter out from zombies
684 cls._zombie_captures = [(stamp, name)
685 for (stamp, name) in cls._zombie_captures
690 """ Remove any zombie captures and enable the packet generator """
691 # how long before capture is allowed to be deleted - otherwise vpp
692 # crashes - 100ms seems enough (this shouldn't be needed at all)
695 for stamp, cap_name in cls._zombie_captures:
696 wait = stamp + capture_ttl - now
698 cls.sleep(wait, "before deleting capture %s" % cap_name)
700 cls.logger.debug("Removing zombie capture %s" % cap_name)
701 cls.vapi.cli('packet-generator delete %s' % cap_name)
703 cls.vapi.cli("trace add pg-input 1000")
704 cls.vapi.cli('packet-generator enable')
705 cls._zombie_captures = cls._captures
709 def create_pg_interfaces(cls, interfaces):
711 Create packet-generator interfaces.
713 :param interfaces: iterable indexes of the interfaces.
714 :returns: List of created interfaces.
719 intf = VppPGInterface(cls, i)
720 setattr(cls, intf.name, intf)
722 cls.pg_interfaces = result
726 def create_loopback_interfaces(cls, count):
728 Create loopback interfaces.
730 :param count: number of interfaces created.
731 :returns: List of created interfaces.
733 result = [VppLoInterface(cls) for i in range(count)]
735 setattr(cls, intf.name, intf)
736 cls.lo_interfaces = result
740 def create_bvi_interfaces(cls, count):
742 Create BVI interfaces.
744 :param count: number of interfaces created.
745 :returns: List of created interfaces.
747 result = [VppBviInterface(cls) for i in range(count)]
749 setattr(cls, intf.name, intf)
750 cls.bvi_interfaces = result
754 def extend_packet(packet, size, padding=' '):
756 Extend packet to given size by padding with spaces or custom padding
757 NOTE: Currently works only when Raw layer is present.
759 :param packet: packet
760 :param size: target size
761 :param padding: padding used to extend the payload
764 packet_len = len(packet) + 4
765 extend = size - packet_len
767 num = (extend // len(padding)) + 1
768 packet[Raw].load += (padding * num)[:extend].encode("ascii")
def reset_packet_infos(cls):
    """Drop all stored packet infos and per-destination packet counts.

    Both containers are replaced by fresh, separate empty dicts.
    """
    cls._packet_infos, cls._packet_count_for_dst_if_idx = {}, {}
777 def create_packet_info(cls, src_if, dst_if):
779 Create packet info object containing the source and destination indexes
780 and add it to the testcase's packet info list
782 :param VppInterface src_if: source interface
783 :param VppInterface dst_if: destination interface
785 :returns: _PacketInfo object
789 info.index = len(cls._packet_infos)
790 info.src = src_if.sw_if_index
791 info.dst = dst_if.sw_if_index
792 if isinstance(dst_if, VppSubInterface):
793 dst_idx = dst_if.parent.sw_if_index
795 dst_idx = dst_if.sw_if_index
796 if dst_idx in cls._packet_count_for_dst_if_idx:
797 cls._packet_count_for_dst_if_idx[dst_idx] += 1
799 cls._packet_count_for_dst_if_idx[dst_idx] = 1
800 cls._packet_infos[info.index] = info
804 def info_to_payload(info):
806 Convert _PacketInfo object to packet payload
808 :param info: _PacketInfo object
810 :returns: string containing serialized data from packet info
812 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
816 def payload_to_info(payload, payload_field='load'):
818 Convert packet payload to _PacketInfo object
820 :param payload: packet payload
821 :type payload: <class 'scapy.packet.Raw'>
822 :param payload_field: packet fieldname of payload "load" for
823 <class 'scapy.packet.Raw'>
824 :type payload_field: str
825 :returns: _PacketInfo object containing de-serialized data from payload
828 numbers = getattr(payload, payload_field).split()
830 info.index = int(numbers[0])
831 info.src = int(numbers[1])
832 info.dst = int(numbers[2])
833 info.ip = int(numbers[3])
834 info.proto = int(numbers[4])
837 def get_next_packet_info(self, info):
839 Iterate over the packet info list stored in the testcase
840 Start iteration with first element if info is None
841 Continue based on index in info if info is specified
843 :param info: info or None
844 :returns: next info in list or None if no more infos
849 next_index = info.index + 1
850 if next_index == len(self._packet_infos):
853 return self._packet_infos[next_index]
855 def get_next_packet_info_for_interface(self, src_index, info):
857 Search the packet info list for the next packet info with same source
860 :param src_index: source interface index to search for
861 :param info: packet info - where to start the search
862 :returns: packet info or None
866 info = self.get_next_packet_info(info)
869 if info.src == src_index:
872 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
874 Search the packet info list for the next packet info with same source
875 and destination interface indexes
877 :param src_index: source interface index to search for
878 :param dst_index: destination interface index to search for
879 :param info: packet info - where to start the search
880 :returns: packet info or None
884 info = self.get_next_packet_info_for_interface(src_index, info)
887 if info.dst == dst_index:
890 def assert_equal(self, real_value, expected_value, name_or_class=None):
891 if name_or_class is None:
892 self.assertEqual(real_value, expected_value)
895 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
896 msg = msg % (getdoc(name_or_class).strip(),
897 real_value, str(name_or_class(real_value)),
898 expected_value, str(name_or_class(expected_value)))
900 msg = "Invalid %s: %s does not match expected value %s" % (
901 name_or_class, real_value, expected_value)
903 self.assertEqual(real_value, expected_value, msg)
905 def assert_in_range(self,
913 msg = "Invalid %s: %s out of range <%s,%s>" % (
914 name, real_value, expected_min, expected_max)
915 self.assertTrue(expected_min <= real_value <= expected_max, msg)
917 def assert_packet_checksums_valid(self, packet,
918 ignore_zero_udp_checksums=True):
919 received = packet.__class__(scapy.compat.raw(packet))
921 ppp("Verifying packet checksums for packet:", received))
922 udp_layers = ['UDP', 'UDPerror']
923 checksum_fields = ['cksum', 'chksum']
926 temp = received.__class__(scapy.compat.raw(received))
928 layer = temp.getlayer(counter)
930 for cf in checksum_fields:
931 if hasattr(layer, cf):
932 if ignore_zero_udp_checksums and \
933 0 == getattr(layer, cf) and \
934 layer.name in udp_layers:
937 checksums.append((counter, cf))
940 counter = counter + 1
941 if 0 == len(checksums):
943 temp = temp.__class__(scapy.compat.raw(temp))
944 for layer, cf in checksums:
945 calc_sum = getattr(temp[layer], cf)
947 getattr(received[layer], cf), calc_sum,
948 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
950 "Checksum field `%s` on `%s` layer has correct value `%s`" %
951 (cf, temp[layer].name, calc_sum))
953 def assert_checksum_valid(self, received_packet, layer,
955 ignore_zero_checksum=False):
956 """ Check checksum of received packet on given layer """
957 received_packet_checksum = getattr(received_packet[layer], field_name)
958 if ignore_zero_checksum and 0 == received_packet_checksum:
960 recalculated = received_packet.__class__(
961 scapy.compat.raw(received_packet))
962 delattr(recalculated[layer], field_name)
963 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
964 self.assert_equal(received_packet_checksum,
965 getattr(recalculated[layer], field_name),
966 "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """Check the checksum of the 'IP' layer of the received packet.

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded unchanged to
        assert_checksum_valid()
    """
    layer = 'IP'
    self.assert_checksum_valid(
        received_packet, layer, ignore_zero_checksum=ignore_zero_checksum)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """Check the checksum of the 'TCP' layer of the received packet.

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded unchanged to
        assert_checksum_valid()
    """
    layer = 'TCP'
    self.assert_checksum_valid(
        received_packet, layer, ignore_zero_checksum=ignore_zero_checksum)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """Check the checksum of the 'UDP' layer of the received packet.

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded unchanged to
        assert_checksum_valid(); note that it defaults to True here
    """
    layer = 'UDP'
    self.assert_checksum_valid(
        received_packet, layer, ignore_zero_checksum=ignore_zero_checksum)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """Check the checksums of the *error layers present in the packet.

    Each of IPerror, TCPerror, UDPerror and ICMPerror is verified, in
    that order, when the packet has that layer. The UDPerror check is
    made with ignore_zero_checksum=True.

    :param received_packet: packet to verify
    """
    embedded_layers = (
        (IPerror, 'IPerror', {}),
        (TCPerror, 'TCPerror', {}),
        (UDPerror, 'UDPerror', {'ignore_zero_checksum': True}),
        (ICMPerror, 'ICMPerror', {}),
    )
    for layer_cls, layer_name, options in embedded_layers:
        if received_packet.haslayer(layer_cls):
            self.assert_checksum_valid(received_packet, layer_name,
                                       **options)
def assert_icmp_checksum_valid(self, received_packet):
    """Check the 'ICMP' layer checksum, then any embedded error layers.

    :param received_packet: packet to verify
    """
    layer = 'ICMP'
    self.assert_checksum_valid(received_packet, layer)
    # the embedded check comes second, after the outer ICMP checksum
    self.assert_embedded_icmp_checksum_valid(received_packet)
def assert_icmpv6_checksum_valid(self, pkt):
    """Check the 'cksum' field of the ICMPv6 layers present in the packet.

    A destination-unreachable layer is verified together with its
    embedded error layers; echo request and echo reply layers are
    verified on their own.

    :param pkt: packet to verify
    """
    if pkt.haslayer(ICMPv6DestUnreach):
        self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
        self.assert_embedded_icmp_checksum_valid(pkt)
    echo_layers = (
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest'),
        (ICMPv6EchoReply, 'ICMPv6EchoReply'),
    )
    for layer_cls, layer_name in echo_layers:
        if pkt.haslayer(layer_cls):
            self.assert_checksum_valid(pkt, layer_name, 'cksum')
1007 def get_packet_counter(self, counter):
1008 if counter.startswith("/"):
1009 counter_value = self.statistics.get_counter(counter)
1011 counters = self.vapi.cli("sh errors").split('\n')
1013 for i in range(1, len(counters) - 1):
1014 results = counters[i].split()
1015 if results[1] == counter:
1016 counter_value = int(results[0])
1018 return counter_value
def assert_packet_counter_equal(self, counter, expected_value):
    """Assert that a packet counter has the expected value.

    :param counter: counter name, looked up through get_packet_counter()
    :param expected_value: value the counter must be equal to
    """
    self.assert_equal(self.get_packet_counter(counter),
                      expected_value,
                      "packet counter `%s'" % counter)
def assert_error_counter_equal(self, counter, expected_value):
    """Assert that an error counter has the expected value.

    :param counter: counter name, looked up through
        self.statistics.get_err_counter()
    :param expected_value: value the counter must be equal to
    """
    self.assert_equal(self.statistics.get_err_counter(counter),
                      expected_value,
                      "error counter `%s'" % counter)
1031 def sleep(cls, timeout, remark=None):
1033 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1034 # * by Guido, only the main thread can be interrupted.
1036 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
1039 if hasattr(os, 'sched_yield'):
1045 if hasattr(cls, 'logger'):
1046 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1047 before = time.time()
1050 if hasattr(cls, 'logger') and after - before > 2 * timeout:
1051 cls.logger.error("unexpected self.sleep() result - "
1052 "slept for %es instead of ~%es!",
1053 after - before, timeout)
1054 if hasattr(cls, 'logger'):
1056 "Finished sleep (%s) - slept %es (wanted %es)",
1057 remark, after - before, timeout)
1059 def pg_send(self, intf, pkts):
1060 self.vapi.cli("clear trace")
1061 intf.add_stream(pkts)
1062 self.pg_enable_capture(self.pg_interfaces)
1065 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
1066 self.pg_send(intf, pkts)
1069 for i in self.pg_interfaces:
1070 i.get_capture(0, timeout=timeout)
1071 i.assert_nothing_captured(remark=remark)
1074 def send_and_expect(self, intf, pkts, output, n_rx=None):
1077 self.pg_send(intf, pkts)
1078 rx = output.get_capture(n_rx)
1081 def send_and_expect_only(self, intf, pkts, output, timeout=None):
1082 self.pg_send(intf, pkts)
1083 rx = output.get_capture(len(pkts))
1087 for i in self.pg_interfaces:
1088 if i not in outputs:
1089 i.get_capture(0, timeout=timeout)
1090 i.assert_nothing_captured()
1096 """ unittest calls runTest when TestCase is instantiated without a
1097 test case. Use case: Writing unittests against VppTestCase"""
def get_testcase_doc_name(test):
    """Return the first line of the docstring of the test's class.

    :param test: test case instance
    :returns: first line of the class docstring as cleaned up by
        inspect.getdoc(); the class name when the class has no docstring
        or an empty one (instead of failing with AttributeError or
        IndexError)
    """
    test_class = test.__class__
    doc = getdoc(test_class)
    # getdoc() yields None for a missing docstring and '' for a blank one
    doc_lines = doc.splitlines() if doc else []
    if doc_lines:
        return doc_lines[0]
    return test_class.__name__
1105 def get_test_description(descriptions, test):
1106 short_description = test.shortDescription()
1107 if descriptions and short_description:
1108 return short_description
class TestCaseInfo(object):
    """Plain record of the data kept about one test case.

    Attributes set by the constructor: logger, tempdir, vpp_pid and
    vpp_bin_path (taken from the arguments) and core_crash_test, which
    starts out as None.
    """

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        # starts unset
        self.core_crash_test = None
        self.logger, self.tempdir = logger, tempdir
        self.vpp_pid, self.vpp_bin_path = vpp_pid, vpp_bin_path
class VppTestResult(unittest.TestResult):
    """
    Result collector which records outcomes and prints per-test status lines.

    @property result_string
     String variable to store the test case result string.
    @property errors
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test which
     raised an unexpected exception.
    @property failures
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test where
     a failure was explicitly signalled using the TestCase.assert*()
     methods.
    """

    # Class-level state: the sets are mutated through instances, so they are
    # shared by every result object created in this process.
    failed_test_cases_info = set()
    core_crash_test_cases_info = set()
    # Nothing in this class assigns it; presumably set by the running test
    # case - verify against the caller.
    current_test_case_info = None

    def __init__(self, stream=None, descriptions=None, verbosity=None,
                 runner=None):
        """
        :param stream File descriptor to store where to report test results.
            Set to the standard error stream by default.
        :param descriptions Boolean variable to store information if to use
            test case descriptions.
        :param verbosity Integer variable to store required verbosity level.
        :param runner Runner owning this result; its ``print_summary`` and
            ``stream`` attributes are used by printErrors().
        """
        super(VppTestResult, self).__init__(stream, descriptions, verbosity)
        self.stream = stream
        self.descriptions = descriptions
        self.verbosity = verbosity
        self.result_string = None
        self.runner = runner

    def addSuccess(self, test):
        """
        Record a test succeeded result

        :param test: test which passed
        """
        if self.current_test_case_info:
            self.current_test_case_info.logger.debug(
                "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
                                                       test._testMethodName,
                                                       test._testMethodDoc))
        unittest.TestResult.addSuccess(self, test)
        self.result_string = colorize("OK", GREEN)

        self.send_result_through_pipe(test, PASS)

    def addSkip(self, test, reason):
        """
        Record a test skipped.

        :param test: test which was skipped
        :param reason: reason for skipping the test
        """
        if self.current_test_case_info:
            self.current_test_case_info.logger.debug(
                "--- addSkip() %s.%s(%s) called, reason is %s" %
                (test.__class__.__name__, test._testMethodName,
                 test._testMethodDoc, reason))
        unittest.TestResult.addSkip(self, test, reason)
        self.result_string = colorize("SKIP", YELLOW)

        self.send_result_through_pipe(test, SKIP)

    def symlink_failed(self):
        """
        Link the temp dir of the current test case into $FAILED_DIR.

        Best effort: any problem is logged (when a logger is available) and
        never propagated, so result recording is not interrupted.
        """
        info = self.current_test_case_info
        if not info:
            return
        logger = info.logger
        try:
            failed_dir = os.getenv('FAILED_DIR')
            if failed_dir is None:
                # Without this guard os.path.join() raises a TypeError which
                # was only reported through the generic handler below.
                if logger:
                    logger.error(
                        "FAILED_DIR is not set, not linking %s" % info.tempdir)
                return
            # NOTE(review): the link-name format was not visible in the
            # excerpt under review and is reconstructed - confirm against
            # the consumers of FAILED_DIR.
            link_path = os.path.join(
                failed_dir,
                '%s-FAILED' % os.path.basename(info.tempdir))
            if logger:
                logger.debug("creating a link to the failed test")
                logger.debug(
                    "os.symlink(%s, %s)" % (info.tempdir, link_path))
            # lexists() also detects a dangling link, which exists() reports
            # as missing and which would make os.symlink() fail.
            if os.path.lexists(link_path):
                if logger:
                    logger.debug('symlink already exists')
            else:
                os.symlink(info.tempdir, link_path)

        except Exception as e:
            if logger:
                logger.error(e)

    def send_result_through_pipe(self, test, result):
        """
        Send ``(test.id(), result)`` through the result pipe, if one is set.

        :param test: test the result belongs to
        :param result: result code to send
        """
        if hasattr(self, 'test_framework_result_pipe'):
            pipe = self.test_framework_result_pipe
            # the runner stores None here when no result pipe was supplied
            if pipe:
                pipe.send((test.id(), result))

    def log_error(self, test, err, fn_name):
        """
        Write the error and its formatted traceback to the test case logger.

        :param test: test (or unittest _ErrorHolder) which failed
        :param err: exc_info style tuple, expanded into format_exception()
        :param fn_name: name of the reporting function, used in the log line
        """
        if self.current_test_case_info:
            if isinstance(test, unittest.suite._ErrorHolder):
                # an _ErrorHolder has no _testMethodName/_testMethodDoc
                test_name = test.description
            else:
                test_name = '%s.%s(%s)' % (test.__class__.__name__,
                                           test._testMethodName,
                                           test._testMethodDoc)
            self.current_test_case_info.logger.debug(
                "--- %s() %s called, err is %s" %
                (fn_name, test_name, err))
            self.current_test_case_info.logger.debug(
                "formatted exception is:\n%s" %
                "".join(format_exception(*err)))

    def add_error(self, test, err, unittest_fn, error_type):
        """
        Common implementation of addFailure() and addError().

        :param test: test (or unittest _ErrorHolder) which failed
        :param err: exc_info style tuple describing the problem
        :param unittest_fn: unbound unittest.TestResult method which records
            the outcome (addFailure or addError)
        :param error_type: FAIL or ERROR
        :raises Exception: if error_type is neither FAIL nor ERROR
        """
        if error_type == FAIL:
            self.log_error(test, err, 'addFailure')
            error_type_str = colorize("FAIL", RED)
        elif error_type == ERROR:
            self.log_error(test, err, 'addError')
            error_type_str = colorize("ERROR", RED)
        else:
            raise Exception('Error type %s cannot be used to record an '
                            'error or a failure' % error_type)

        unittest_fn(self, test, err)
        info = self.current_test_case_info
        if info:
            self.result_string = "%s [ temp dir used by test case: %s ]" % \
                                 (error_type_str, info.tempdir)
            self.symlink_failed()
            self.failed_test_cases_info.add(info)
            if is_core_present(info.tempdir):
                # only the first test seen after the core appeared is
                # remembered as the crashing one
                if not info.core_crash_test:
                    if isinstance(test, unittest.suite._ErrorHolder):
                        test_name = str(test)
                    else:
                        test_name = "'{!s}' ({!s})".format(
                            get_testcase_doc_name(test), test.id())
                    info.core_crash_test = test_name
                self.core_crash_test_cases_info.add(info)
        else:
            self.result_string = '%s [no temp dir]' % error_type_str

        self.send_result_through_pipe(test, error_type)

    def addFailure(self, test, err):
        """
        Record a test failed result

        :param test: test which failed
        :param err: error message
        """
        self.add_error(test, err, unittest.TestResult.addFailure, FAIL)

    def addError(self, test, err):
        """
        Record a test error result

        :param test: test which raised an error
        :param err: error message
        """
        self.add_error(test, err, unittest.TestResult.addError, ERROR)

    def getDescription(self, test):
        """
        Get test description

        :param test: test to describe
        :returns: test description
        """
        return get_test_description(self.descriptions, test)

    def startTest(self, test):
        """
        Start a test

        Prints the test class header once per class, then the per-test
        "Starting ..." banner when verbosity is above zero.

        :param test: test about to be run
        """

        def print_header(test):
            if not hasattr(test.__class__, '_header_printed'):
                # fall back to the class name when there is no docstring
                doc_lines = (getdoc(test) or '').splitlines()
                if doc_lines:
                    title = doc_lines[0]
                else:
                    title = test.__class__.__name__
                print(double_line_delim)
                print(colorize(title, GREEN))
                print(double_line_delim)
            test.__class__._header_printed = True

        print_header(test)

        unittest.TestResult.startTest(self, test)
        if self.verbosity > 0:
            self.stream.writeln(
                "Starting " + self.getDescription(test) + " ...")
            self.stream.writeln(single_line_delim)

    def stopTest(self, test):
        """
        Called when the given test has been run

        :param test: test which finished
        """
        unittest.TestResult.stopTest(self, test)
        status_line = "%-73s%s" % (self.getDescription(test),
                                   self.result_string)
        if self.verbosity > 0:
            self.stream.writeln(single_line_delim)
            self.stream.writeln(status_line)
            self.stream.writeln(single_line_delim)
        else:
            self.stream.writeln(status_line)

        self.send_result_through_pipe(test, TEST_RUN)

    def printErrors(self):
        """
        Print errors from running the test case
        """
        if len(self.errors) > 0 or len(self.failures) > 0:
            self.stream.writeln()
            self.printErrorList('ERROR', self.errors)
            self.printErrorList('FAIL', self.failures)

        # ^^ that is the last output from unittest before summary
        if not self.runner.print_summary:
            # Everything written from here on is discarded. The devnull
            # handle is not closed by this class.
            devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
            self.stream = devnull
            self.runner.stream = devnull

    def printErrorList(self, flavour, errors):
        """
        Print error list to the output stream together with error type
        and test case description.

        :param flavour: error type
        :param errors: iterable errors
        """
        for test, err in errors:
            self.stream.writeln(double_line_delim)
            self.stream.writeln("%s: %s" %
                                (flavour, self.getDescription(test)))
            self.stream.writeln(single_line_delim)
            self.stream.writeln("%s" % err)
class VppTestRunner(unittest.TextTestRunner):
    """
    A basic test runner implementation which prints results to standard
    output (the stream is hard-coded to sys.stdout, see __init__).
    """

    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
                 result_pipe=None, failfast=False, buffer=False,
                 resultclass=None, print_summary=True, **kwargs):
        """
        :param keep_alive_pipe: stored in KeepAliveReporter.pipe
        :param descriptions: passed through to unittest.TextTestRunner
        :param verbosity: passed through to unittest.TextTestRunner
        :param result_pipe: stored on the result class as
            ``test_framework_result_pipe``; may be None
        :param failfast: passed through to unittest.TextTestRunner
        :param buffer: passed through to unittest.TextTestRunner
        :param resultclass: passed through to unittest.TextTestRunner
        :param print_summary: if False, output produced after the error
            lists is discarded
        """
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        # NOTE(review): ``resultclass`` is a read-only property on this
        # class, so a non-None value here looks like it cannot be stored by
        # the base class - confirm before relying on the parameter.
        super(VppTestRunner, self).__init__(sys.stdout, descriptions,
                                            verbosity, failfast, buffer,
                                            resultclass, **kwargs)
        KeepAliveReporter.pipe = keep_alive_pipe

        # kept so that run() can restore it after printErrors() replaced it
        self.orig_stream = self.stream
        # set on the class, i.e. visible to every result instance
        self.resultclass.test_framework_result_pipe = result_pipe

        self.print_summary = print_summary

    def _makeResult(self):
        """Create the result object, handing it a reference to this runner."""
        return self.resultclass(self.stream,
                                self.descriptions,
                                self.verbosity,
                                self)

    def run(self, test):
        """
        Run the tests

        :param test: test case or suite to run
        :returns: result object produced by the run
        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        result = super(VppTestRunner, self).run(test)
        if not self.print_summary:
            # undo the devnull redirection done by the result object
            self.stream = self.orig_stream
            result.stream = self.orig_stream
        return result
class Worker(Thread):
    """Thread which runs an external executable and logs its output."""

    def __init__(self, args, logger, env=None):
        """
        :param args: command line; args[0] is the executable
        :param logger: logger receiving the progress and output messages
        :param env: extra environment variables layered over os.environ;
            a deep copy is stored, so later changes by the caller have
            no effect
        """
        self.logger = logger
        self.args = args
        # return code of the executable, None until run() has finished
        self.result = None
        self.env = copy.deepcopy(env) if env is not None else {}
        super(Worker, self).__init__()

    def run(self):
        """Run the executable, log its stdout/stderr and store the result."""
        executable = self.args[0]
        # wrap in a tuple so that a tuple of args is formatted as one value
        self.logger.debug("Running executable w/args `%s'" % (self.args,))
        env = os.environ.copy()
        env.update(self.env)
        # presumably tells the executable to log to stdout - TODO confirm
        env["CK_LOG_FILE_NAME"] = "-"
        self.process = subprocess.Popen(
            self.args, shell=False, env=env, preexec_fn=os.setpgrp,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = self.process.communicate()
        self.logger.debug("Finished running `%s'" % executable)
        self.logger.info("Return code is `%s'" % self.process.returncode)
        self.logger.info(single_line_delim)
        self.logger.info("Executable `%s' wrote to stdout:" % executable)
        self.logger.info(single_line_delim)
        self.logger.info(out)
        self.logger.info(single_line_delim)
        self.logger.info("Executable `%s' wrote to stderr:" % executable)
        self.logger.info(single_line_delim)
        self.logger.info(err)
        self.logger.info(single_line_delim)
        self.result = self.process.returncode
1454 if __name__ == '__main__':