3 from __future__ import print_function
16 from collections import deque
17 from threading import Thread, Event
18 from inspect import getdoc, isclass
19 from traceback import format_exception
20 from logging import FileHandler, DEBUG, Formatter
23 from scapy.packet import Raw
24 from hook import StepHook, PollHook, VppDiedError
25 from vpp_pg_interface import VppPGInterface
26 from vpp_sub_interface import VppSubInterface
27 from vpp_lo_interface import VppLoInterface
28 from vpp_bvi_interface import VppBviInterface
29 from vpp_papi_provider import VppPapiProvider
30 from vpp_papi.vpp_stats import VPPStats
31 from vpp_papi.vpp_transport_shmem import VppTransportShmemIOError
32 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
34 from vpp_object import VppObjectRegistry
35 from util import ppp, is_core_present
36 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
37 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
38 from scapy.layers.inet6 import ICMPv6EchoReply
40 if os.name == 'posix' and sys.version_info[0] < 3:
41 # using subprocess32 is recommended by python official documentation
42 # @ https://docs.python.org/2/library/subprocess.html
43 import subprocess32 as subprocess
47 # Python2/3 compatible
59 debug_framework = False
60 if os.getenv('TEST_DEBUG', "0") == "1":
61 debug_framework = True
65 Test framework module.
67 The module provides a set of tools for constructing and running tests and
68 representing the results.
72 class _PacketInfo(object):
73 """Private class to create packet info object.
75 Help process information about the next packet.
76 Set variables to default values.
78 #: Store the index of the packet.
80 #: Store the index of the source packet generator interface of the packet.
82 #: Store the index of the destination packet generator interface
85 #: Store expected ip version
87 #: Store expected upper protocol
89 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare two packet info objects field by field.

    :param other: object to compare with
    :returns: truthy when ``index``, ``src``, ``dst`` and ``data`` of both
        objects are equal; ``NotImplemented`` when ``other`` is not an
        instance of this object's class
    """
    if not isinstance(other, type(self)):
        # Let Python fall back to the reflected comparison instead of
        # failing with AttributeError on objects lacking these fields.
        return NotImplemented
    return (self.index == other.index and
            self.src == other.src and
            self.dst == other.dst and
            self.data == other.data)
def _pump_stream(testclass, stream, fragment, output_deque, label):
    """Move whatever is readable on one VPP stream into its deque.

    :param testclass: test class providing ``cache_vpp_output`` and ``logger``
    :param stream: file-like object exposing ``fileno()``
    :param fragment: unterminated text left over from the previous read
    :param output_deque: deque receiving complete lines
    :param label: ``STDOUT`` or ``STDERR``, used in the debug log prefix
    :returns: the new unterminated fragment ("" when none is pending)
    """
    chunk = os.read(stream.fileno(), 102400)
    if not chunk:
        return fragment
    # os.read() yields bytes; decode once at the I/O boundary so that the
    # deques hold text only (they are also appended to and joined as text).
    lines = chunk.decode("utf-8", "replace").splitlines(True)
    if not lines:
        return fragment
    lines[0] = "%s%s" % (fragment, lines[0])
    if lines[-1].endswith("\n"):
        # the pending fragment has been consumed - do not reuse it
        fragment = ""
    else:
        # keep the unterminated tail until the rest of the line arrives
        fragment = lines.pop()
    output_deque.extend(lines)
    if not testclass.cache_vpp_output:
        for line in lines:
            testclass.logger.debug(
                "VPP %s: %s" % (label, line.rstrip("\n")))
    return fragment


def pump_output(testclass):
    """ pump output from vpp stdout/stderr to proper queues

    Loops until ``testclass.pump_thread_stop_flag`` is set, blocking in
    select() on vpp's stdout, stderr and the wake-up pipe.

    :param testclass: test class holding ``vpp``, the two output deques,
        ``pump_thread_stop_flag`` and ``pump_thread_wakeup_pipe``
    """
    stdout_fragment = ""
    stderr_fragment = ""
    while not testclass.pump_thread_stop_flag.is_set():
        readable = select.select([testclass.vpp.stdout.fileno(),
                                  testclass.vpp.stderr.fileno(),
                                  testclass.pump_thread_wakeup_pipe[0]],
                                 [], [])[0]
        if testclass.vpp.stdout.fileno() in readable:
            stdout_fragment = _pump_stream(
                testclass, testclass.vpp.stdout, stdout_fragment,
                testclass.vpp_stdout_deque, "STDOUT")
        if testclass.vpp.stderr.fileno() in readable:
            stderr_fragment = _pump_stream(
                testclass, testclass.vpp.stderr, stderr_fragment,
                testclass.vpp_stderr_deque, "STDERR")
        # ignoring the dummy pipe here intentionally - the
        # flag will take care of properly terminating the loop
def _is_skip_aarch64_set():
    """Tell whether the SKIP_AARCH64 environment variable is switched on.

    :returns: True when SKIP_AARCH64 is 'yes', 'y' or '1' (case-insensitive),
        False otherwise, including when the variable is unset
    """
    return os.getenv('SKIP_AARCH64', 'n').lower() in ('yes', 'y', '1')


# evaluated once at import time
is_skip_aarch64_set = _is_skip_aarch64_set()
def _is_platform_aarch64():
    """Tell whether platform.machine() reports 'aarch64'.

    :returns: True on an aarch64 machine, False otherwise
    """
    return platform.machine() == 'aarch64'


# evaluated once at import time
is_platform_aarch64 = _is_platform_aarch64()
def _running_extended_tests():
    """Tell whether the EXTENDED_TESTS environment variable is switched on.

    :returns: True when EXTENDED_TESTS is 'y', 'yes' or '1'
        (case-insensitive), False otherwise, including when unset
    """
    return os.getenv("EXTENDED_TESTS", "n").lower() in ("y", "yes", "1")


# evaluated once at import time
running_extended_tests = _running_extended_tests()
def _running_on_centos():
    """Tell whether the OS_ID environment variable names CentOS.

    :returns: True when OS_ID contains 'centos' (case-insensitive),
        False otherwise, including when the variable is unset
    """
    return "centos" in os.getenv("OS_ID", "").lower()


# Call the helper: binding the bare function object here would make the
# flag truthy on every platform.
running_on_centos = _running_on_centos()
175 class KeepAliveReporter(object):
177 Singleton object which reports test start to parent process
182 self.__dict__ = self._shared_state
190 def pipe(self, pipe):
191 if self._pipe is not None:
192 raise Exception("Internal error - pipe should only be set once.")
195 def send_keep_alive(self, test, desc=None):
197 Write current test tmpdir & desc to keep-alive pipe to signal liveness
199 if self.pipe is None:
200 # if not running forked..
204 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
208 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
211 class VppTestCase(unittest.TestCase):
212 """This subclass is a base class for VPP test cases that are implemented as
213 classes. It provides methods to create and run test case.
216 extra_vpp_punt_config = []
217 extra_vpp_plugin_config = []
def packet_infos(self):
    """Packet infos created so far, as a dict keyed by packet index"""
    return self._packet_infos
225 def get_packet_count_for_if_idx(cls, dst_if_index):
226 """Get the number of packet info for specified destination if index"""
227 if dst_if_index in cls._packet_count_for_dst_if_idx:
228 return cls._packet_count_for_dst_if_idx[dst_if_index]
234 """Return the instance of this testcase"""
235 return cls.test_instance
238 def set_debug_flags(cls, d):
239 cls.debug_core = False
240 cls.debug_gdb = False
241 cls.debug_gdbserver = False
246 cls.debug_core = True
249 elif dl == "gdbserver":
250 cls.debug_gdbserver = True
252 raise Exception("Unrecognized DEBUG option: '%s'" % d)
255 def get_least_used_cpu():
256 cpu_usage_list = [set(range(psutil.cpu_count()))]
257 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
258 if 'vpp_main' == p.info['name']]
259 for vpp_process in vpp_processes:
260 for cpu_usage_set in cpu_usage_list:
262 cpu_num = vpp_process.cpu_num()
263 if cpu_num in cpu_usage_set:
264 cpu_usage_set_index = cpu_usage_list.index(
266 if cpu_usage_set_index == len(cpu_usage_list) - 1:
267 cpu_usage_list.append({cpu_num})
269 cpu_usage_list[cpu_usage_set_index + 1].add(
271 cpu_usage_set.remove(cpu_num)
273 except psutil.NoSuchProcess:
276 for cpu_usage_set in cpu_usage_list:
277 if len(cpu_usage_set) > 0:
278 min_usage_set = cpu_usage_set
281 return random.choice(tuple(min_usage_set))
284 def setUpConstants(cls):
285 """ Set-up the test case class based on environment variables """
286 s = os.getenv("STEP", "n")
287 cls.step = True if s.lower() in ("y", "yes", "1") else False
288 d = os.getenv("DEBUG", None)
289 c = os.getenv("CACHE_OUTPUT", "1")
290 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
291 cls.set_debug_flags(d)
292 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
293 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
294 cls.test_plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
295 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
297 if cls.plugin_path is not None:
298 if cls.extern_plugin_path is not None:
299 plugin_path = "%s:%s" % (
300 cls.plugin_path, cls.extern_plugin_path)
302 plugin_path = cls.plugin_path
303 elif cls.extern_plugin_path is not None:
304 plugin_path = cls.extern_plugin_path
306 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
307 debug_cli = "cli-listen localhost:5002"
309 size = os.getenv("COREDUMP_SIZE")
311 coredump_size = "coredump-size %s" % size
312 if coredump_size is None:
313 coredump_size = "coredump-size unlimited"
315 cpu_core_number = cls.get_least_used_cpu()
317 cls.vpp_cmdline = [cls.vpp_bin, "unix",
318 "{", "nodaemon", debug_cli, "full-coredump",
319 coredump_size, "runtime-dir", cls.tempdir, "}",
320 "api-trace", "{", "on", "}", "api-segment", "{",
321 "prefix", cls.shm_prefix, "}", "cpu", "{",
322 "main-core", str(cpu_core_number), "}",
323 "statseg", "{", "socket-name", cls.stats_sock, "}",
324 "socksvr", "{", "socket-name", cls.api_sock, "}",
326 "{", "plugin", "dpdk_plugin.so", "{", "disable",
327 "}", "plugin", "rdma_plugin.so", "{", "disable",
328 "}", "plugin", "unittest_plugin.so", "{", "enable",
329 "}"] + cls.extra_vpp_plugin_config + ["}", ]
330 if cls.extra_vpp_punt_config is not None:
331 cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
332 if plugin_path is not None:
333 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
334 if cls.test_plugin_path is not None:
335 cls.vpp_cmdline.extend(["test_plugin_path", cls.test_plugin_path])
337 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
338 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
341 def wait_for_enter(cls):
342 if cls.debug_gdbserver:
343 print(double_line_delim)
344 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
346 print(double_line_delim)
347 print("Spawned VPP with PID: %d" % cls.vpp.pid)
349 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
351 print(single_line_delim)
352 print("You can debug the VPP using e.g.:")
353 if cls.debug_gdbserver:
354 print("sudo gdb " + cls.vpp_bin +
355 " -ex 'target remote localhost:7777'")
356 print("Now is the time to attach a gdb by running the above "
357 "command, set up breakpoints etc. and then resume VPP from "
358 "within gdb by issuing the 'continue' command")
360 print("sudo gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
361 print("Now is the time to attach a gdb by running the above "
362 "command and set up breakpoints etc.")
363 print(single_line_delim)
364 input("Press ENTER to continue running the testcase...")
368 cmdline = cls.vpp_cmdline
370 if cls.debug_gdbserver:
371 gdbserver = '/usr/bin/gdbserver'
372 if not os.path.isfile(gdbserver) or \
373 not os.access(gdbserver, os.X_OK):
374 raise Exception("gdbserver binary '%s' does not exist or is "
375 "not executable" % gdbserver)
377 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
378 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
381 cls.vpp = subprocess.Popen(cmdline,
382 stdout=subprocess.PIPE,
383 stderr=subprocess.PIPE,
385 except subprocess.CalledProcessError as e:
386 cls.logger.critical("Subprocess returned with non-0 return code: ("
390 cls.logger.critical("Subprocess returned with OS error: "
391 "(%s) %s", e.errno, e.strerror)
393 except Exception as e:
394 cls.logger.exception("Subprocess returned unexpected from "
401 def wait_for_stats_socket(cls):
402 deadline = time.time() + 3
404 while time.time() < deadline or \
405 cls.debug_gdb or cls.debug_gdbserver:
406 if os.path.exists(cls.stats_sock):
411 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
414 def wait_for_coredump(cls):
415 corefile = cls.tempdir + "/core"
416 if os.path.isfile(corefile):
417 cls.logger.error("Waiting for coredump to complete: %s", corefile)
418 curr_size = os.path.getsize(corefile)
419 deadline = time.time() + 60
421 while time.time() < deadline:
424 curr_size = os.path.getsize(corefile)
425 if size == curr_size:
429 cls.logger.error("Timed out waiting for coredump to complete:"
432 cls.logger.error("Coredump complete: %s, size %d",
438 Perform class setup before running the testcase
439 Remove shared memory files, start vpp and connect the vpp-api
441 super(VppTestCase, cls).setUpClass()
442 gc.collect() # run garbage collection first
444 cls.logger = get_logger(cls.__name__)
445 if hasattr(cls, 'parallel_handler'):
446 cls.logger.addHandler(cls.parallel_handler)
447 cls.logger.propagate = False
449 cls.tempdir = tempfile.mkdtemp(
450 prefix='vpp-unittest-%s-' % cls.__name__)
451 cls.stats_sock = "%s/stats.sock" % cls.tempdir
452 cls.api_sock = "%s/api.sock" % cls.tempdir
453 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
454 cls.file_handler.setFormatter(
455 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
457 cls.file_handler.setLevel(DEBUG)
458 cls.logger.addHandler(cls.file_handler)
459 cls.logger.debug("--- setUpClass() for %s called ---" %
461 cls.shm_prefix = os.path.basename(cls.tempdir)
462 os.chdir(cls.tempdir)
463 cls.logger.info("Temporary dir is %s, shm prefix is %s",
464 cls.tempdir, cls.shm_prefix)
466 cls.reset_packet_infos()
468 cls._zombie_captures = []
471 cls.registry = VppObjectRegistry()
472 cls.vpp_startup_failed = False
473 cls.reporter = KeepAliveReporter()
474 # need to catch exceptions here because if we raise, then the cleanup
475 # doesn't get called and we might end with a zombie vpp
478 cls.reporter.send_keep_alive(cls, 'setUpClass')
479 VppTestResult.current_test_case_info = TestCaseInfo(
480 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
481 cls.vpp_stdout_deque = deque()
482 cls.vpp_stderr_deque = deque()
483 cls.pump_thread_stop_flag = Event()
484 cls.pump_thread_wakeup_pipe = os.pipe()
485 cls.pump_thread = Thread(target=pump_output, args=(cls,))
486 cls.pump_thread.daemon = True
487 cls.pump_thread.start()
488 if cls.debug_gdb or cls.debug_gdbserver:
492 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
498 cls.vapi.register_hook(hook)
499 cls.wait_for_stats_socket()
500 cls.statistics = VPPStats(socketname=cls.stats_sock)
504 cls.vpp_startup_failed = True
506 "VPP died shortly after startup, check the"
507 " output to standard error for possible cause")
513 cls.vapi.disconnect()
516 if cls.debug_gdbserver:
517 print(colorize("You're running VPP inside gdbserver but "
518 "VPP-API connection failed, did you forget "
519 "to 'continue' VPP from within gdb?", RED))
531 Disconnect vpp-api, kill vpp and cleanup shared memory files
533 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
535 if cls.vpp.returncode is None:
536 print(double_line_delim)
537 print("VPP or GDB server is still running")
538 print(single_line_delim)
539 input("When done debugging, press ENTER to kill the "
540 "process and finish running the testcase...")
542 # first signal that we want to stop the pump thread, then wake it up
543 if hasattr(cls, 'pump_thread_stop_flag'):
544 cls.pump_thread_stop_flag.set()
545 if hasattr(cls, 'pump_thread_wakeup_pipe'):
546 os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
547 if hasattr(cls, 'pump_thread'):
548 cls.logger.debug("Waiting for pump thread to stop")
549 cls.pump_thread.join()
550 if hasattr(cls, 'vpp_stderr_reader_thread'):
551 cls.logger.debug("Waiting for stdderr pump to stop")
552 cls.vpp_stderr_reader_thread.join()
554 if hasattr(cls, 'vpp'):
555 if hasattr(cls, 'vapi'):
556 cls.logger.debug("Disconnecting class vapi client on %s",
558 cls.vapi.disconnect()
559 cls.logger.debug("Deleting class vapi attribute on %s",
563 if cls.vpp.returncode is None:
564 cls.wait_for_coredump()
565 cls.logger.debug("Sending TERM to vpp")
567 cls.logger.debug("Waiting for vpp to die")
568 cls.vpp.communicate()
569 cls.logger.debug("Deleting class vpp attribute on %s",
573 if cls.vpp_startup_failed:
574 stdout_log = cls.logger.info
575 stderr_log = cls.logger.critical
577 stdout_log = cls.logger.info
578 stderr_log = cls.logger.info
580 if hasattr(cls, 'vpp_stdout_deque'):
581 stdout_log(single_line_delim)
582 stdout_log('VPP output to stdout while running %s:', cls.__name__)
583 stdout_log(single_line_delim)
584 vpp_output = "".join(cls.vpp_stdout_deque)
585 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
587 stdout_log('\n%s', vpp_output)
588 stdout_log(single_line_delim)
590 if hasattr(cls, 'vpp_stderr_deque'):
591 stderr_log(single_line_delim)
592 stderr_log('VPP output to stderr while running %s:', cls.__name__)
593 stderr_log(single_line_delim)
594 vpp_output = "".join(cls.vpp_stderr_deque)
595 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
597 stderr_log('\n%s', vpp_output)
598 stderr_log(single_line_delim)
601 def tearDownClass(cls):
602 """ Perform final cleanup after running all tests in this test-case """
603 cls.logger.debug("--- tearDownClass() for %s called ---" %
605 cls.reporter.send_keep_alive(cls, 'tearDownClass')
607 cls.file_handler.close()
608 cls.reset_packet_infos()
610 debug_internal.on_tear_down_class(cls)
def show_commands_at_teardown(self):
    """ Allow subclass specific teardown logging additions.

    This base implementation only logs that no test specific show
    commands were provided.
    """
    self.logger.info("--- No test specific show commands provided. ---")
617 """ Show various debug prints after each test """
618 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
619 (self.__class__.__name__, self._testMethodName,
620 self._testMethodDoc))
623 if not self.vpp_dead:
624 self.logger.debug(self.vapi.cli("show trace max 1000"))
625 self.logger.info(self.vapi.ppcli("show interface"))
626 self.logger.info(self.vapi.ppcli("show hardware"))
627 self.logger.info(self.statistics.set_errors_str())
628 self.logger.info(self.vapi.ppcli("show run"))
629 self.logger.info(self.vapi.ppcli("show log"))
630 self.logger.info("Logging testcase specific show commands.")
631 self.show_commands_at_teardown()
632 self.registry.remove_vpp_config(self.logger)
633 # Save/Dump VPP api trace log
634 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
635 tmp_api_trace = "/tmp/%s" % api_trace
636 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
637 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
638 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
640 os.rename(tmp_api_trace, vpp_api_trace_log)
641 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
643 except VppTransportShmemIOError:
644 self.logger.debug("VppTransportShmemIOError: Vpp dead. "
645 "Cannot log show commands.")
648 self.registry.unregister_all(self.logger)
651 """ Clear trace before running each test"""
652 super(VppTestCase, self).setUp()
653 self.reporter.send_keep_alive(self)
655 raise Exception("VPP is dead when setting up the test")
656 self.sleep(.1, "during setUp")
657 self.vpp_stdout_deque.append(
658 "--- test setUp() for %s.%s(%s) starts here ---\n" %
659 (self.__class__.__name__, self._testMethodName,
660 self._testMethodDoc))
661 self.vpp_stderr_deque.append(
662 "--- test setUp() for %s.%s(%s) starts here ---\n" %
663 (self.__class__.__name__, self._testMethodName,
664 self._testMethodDoc))
665 self.vapi.cli("clear trace")
666 # store the test instance inside the test class - so that objects
667 # holding the class can access instance methods (like assertEqual)
668 type(self).test_instance = self
671 def pg_enable_capture(cls, interfaces=None):
673 Enable capture on packet-generator interfaces
675 :param interfaces: iterable interface indexes (if None,
676 use self.pg_interfaces)
679 if interfaces is None:
680 interfaces = cls.pg_interfaces
685 def register_capture(cls, cap_name):
686 """ Register a capture in the testclass """
687 # add to the list of captures with current timestamp
688 cls._captures.append((time.time(), cap_name))
689 # filter out from zombies
690 cls._zombie_captures = [(stamp, name)
691 for (stamp, name) in cls._zombie_captures
696 """ Remove any zombie captures and enable the packet generator """
697 # how long before capture is allowed to be deleted - otherwise vpp
698 # crashes - 100ms seems enough (this shouldn't be needed at all)
701 for stamp, cap_name in cls._zombie_captures:
702 wait = stamp + capture_ttl - now
704 cls.sleep(wait, "before deleting capture %s" % cap_name)
706 cls.logger.debug("Removing zombie capture %s" % cap_name)
707 cls.vapi.cli('packet-generator delete %s' % cap_name)
709 cls.vapi.cli("trace add pg-input 1000")
710 cls.vapi.cli('packet-generator enable')
711 cls._zombie_captures = cls._captures
715 def create_pg_interfaces(cls, interfaces):
717 Create packet-generator interfaces.
719 :param interfaces: iterable indexes of the interfaces.
720 :returns: List of created interfaces.
725 intf = VppPGInterface(cls, i)
726 setattr(cls, intf.name, intf)
728 cls.pg_interfaces = result
732 def create_loopback_interfaces(cls, count):
734 Create loopback interfaces.
736 :param count: number of interfaces created.
737 :returns: List of created interfaces.
739 result = [VppLoInterface(cls) for i in range(count)]
741 setattr(cls, intf.name, intf)
742 cls.lo_interfaces = result
746 def create_bvi_interfaces(cls, count):
748 Create BVI interfaces.
750 :param count: number of interfaces created.
751 :returns: List of created interfaces.
753 result = [VppBviInterface(cls) for i in range(count)]
755 setattr(cls, intf.name, intf)
756 cls.bvi_interfaces = result
760 def extend_packet(packet, size, padding=' '):
762 Extend packet to given size by padding with spaces or custom padding
763 NOTE: Currently works only when Raw layer is present.
765 :param packet: packet
766 :param size: target size
767 :param padding: padding used to extend the payload
770 packet_len = len(packet) + 4
771 extend = size - packet_len
773 num = (extend // len(padding)) + 1
774 packet[Raw].load += (padding * num)[:extend].encode("ascii")
def reset_packet_infos(cls):
    """ Reset the list of packet info objects and packet counts to zero

    Both class-level containers are replaced with fresh empty dicts.
    """
    cls._packet_infos = {}
    cls._packet_count_for_dst_if_idx = {}
783 def create_packet_info(cls, src_if, dst_if):
785 Create packet info object containing the source and destination indexes
786 and add it to the testcase's packet info list
788 :param VppInterface src_if: source interface
789 :param VppInterface dst_if: destination interface
791 :returns: _PacketInfo object
795 info.index = len(cls._packet_infos)
796 info.src = src_if.sw_if_index
797 info.dst = dst_if.sw_if_index
798 if isinstance(dst_if, VppSubInterface):
799 dst_idx = dst_if.parent.sw_if_index
801 dst_idx = dst_if.sw_if_index
802 if dst_idx in cls._packet_count_for_dst_if_idx:
803 cls._packet_count_for_dst_if_idx[dst_idx] += 1
805 cls._packet_count_for_dst_if_idx[dst_idx] = 1
806 cls._packet_infos[info.index] = info
810 def info_to_payload(info):
812 Convert _PacketInfo object to packet payload
814 :param info: _PacketInfo object
816 :returns: string containing serialized data from packet info
818 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
822 def payload_to_info(payload, payload_field='load'):
824 Convert packet payload to _PacketInfo object
826 :param payload: packet payload
827 :type payload: <class 'scapy.packet.Raw'>
828 :param payload_field: packet fieldname of payload "load" for
829 <class 'scapy.packet.Raw'>
830 :type payload_field: str
831 :returns: _PacketInfo object containing de-serialized data from payload
834 numbers = getattr(payload, payload_field).split()
836 info.index = int(numbers[0])
837 info.src = int(numbers[1])
838 info.dst = int(numbers[2])
839 info.ip = int(numbers[3])
840 info.proto = int(numbers[4])
843 def get_next_packet_info(self, info):
845 Iterate over the packet info list stored in the testcase
846 Start iteration with first element if info is None
847 Continue based on index in info if info is specified
849 :param info: info or None
850 :returns: next info in list or None if no more infos
855 next_index = info.index + 1
856 if next_index == len(self._packet_infos):
859 return self._packet_infos[next_index]
861 def get_next_packet_info_for_interface(self, src_index, info):
863 Search the packet info list for the next packet info with same source
866 :param src_index: source interface index to search for
867 :param info: packet info - where to start the search
868 :returns: packet info or None
872 info = self.get_next_packet_info(info)
875 if info.src == src_index:
878 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
880 Search the packet info list for the next packet info with same source
881 and destination interface indexes
883 :param src_index: source interface index to search for
884 :param dst_index: destination interface index to search for
885 :param info: packet info - where to start the search
886 :returns: packet info or None
890 info = self.get_next_packet_info_for_interface(src_index, info)
893 if info.dst == dst_index:
896 def assert_equal(self, real_value, expected_value, name_or_class=None):
897 if name_or_class is None:
898 self.assertEqual(real_value, expected_value)
901 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
902 msg = msg % (getdoc(name_or_class).strip(),
903 real_value, str(name_or_class(real_value)),
904 expected_value, str(name_or_class(expected_value)))
906 msg = "Invalid %s: %s does not match expected value %s" % (
907 name_or_class, real_value, expected_value)
909 self.assertEqual(real_value, expected_value, msg)
911 def assert_in_range(self,
919 msg = "Invalid %s: %s out of range <%s,%s>" % (
920 name, real_value, expected_min, expected_max)
921 self.assertTrue(expected_min <= real_value <= expected_max, msg)
923 def assert_packet_checksums_valid(self, packet,
924 ignore_zero_udp_checksums=True):
925 received = packet.__class__(scapy.compat.raw(packet))
927 ppp("Verifying packet checksums for packet:", received))
928 udp_layers = ['UDP', 'UDPerror']
929 checksum_fields = ['cksum', 'chksum']
932 temp = received.__class__(scapy.compat.raw(received))
934 layer = temp.getlayer(counter)
936 for cf in checksum_fields:
937 if hasattr(layer, cf):
938 if ignore_zero_udp_checksums and \
939 0 == getattr(layer, cf) and \
940 layer.name in udp_layers:
943 checksums.append((counter, cf))
946 counter = counter + 1
947 if 0 == len(checksums):
949 temp = temp.__class__(scapy.compat.raw(temp))
950 for layer, cf in checksums:
951 calc_sum = getattr(temp[layer], cf)
953 getattr(received[layer], cf), calc_sum,
954 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
956 "Checksum field `%s` on `%s` layer has correct value `%s`" %
957 (cf, temp[layer].name, calc_sum))
959 def assert_checksum_valid(self, received_packet, layer,
961 ignore_zero_checksum=False):
962 """ Check checksum of received packet on given layer """
963 received_packet_checksum = getattr(received_packet[layer], field_name)
964 if ignore_zero_checksum and 0 == received_packet_checksum:
966 recalculated = received_packet.__class__(
967 scapy.compat.raw(received_packet))
968 delattr(recalculated[layer], field_name)
969 recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
970 self.assert_equal(received_packet_checksum,
971 getattr(recalculated[layer], field_name),
972 "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """ Check the checksum of the 'IP' layer of the received packet

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded to assert_checksum_valid
    """
    self.assert_checksum_valid(received_packet, 'IP',
                               ignore_zero_checksum=ignore_zero_checksum)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """ Check the checksum of the 'TCP' layer of the received packet

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded to assert_checksum_valid
    """
    self.assert_checksum_valid(received_packet, 'TCP',
                               ignore_zero_checksum=ignore_zero_checksum)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """ Check the checksum of the 'UDP' layer of the received packet

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded to assert_checksum_valid;
        unlike the IP/TCP variants it defaults to True
    """
    self.assert_checksum_valid(received_packet, 'UDP',
                               ignore_zero_checksum=ignore_zero_checksum)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """ Check checksums of the layers embedded in an ICMP error packet

    Each of the IPerror, TCPerror, UDPerror and ICMPerror layers is
    verified only when present in the packet.

    :param received_packet: packet to verify
    """
    if received_packet.haslayer(IPerror):
        self.assert_checksum_valid(received_packet, 'IPerror')
    if received_packet.haslayer(TCPerror):
        self.assert_checksum_valid(received_packet, 'TCPerror')
    if received_packet.haslayer(UDPerror):
        # a zero checksum is tolerated for the embedded UDP layer only
        self.assert_checksum_valid(received_packet, 'UDPerror',
                                   ignore_zero_checksum=True)
    if received_packet.haslayer(ICMPerror):
        self.assert_checksum_valid(received_packet, 'ICMPerror')
def assert_icmp_checksum_valid(self, received_packet):
    """ Check the 'ICMP' layer checksum and those of any embedded layers

    :param received_packet: packet to verify
    """
    self.assert_checksum_valid(received_packet, 'ICMP')
    self.assert_embedded_icmp_checksum_valid(received_packet)
def assert_icmpv6_checksum_valid(self, pkt):
    """ Check checksums of the ICMPv6 layers present in the packet

    Handles ICMPv6DestUnreach (including its embedded layers),
    ICMPv6EchoRequest and ICMPv6EchoReply; each is checked only when the
    packet carries that layer. 'cksum' is passed as the third positional
    argument of assert_checksum_valid.

    :param pkt: packet to verify
    """
    if pkt.haslayer(ICMPv6DestUnreach):
        self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
        self.assert_embedded_icmp_checksum_valid(pkt)
    if pkt.haslayer(ICMPv6EchoRequest):
        self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
    if pkt.haslayer(ICMPv6EchoReply):
        self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
1013 def get_packet_counter(self, counter):
1014 if counter.startswith("/"):
1015 counter_value = self.statistics.get_counter(counter)
1017 counters = self.vapi.cli("sh errors").split('\n')
1019 for i in range(1, len(counters) - 1):
1020 results = counters[i].split()
1021 if results[1] == counter:
1022 counter_value = int(results[0])
1024 return counter_value
def assert_packet_counter_equal(self, counter, expected_value):
    """ Assert that a packet counter has the expected value

    :param counter: counter name, looked up via get_packet_counter
    :param expected_value: value the counter is expected to have
    """
    counter_value = self.get_packet_counter(counter)
    self.assert_equal(counter_value, expected_value,
                      "packet counter `%s'" % counter)
def assert_error_counter_equal(self, counter, expected_value):
    """ Assert that an error counter has the expected value

    :param counter: counter name, read via self.statistics.get_err_counter
    :param expected_value: value the counter is expected to have
    """
    counter_value = self.statistics.get_err_counter(counter)
    self.assert_equal(counter_value, expected_value,
                      "error counter `%s'" % counter)
1037 def sleep(cls, timeout, remark=None):
1039 # /* Allow sleep(0) to maintain win32 semantics, and as decreed
1040 # * by Guido, only the main thread can be interrupted.
1042 # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
1045 if hasattr(os, 'sched_yield'):
1051 if hasattr(cls, 'logger'):
1052 cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
1053 before = time.time()
1056 if hasattr(cls, 'logger') and after - before > 2 * timeout:
1057 cls.logger.error("unexpected self.sleep() result - "
1058 "slept for %es instead of ~%es!",
1059 after - before, timeout)
1060 if hasattr(cls, 'logger'):
1062 "Finished sleep (%s) - slept %es (wanted %es)",
1063 remark, after - before, timeout)
1065 def pg_send(self, intf, pkts):
1066 self.vapi.cli("clear trace")
1067 intf.add_stream(pkts)
1068 self.pg_enable_capture(self.pg_interfaces)
1071 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
1072 self.pg_send(intf, pkts)
1075 for i in self.pg_interfaces:
1076 i.get_capture(0, timeout=timeout)
1077 i.assert_nothing_captured(remark=remark)
1080 def send_and_expect(self, intf, pkts, output, n_rx=None):
1083 self.pg_send(intf, pkts)
1084 rx = output.get_capture(n_rx)
1087 def send_and_expect_only(self, intf, pkts, output, timeout=None):
1088 self.pg_send(intf, pkts)
1089 rx = output.get_capture(len(pkts))
1093 for i in self.pg_interfaces:
1094 if i not in outputs:
1095 i.get_capture(0, timeout=timeout)
1096 i.assert_nothing_captured()
1102 """ unittest calls runTest when TestCase is instantiated without a
1103 test case. Use case: Writing unittests against VppTestCase"""
def get_testcase_doc_name(test):
    """Return the first docstring line of the test's class.

    :param test: test case instance
    :returns: first line of the class docstring; the class name when the
        class has no (or an empty) docstring
    """
    doc = getdoc(test.__class__)
    if not doc:
        # getdoc() yields None for undocumented classes and "" for blank
        # docstrings; neither has a first line to return
        return test.__class__.__name__
    return doc.splitlines()[0]
1111 def get_test_description(descriptions, test):
1112 short_description = test.shortDescription()
1113 if descriptions and short_description:
1114 return short_description
class TestCaseInfo(object):
    """Plain record describing one test case run.

    :param logger: logger of the test case
    :param tempdir: temporary directory of the test case
    :param vpp_pid: PID of the vpp process
    :param vpp_bin_path: path of the vpp binary
    """

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        self.logger = logger
        self.tempdir = tempdir
        self.vpp_pid = vpp_pid
        self.vpp_bin_path = vpp_bin_path
        # starts unset; nothing in this class assigns it later
        self.core_crash_test = None

    def __repr__(self):
        return "%s(tempdir=%r, vpp_pid=%r, vpp_bin_path=%r)" % (
            type(self).__name__, self.tempdir, self.vpp_pid,
            self.vpp_bin_path)
class VppTestResult(unittest.TestResult):
    """
    @property result_string
     String variable to store the test case result string.
    @property errors
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test which
     raised an unexpected exception.
    @property failures
     List variable containing 2-tuples of TestCase instances and strings
     holding formatted tracebacks. Each tuple represents a test where
     a failure was explicitly signalled using the TestCase.assert*()
     methods.
    """

    # class-level attributes: unless rebound on an instance, they are shared
    # by all result objects
    failed_test_cases_info = set()
    core_crash_test_cases_info = set()
    current_test_case_info = None

    def __init__(self, stream=None, descriptions=None, verbosity=None,
                 runner=None):
        """
        :param stream File descriptor to store where to report test results.
            Set to the standard error stream by default.
        :param descriptions Boolean variable to store information if to use
            test case descriptions.
        :param verbosity Integer variable to store required verbosity level.
        :param runner Runner owning this result; its print_summary and
            stream attributes are used by printErrors().
        """
        super(VppTestResult, self).__init__(stream, descriptions, verbosity)
        self.stream = stream
        self.descriptions = descriptions
        self.verbosity = verbosity
        self.result_string = None
        self.runner = runner

    def addSuccess(self, test):
        """
        Record a test succeeded result

        :param test: test which succeeded
        """
        if self.current_test_case_info:
            self.current_test_case_info.logger.debug(
                "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
                                                       test._testMethodName,
                                                       test._testMethodDoc))
        unittest.TestResult.addSuccess(self, test)
        self.result_string = colorize("OK", GREEN)

        self.send_result_through_pipe(test, PASS)

    def addSkip(self, test, reason):
        """
        Record a test skipped.

        :param test: test which was skipped
        :param reason: reason why the test was skipped
        """
        if self.current_test_case_info:
            self.current_test_case_info.logger.debug(
                "--- addSkip() %s.%s(%s) called, reason is %s" %
                (test.__class__.__name__, test._testMethodName,
                 test._testMethodDoc, reason))
        unittest.TestResult.addSkip(self, test, reason)
        self.result_string = colorize("SKIP", YELLOW)

        self.send_result_through_pipe(test, SKIP)

    def symlink_failed(self):
        """
        Link the temp dir of the current (failed) test case into the
        directory named by the FAILED_DIR environment variable.

        Best effort only: any problem is logged and never raised. Nothing
        is done when there is no current test case info or when FAILED_DIR
        is not set.
        """
        info = self.current_test_case_info
        if not info:
            return
        logger = info.logger
        try:
            failed_dir = os.getenv('FAILED_DIR')
            if failed_dir is None:
                # nowhere to put the link - not an error worth reporting
                if logger:
                    logger.debug(
                        "FAILED_DIR not set, not linking the failed test")
                return
            # NOTE(review): the link-name format line is not visible in the
            # reviewed excerpt; '%s-FAILED' is a reconstruction - confirm
            link_path = os.path.join(
                failed_dir,
                '%s-FAILED' % os.path.basename(info.tempdir))
            if logger:
                logger.debug("creating a link to the failed test")
                logger.debug(
                    "os.symlink(%s, %s)" % (info.tempdir, link_path))
            # lexists() rather than exists(): a dangling link still occupies
            # the name and would make os.symlink() fail
            if os.path.lexists(link_path):
                if logger:
                    logger.debug('symlink already exists')
            else:
                os.symlink(info.tempdir, link_path)
        except Exception as e:
            # deliberately broad - failing to link must not fail the run
            if logger:
                logger.error(e)

    def send_result_through_pipe(self, test, result):
        """
        Send a (test id, result) tuple through the result pipe, if any.

        :param test: test whose id() is reported
        :param result: result code to report
        """
        if hasattr(self, 'test_framework_result_pipe'):
            pipe = self.test_framework_result_pipe
            if pipe:
                pipe.send((test.id(), result))

    def log_error(self, test, err, fn_name):
        """
        Log an error/failure to the logger of the current test case.

        :param test: test (or unittest _ErrorHolder) which failed
        :param err: exception info tuple, expanded into format_exception()
        :param fn_name: name of the reporting function, used in the log
        """
        if self.current_test_case_info:
            if isinstance(test, unittest.suite._ErrorHolder):
                test_name = test.description
            else:
                test_name = '%s.%s(%s)' % (test.__class__.__name__,
                                           test._testMethodName,
                                           test._testMethodDoc)
            self.current_test_case_info.logger.debug(
                "--- %s() %s called, err is %s" %
                (fn_name, test_name, err))
            self.current_test_case_info.logger.debug(
                "formatted exception is:\n%s" %
                "".join(format_exception(*err)))

    def add_error(self, test, err, unittest_fn, error_type):
        """
        Common implementation of addFailure() and addError().

        :param test: test (or unittest _ErrorHolder) which failed
        :param err: exception info tuple
        :param unittest_fn: unbound unittest.TestResult method used to
            record the result
        :param error_type: FAIL or ERROR
        :raises Exception: if error_type is neither FAIL nor ERROR
        """
        if error_type == FAIL:
            self.log_error(test, err, 'addFailure')
            error_type_str = colorize("FAIL", RED)
        elif error_type == ERROR:
            self.log_error(test, err, 'addError')
            error_type_str = colorize("ERROR", RED)
        else:
            raise Exception('Error type %s cannot be used to record an '
                            'error or a failure' % error_type)

        unittest_fn(self, test, err)
        if self.current_test_case_info:
            self.result_string = "%s [ temp dir used by test case: %s ]" % \
                                 (error_type_str,
                                  self.current_test_case_info.tempdir)
            self.symlink_failed()
            self.failed_test_cases_info.add(self.current_test_case_info)
            if is_core_present(self.current_test_case_info.tempdir):
                # only the first test seen with a core present is remembered
                if not self.current_test_case_info.core_crash_test:
                    if isinstance(test, unittest.suite._ErrorHolder):
                        test_name = str(test)
                    else:
                        test_name = "'{!s}' ({!s})".format(
                            get_testcase_doc_name(test), test.id())
                    self.current_test_case_info.core_crash_test = test_name
                self.core_crash_test_cases_info.add(
                    self.current_test_case_info)
        else:
            self.result_string = '%s [no temp dir]' % error_type_str

        self.send_result_through_pipe(test, error_type)

    def addFailure(self, test, err):
        """
        Record a test failed result

        :param test: test which failed
        :param err: error message
        """
        self.add_error(test, err, unittest.TestResult.addFailure, FAIL)

    def addError(self, test, err):
        """
        Record a test error result

        :param test: test which raised an error
        :param err: error message
        """
        self.add_error(test, err, unittest.TestResult.addError, ERROR)

    def getDescription(self, test):
        """
        Get test description

        :param test: test to describe
        :returns: test description
        """
        return get_test_description(self.descriptions, test)

    def startTest(self, test):
        """
        Start a test

        Prints the test class header once per class and, when verbosity is
        above zero, a "Starting ..." line.

        :param test: test about to be run
        """

        def print_header(test):
            if not hasattr(test.__class__, '_header_printed'):
                doc = getdoc(test)
                # a test without a docstring must not break the run
                if doc:
                    title = doc.splitlines()[0]
                else:
                    title = test.__class__.__name__
                print(double_line_delim)
                print(colorize(title, GREEN))
                print(double_line_delim)
                test.__class__._header_printed = True

        print_header(test)

        unittest.TestResult.startTest(self, test)
        if self.verbosity > 0:
            self.stream.writeln(
                "Starting " + self.getDescription(test) + " ...")
            self.stream.writeln(single_line_delim)

    def stopTest(self, test):
        """
        Called when the given test has been run

        :param test: test which has finished
        """
        unittest.TestResult.stopTest(self, test)
        if self.verbosity > 0:
            self.stream.writeln(single_line_delim)
            self.stream.writeln("%-73s%s" % (self.getDescription(test),
                                             self.result_string))
            self.stream.writeln(single_line_delim)
        else:
            self.stream.writeln("%-73s%s" % (self.getDescription(test),
                                             self.result_string))

        self.send_result_through_pipe(test, TEST_RUN)

    def printErrors(self):
        """
        Print errors from running the test case
        """
        if len(self.errors) > 0 or len(self.failures) > 0:
            self.stream.writeln()
            self.printErrorList('ERROR', self.errors)
            self.printErrorList('FAIL', self.failures)

        # ^^ that is the last output from unittest before summary
        if not self.runner.print_summary:
            # NOTE(review): this devnull handle is never closed explicitly
            devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
            self.stream = devnull
            self.runner.stream = devnull

    def printErrorList(self, flavour, errors):
        """
        Print error list to the output stream together with error type
        and test case description.

        :param flavour: error type
        :param errors: iterable errors
        """
        for test, err in errors:
            self.stream.writeln(double_line_delim)
            self.stream.writeln("%s: %s" %
                                (flavour, self.getDescription(test)))
            self.stream.writeln(single_line_delim)
            self.stream.writeln("%s" % err)
class VppTestRunner(unittest.TextTestRunner):
    """
    A basic test runner implementation which prints results to standard
    output (the stream is hard-coded to sys.stdout).
    """

    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
                 result_pipe=None, failfast=False, buffer=False,
                 resultclass=None, print_summary=True, **kwargs):
        """
        :param keep_alive_pipe: pipe stored on KeepAliveReporter
        :param descriptions: passed through to unittest.TextTestRunner
        :param verbosity: passed through to unittest.TextTestRunner
        :param result_pipe: pipe stored on the result class as
            test_framework_result_pipe
        :param failfast: passed through to unittest.TextTestRunner
        :param buffer: passed through to unittest.TextTestRunner
        :param resultclass: passed through to unittest.TextTestRunner
        :param print_summary: when false, run() puts back the stream saved
            here after the run
        """
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        super(VppTestRunner, self).__init__(sys.stdout, descriptions,
                                            verbosity, failfast, buffer,
                                            resultclass, **kwargs)
        self.print_summary = print_summary
        # remembered so that run() can restore it
        self.orig_stream = self.stream

        KeepAliveReporter.pipe = keep_alive_pipe
        self.resultclass.test_framework_result_pipe = result_pipe

    def _makeResult(self):
        """Create the result object, handing it a reference to this runner."""
        result_cls = self.resultclass
        return result_cls(self.stream,
                          self.descriptions,
                          self.verbosity,
                          self)

    def run(self, test):
        """
        Run the tests

        :param test: test case or suite to run
        :returns: result object of the run
        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        result = super(VppTestRunner, self).run(test)
        if self.print_summary:
            return result
        # put the original stream back on both the runner and the result
        self.stream = self.orig_stream
        result.stream = self.orig_stream
        return result
class Worker(Thread):
    """Thread which runs an external executable and logs its output."""

    def __init__(self, args, logger, env=None):
        """
        :param args: argument list of the process, args[0] is the executable
        :param logger: logger used for progress and captured output
        :param env: optional dict of environment variables applied on top
            of a copy of os.environ
        """
        self.logger = logger
        self.args = args
        self.result = None
        # None instead of a mutable {} default; the deep copy keeps later
        # changes made by the caller out of the worker
        self.env = {} if env is None else copy.deepcopy(env)
        super(Worker, self).__init__()

    def run(self):
        """
        Run the executable, wait for it to finish, log its return code,
        stdout and stderr, and store the return code in self.result.
        """
        executable = self.args[0]
        self.logger.debug("Running executable w/args `%s'" % self.args)
        env = os.environ.copy()
        env.update(self.env)
        env["CK_LOG_FILE_NAME"] = "-"
        self.process = subprocess.Popen(
            self.args, shell=False, env=env, preexec_fn=os.setpgrp,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = self.process.communicate()
        self.logger.debug("Finished running `%s'" % executable)
        self.logger.info("Return code is `%s'" % self.process.returncode)
        self.logger.info(single_line_delim)
        self.logger.info("Executable `%s' wrote to stdout:" % executable)
        self.logger.info(single_line_delim)
        self.logger.info(out)
        self.logger.info(single_line_delim)
        self.logger.info("Executable `%s' wrote to stderr:" % executable)
        self.logger.info(single_line_delim)
        self.logger.info(err)
        self.logger.info(single_line_delim)
        self.result = self.process.returncode
1460 if __name__ == '__main__':