3 from __future__ import print_function
15 from collections import deque
16 from threading import Thread, Event
17 from inspect import getdoc, isclass
18 from traceback import format_exception
19 from logging import FileHandler, DEBUG, Formatter
20 from scapy.packet import Raw
21 from hook import StepHook, PollHook, VppDiedError
22 from vpp_pg_interface import VppPGInterface
23 from vpp_sub_interface import VppSubInterface
24 from vpp_lo_interface import VppLoInterface
25 from vpp_papi_provider import VppPapiProvider
26 from vpp_papi.vpp_stats import VPPStats
27 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
29 from vpp_object import VppObjectRegistry
30 from util import ppp, is_core_present
31 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
32 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
33 from scapy.layers.inet6 import ICMPv6EchoReply
34 if os.name == 'posix' and sys.version_info[0] < 3:
35 # using subprocess32 is recommended by python official documentation
36 # @ https://docs.python.org/2/library/subprocess.html
37 import subprocess32 as subprocess
49 debug_framework = False
50 if os.getenv('TEST_DEBUG', "0") == "1":
51 debug_framework = True
56 Test framework module.
58 The module provides a set of tools for constructing and running tests and
59 representing the results.
63 class _PacketInfo(object):
64 """Private class to create packet info object.
66 Help process information about the next packet.
67 Set variables to default values.
69 #: Store the index of the packet.
71 #: Store the index of the source packet generator interface of the packet.
73 #: Store the index of the destination packet generator interface
76 #: Store expected ip version
78 #: Store expected upper protocol
80 #: Store the copy of the former packet.
83 def __eq__(self, other):
84 index = self.index == other.index
85 src = self.src == other.src
86 dst = self.dst == other.dst
87 data = self.data == other.data
88 return index and src and dst and data
91 def pump_output(testclass):
92 """ pump output from vpp stdout/stderr to proper queues """
95 while not testclass.pump_thread_stop_flag.is_set():
96 readable = select.select([testclass.vpp.stdout.fileno(),
97 testclass.vpp.stderr.fileno(),
98 testclass.pump_thread_wakeup_pipe[0]],
100 if testclass.vpp.stdout.fileno() in readable:
101 read = os.read(testclass.vpp.stdout.fileno(), 102400)
103 split = read.splitlines(True)
104 if len(stdout_fragment) > 0:
105 split[0] = "%s%s" % (stdout_fragment, split[0])
106 if len(split) > 0 and split[-1].endswith("\n"):
110 stdout_fragment = split[-1]
111 testclass.vpp_stdout_deque.extend(split[:limit])
112 if not testclass.cache_vpp_output:
113 for line in split[:limit]:
114 testclass.logger.debug(
115 "VPP STDOUT: %s" % line.rstrip("\n"))
116 if testclass.vpp.stderr.fileno() in readable:
117 read = os.read(testclass.vpp.stderr.fileno(), 102400)
119 split = read.splitlines(True)
120 if len(stderr_fragment) > 0:
121 split[0] = "%s%s" % (stderr_fragment, split[0])
122 if len(split) > 0 and split[-1].endswith("\n"):
126 stderr_fragment = split[-1]
127 testclass.vpp_stderr_deque.extend(split[:limit])
128 if not testclass.cache_vpp_output:
129 for line in split[:limit]:
130 testclass.logger.debug(
131 "VPP STDERR: %s" % line.rstrip("\n"))
132 # ignoring the dummy pipe here intentionally - the flag will take care
133 # of properly terminating the loop
136 def running_extended_tests():
137 s = os.getenv("EXTENDED_TESTS", "n")
138 return True if s.lower() in ("y", "yes", "1") else False
141 def running_on_centos():
142 os_id = os.getenv("OS_ID", "")
143 return True if "centos" in os_id.lower() else False
146 class KeepAliveReporter(object):
148 Singleton object which reports test start to parent process
153 self.__dict__ = self._shared_state
160 def pipe(self, pipe):
161 if hasattr(self, '_pipe'):
162 raise Exception("Internal error - pipe should only be set once.")
165 def send_keep_alive(self, test, desc=None):
167 Write current test tmpdir & desc to keep-alive pipe to signal liveness
169 if self.pipe is None:
170 # if not running forked..
174 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
178 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
181 class VppTestCase(unittest.TestCase):
182 """This subclass is a base class for VPP test cases that are implemented as
183 classes. It provides methods to create and run test case.
187 def packet_infos(self):
188 """List of packet infos"""
189 return self._packet_infos
192 def get_packet_count_for_if_idx(cls, dst_if_index):
193 """Get the number of packet info for specified destination if index"""
194 if dst_if_index in cls._packet_count_for_dst_if_idx:
195 return cls._packet_count_for_dst_if_idx[dst_if_index]
201 """Return the instance of this testcase"""
202 return cls.test_instance
205 def set_debug_flags(cls, d):
206 cls.debug_core = False
207 cls.debug_gdb = False
208 cls.debug_gdbserver = False
213 cls.debug_core = True
216 elif dl == "gdbserver":
217 cls.debug_gdbserver = True
219 raise Exception("Unrecognized DEBUG option: '%s'" % d)
222 def get_least_used_cpu(self):
223 cpu_usage_list = [set(range(psutil.cpu_count()))]
224 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
225 if 'vpp_main' == p.info['name']]
226 for vpp_process in vpp_processes:
227 for cpu_usage_set in cpu_usage_list:
229 cpu_num = vpp_process.cpu_num()
230 if cpu_num in cpu_usage_set:
231 cpu_usage_set_index = cpu_usage_list.index(
233 if cpu_usage_set_index == len(cpu_usage_list) - 1:
234 cpu_usage_list.append({cpu_num})
236 cpu_usage_list[cpu_usage_set_index + 1].add(
238 cpu_usage_set.remove(cpu_num)
240 except psutil.NoSuchProcess:
243 for cpu_usage_set in cpu_usage_list:
244 if len(cpu_usage_set) > 0:
245 min_usage_set = cpu_usage_set
248 return random.choice(tuple(min_usage_set))
251 def print_header(cls):
252 if not hasattr(cls, '_header_printed'):
253 print(double_line_delim)
254 print(colorize(getdoc(cls).splitlines()[0], GREEN))
255 print(double_line_delim)
256 cls._header_printed = True
259 def setUpConstants(cls):
260 """ Set-up the test case class based on environment variables """
261 s = os.getenv("STEP", "n")
262 cls.step = True if s.lower() in ("y", "yes", "1") else False
263 d = os.getenv("DEBUG", None)
264 c = os.getenv("CACHE_OUTPUT", "1")
265 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
266 cls.set_debug_flags(d)
267 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
268 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
269 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
271 if cls.plugin_path is not None:
272 if cls.extern_plugin_path is not None:
273 plugin_path = "%s:%s" % (
274 cls.plugin_path, cls.extern_plugin_path)
276 plugin_path = cls.plugin_path
277 elif cls.extern_plugin_path is not None:
278 plugin_path = cls.extern_plugin_path
280 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
281 debug_cli = "cli-listen localhost:5002"
283 size = os.getenv("COREDUMP_SIZE")
285 coredump_size = "coredump-size %s" % size
286 if coredump_size is None:
287 coredump_size = "coredump-size unlimited"
289 cpu_core_number = cls.get_least_used_cpu()
291 cls.vpp_cmdline = [cls.vpp_bin, "unix",
292 "{", "nodaemon", debug_cli, "full-coredump",
293 coredump_size, "runtime-dir", cls.tempdir, "}",
294 "api-trace", "{", "on", "}", "api-segment", "{",
295 "prefix", cls.shm_prefix, "}", "cpu", "{",
296 "main-core", str(cpu_core_number), "}", "statseg",
297 "{", "socket-name", cls.stats_sock, "}", "plugins",
298 "{", "plugin", "dpdk_plugin.so", "{", "disable",
299 "}", "plugin", "unittest_plugin.so", "{", "enable",
301 if plugin_path is not None:
302 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
303 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
304 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
307 def wait_for_enter(cls):
308 if cls.debug_gdbserver:
309 print(double_line_delim)
310 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
312 print(double_line_delim)
313 print("Spawned VPP with PID: %d" % cls.vpp.pid)
315 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
317 print(single_line_delim)
318 print("You can debug the VPP using e.g.:")
319 if cls.debug_gdbserver:
320 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
321 print("Now is the time to attach a gdb by running the above "
322 "command, set up breakpoints etc. and then resume VPP from "
323 "within gdb by issuing the 'continue' command")
325 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
326 print("Now is the time to attach a gdb by running the above "
327 "command and set up breakpoints etc.")
328 print(single_line_delim)
329 raw_input("Press ENTER to continue running the testcase...")
333 cmdline = cls.vpp_cmdline
335 if cls.debug_gdbserver:
336 gdbserver = '/usr/bin/gdbserver'
337 if not os.path.isfile(gdbserver) or \
338 not os.access(gdbserver, os.X_OK):
339 raise Exception("gdbserver binary '%s' does not exist or is "
340 "not executable" % gdbserver)
342 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
343 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
346 cls.vpp = subprocess.Popen(cmdline,
347 stdout=subprocess.PIPE,
348 stderr=subprocess.PIPE,
350 except Exception as e:
351 cls.logger.critical("Couldn't start vpp: %s" % e)
357 def wait_for_stats_socket(cls):
358 deadline = time.time() + 3
360 while time.time() < deadline or \
361 cls.debug_gdb or cls.debug_gdbserver:
362 if os.path.exists(cls.stats_sock):
367 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
372 Perform class setup before running the testcase
373 Remove shared memory files, start vpp and connect the vpp-api
375 gc.collect() # run garbage collection first
377 cls.print_header(cls)
378 cls.logger = get_logger(cls.__name__)
379 if hasattr(cls, 'parallel_handler'):
380 cls.logger.addHandler(cls.parallel_handler)
381 cls.tempdir = tempfile.mkdtemp(
382 prefix='vpp-unittest-%s-' % cls.__name__)
383 cls.stats_sock = "%s/stats.sock" % cls.tempdir
384 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
385 cls.file_handler.setFormatter(
386 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
388 cls.file_handler.setLevel(DEBUG)
389 cls.logger.addHandler(cls.file_handler)
390 cls.shm_prefix = os.path.basename(cls.tempdir)
391 os.chdir(cls.tempdir)
392 cls.logger.info("Temporary dir is %s, shm prefix is %s",
393 cls.tempdir, cls.shm_prefix)
395 cls.reset_packet_infos()
397 cls._zombie_captures = []
400 cls.registry = VppObjectRegistry()
401 cls.vpp_startup_failed = False
402 cls.reporter = KeepAliveReporter()
403 # need to catch exceptions here because if we raise, then the cleanup
404 # doesn't get called and we might end with a zombie vpp
407 cls.reporter.send_keep_alive(cls, 'setUpClass')
408 VppTestResult.current_test_case_info = TestCaseInfo(
409 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
410 cls.vpp_stdout_deque = deque()
411 cls.vpp_stderr_deque = deque()
412 cls.pump_thread_stop_flag = Event()
413 cls.pump_thread_wakeup_pipe = os.pipe()
414 cls.pump_thread = Thread(target=pump_output, args=(cls,))
415 cls.pump_thread.daemon = True
416 cls.pump_thread.start()
417 if cls.debug_gdb or cls.debug_gdbserver:
421 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
427 cls.vapi.register_hook(hook)
428 cls.wait_for_stats_socket()
429 cls.statistics = VPPStats(socketname=cls.stats_sock)
433 cls.vpp_startup_failed = True
435 "VPP died shortly after startup, check the"
436 " output to standard error for possible cause")
442 cls.vapi.disconnect()
445 if cls.debug_gdbserver:
446 print(colorize("You're running VPP inside gdbserver but "
447 "VPP-API connection failed, did you forget "
448 "to 'continue' VPP from within gdb?", RED))
460 Disconnect vpp-api, kill vpp and cleanup shared memory files
462 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
464 if cls.vpp.returncode is None:
465 print(double_line_delim)
466 print("VPP or GDB server is still running")
467 print(single_line_delim)
468 raw_input("When done debugging, press ENTER to kill the "
469 "process and finish running the testcase...")
471 # first signal that we want to stop the pump thread, then wake it up
472 if hasattr(cls, 'pump_thread_stop_flag'):
473 cls.pump_thread_stop_flag.set()
474 if hasattr(cls, 'pump_thread_wakeup_pipe'):
475 os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
476 if hasattr(cls, 'pump_thread'):
477 cls.logger.debug("Waiting for pump thread to stop")
478 cls.pump_thread.join()
479 if hasattr(cls, 'vpp_stderr_reader_thread'):
480 cls.logger.debug("Waiting for stdderr pump to stop")
481 cls.vpp_stderr_reader_thread.join()
483 if hasattr(cls, 'vpp'):
484 if hasattr(cls, 'vapi'):
485 cls.vapi.disconnect()
488 if cls.vpp.returncode is None:
489 cls.logger.debug("Sending TERM to vpp")
491 cls.logger.debug("Waiting for vpp to die")
492 cls.vpp.communicate()
495 if cls.vpp_startup_failed:
496 stdout_log = cls.logger.info
497 stderr_log = cls.logger.critical
499 stdout_log = cls.logger.info
500 stderr_log = cls.logger.info
502 if hasattr(cls, 'vpp_stdout_deque'):
503 stdout_log(single_line_delim)
504 stdout_log('VPP output to stdout while running %s:', cls.__name__)
505 stdout_log(single_line_delim)
506 vpp_output = "".join(cls.vpp_stdout_deque)
507 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
509 stdout_log('\n%s', vpp_output)
510 stdout_log(single_line_delim)
512 if hasattr(cls, 'vpp_stderr_deque'):
513 stderr_log(single_line_delim)
514 stderr_log('VPP output to stderr while running %s:', cls.__name__)
515 stderr_log(single_line_delim)
516 vpp_output = "".join(cls.vpp_stderr_deque)
517 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
519 stderr_log('\n%s', vpp_output)
520 stderr_log(single_line_delim)
523 def tearDownClass(cls):
524 """ Perform final cleanup after running all tests in this test-case """
525 cls.reporter.send_keep_alive(cls, 'tearDownClass')
527 cls.file_handler.close()
528 cls.reset_packet_infos()
530 debug_internal.on_tear_down_class(cls)
533 """ Show various debug prints after each test """
534 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
535 (self.__class__.__name__, self._testMethodName,
536 self._testMethodDoc))
537 if not self.vpp_dead:
538 self.logger.debug(self.vapi.cli("show trace"))
539 self.logger.info(self.vapi.ppcli("show interface"))
540 self.logger.info(self.vapi.ppcli("show hardware"))
541 self.logger.info(self.statistics.set_errors_str())
542 self.logger.info(self.vapi.ppcli("show run"))
543 self.logger.info(self.vapi.ppcli("show log"))
544 self.registry.remove_vpp_config(self.logger)
545 # Save/Dump VPP api trace log
546 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
547 tmp_api_trace = "/tmp/%s" % api_trace
548 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
549 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
550 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
552 os.rename(tmp_api_trace, vpp_api_trace_log)
553 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
556 self.registry.unregister_all(self.logger)
559 """ Clear trace before running each test"""
560 self.reporter.send_keep_alive(self)
561 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
562 (self.__class__.__name__, self._testMethodName,
563 self._testMethodDoc))
565 raise Exception("VPP is dead when setting up the test")
566 self.sleep(.1, "during setUp")
567 self.vpp_stdout_deque.append(
568 "--- test setUp() for %s.%s(%s) starts here ---\n" %
569 (self.__class__.__name__, self._testMethodName,
570 self._testMethodDoc))
571 self.vpp_stderr_deque.append(
572 "--- test setUp() for %s.%s(%s) starts here ---\n" %
573 (self.__class__.__name__, self._testMethodName,
574 self._testMethodDoc))
575 self.vapi.cli("clear trace")
576 # store the test instance inside the test class - so that objects
577 # holding the class can access instance methods (like assertEqual)
578 type(self).test_instance = self
581 def pg_enable_capture(cls, interfaces=None):
583 Enable capture on packet-generator interfaces
585 :param interfaces: iterable interface indexes (if None,
586 use self.pg_interfaces)
589 if interfaces is None:
590 interfaces = cls.pg_interfaces
595 def register_capture(cls, cap_name):
596 """ Register a capture in the testclass """
597 # add to the list of captures with current timestamp
598 cls._captures.append((time.time(), cap_name))
599 # filter out from zombies
600 cls._zombie_captures = [(stamp, name)
601 for (stamp, name) in cls._zombie_captures
606 """ Remove any zombie captures and enable the packet generator """
607 # how long before capture is allowed to be deleted - otherwise vpp
608 # crashes - 100ms seems enough (this shouldn't be needed at all)
611 for stamp, cap_name in cls._zombie_captures:
612 wait = stamp + capture_ttl - now
614 cls.sleep(wait, "before deleting capture %s" % cap_name)
616 cls.logger.debug("Removing zombie capture %s" % cap_name)
617 cls.vapi.cli('packet-generator delete %s' % cap_name)
619 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
620 cls.vapi.cli('packet-generator enable')
621 cls._zombie_captures = cls._captures
625 def create_pg_interfaces(cls, interfaces):
627 Create packet-generator interfaces.
629 :param interfaces: iterable indexes of the interfaces.
630 :returns: List of created interfaces.
635 intf = VppPGInterface(cls, i)
636 setattr(cls, intf.name, intf)
638 cls.pg_interfaces = result
642 def create_loopback_interfaces(cls, count):
644 Create loopback interfaces.
646 :param count: number of interfaces created.
647 :returns: List of created interfaces.
649 result = [VppLoInterface(cls) for i in range(count)]
651 setattr(cls, intf.name, intf)
652 cls.lo_interfaces = result
656 def extend_packet(packet, size, padding=' '):
658 Extend packet to given size by padding with spaces or custom padding
659 NOTE: Currently works only when Raw layer is present.
661 :param packet: packet
662 :param size: target size
663 :param padding: padding used to extend the payload
666 packet_len = len(packet) + 4
667 extend = size - packet_len
669 num = (extend / len(padding)) + 1
670 packet[Raw].load += (padding * num)[:extend]
673 def reset_packet_infos(cls):
674 """ Reset the list of packet info objects and packet counts to zero """
675 cls._packet_infos = {}
676 cls._packet_count_for_dst_if_idx = {}
679 def create_packet_info(cls, src_if, dst_if):
681 Create packet info object containing the source and destination indexes
682 and add it to the testcase's packet info list
684 :param VppInterface src_if: source interface
685 :param VppInterface dst_if: destination interface
687 :returns: _PacketInfo object
691 info.index = len(cls._packet_infos)
692 info.src = src_if.sw_if_index
693 info.dst = dst_if.sw_if_index
694 if isinstance(dst_if, VppSubInterface):
695 dst_idx = dst_if.parent.sw_if_index
697 dst_idx = dst_if.sw_if_index
698 if dst_idx in cls._packet_count_for_dst_if_idx:
699 cls._packet_count_for_dst_if_idx[dst_idx] += 1
701 cls._packet_count_for_dst_if_idx[dst_idx] = 1
702 cls._packet_infos[info.index] = info
706 def info_to_payload(info):
708 Convert _PacketInfo object to packet payload
710 :param info: _PacketInfo object
712 :returns: string containing serialized data from packet info
714 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
718 def payload_to_info(payload):
720 Convert packet payload to _PacketInfo object
722 :param payload: packet payload
724 :returns: _PacketInfo object containing de-serialized data from payload
727 numbers = payload.split()
729 info.index = int(numbers[0])
730 info.src = int(numbers[1])
731 info.dst = int(numbers[2])
732 info.ip = int(numbers[3])
733 info.proto = int(numbers[4])
736 def get_next_packet_info(self, info):
738 Iterate over the packet info list stored in the testcase
739 Start iteration with first element if info is None
740 Continue based on index in info if info is specified
742 :param info: info or None
743 :returns: next info in list or None if no more infos
748 next_index = info.index + 1
749 if next_index == len(self._packet_infos):
752 return self._packet_infos[next_index]
754 def get_next_packet_info_for_interface(self, src_index, info):
756 Search the packet info list for the next packet info with same source
759 :param src_index: source interface index to search for
760 :param info: packet info - where to start the search
761 :returns: packet info or None
765 info = self.get_next_packet_info(info)
768 if info.src == src_index:
771 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
773 Search the packet info list for the next packet info with same source
774 and destination interface indexes
776 :param src_index: source interface index to search for
777 :param dst_index: destination interface index to search for
778 :param info: packet info - where to start the search
779 :returns: packet info or None
783 info = self.get_next_packet_info_for_interface(src_index, info)
786 if info.dst == dst_index:
789 def assert_equal(self, real_value, expected_value, name_or_class=None):
790 if name_or_class is None:
791 self.assertEqual(real_value, expected_value)
794 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
795 msg = msg % (getdoc(name_or_class).strip(),
796 real_value, str(name_or_class(real_value)),
797 expected_value, str(name_or_class(expected_value)))
799 msg = "Invalid %s: %s does not match expected value %s" % (
800 name_or_class, real_value, expected_value)
802 self.assertEqual(real_value, expected_value, msg)
804 def assert_in_range(self,
812 msg = "Invalid %s: %s out of range <%s,%s>" % (
813 name, real_value, expected_min, expected_max)
814 self.assertTrue(expected_min <= real_value <= expected_max, msg)
816 def assert_packet_checksums_valid(self, packet,
817 ignore_zero_udp_checksums=True):
818 received = packet.__class__(str(packet))
820 ppp("Verifying packet checksums for packet:", received))
821 udp_layers = ['UDP', 'UDPerror']
822 checksum_fields = ['cksum', 'chksum']
825 temp = received.__class__(str(received))
827 layer = temp.getlayer(counter)
829 for cf in checksum_fields:
830 if hasattr(layer, cf):
831 if ignore_zero_udp_checksums and \
832 0 == getattr(layer, cf) and \
833 layer.name in udp_layers:
836 checksums.append((counter, cf))
839 counter = counter + 1
840 if 0 == len(checksums):
842 temp = temp.__class__(str(temp))
843 for layer, cf in checksums:
844 calc_sum = getattr(temp[layer], cf)
846 getattr(received[layer], cf), calc_sum,
847 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
849 "Checksum field `%s` on `%s` layer has correct value `%s`" %
850 (cf, temp[layer].name, calc_sum))
852 def assert_checksum_valid(self, received_packet, layer,
854 ignore_zero_checksum=False):
855 """ Check checksum of received packet on given layer """
856 received_packet_checksum = getattr(received_packet[layer], field_name)
857 if ignore_zero_checksum and 0 == received_packet_checksum:
859 recalculated = received_packet.__class__(str(received_packet))
860 delattr(recalculated[layer], field_name)
861 recalculated = recalculated.__class__(str(recalculated))
862 self.assert_equal(received_packet_checksum,
863 getattr(recalculated[layer], field_name),
864 "packet checksum on layer: %s" % layer)
866 def assert_ip_checksum_valid(self, received_packet,
867 ignore_zero_checksum=False):
868 self.assert_checksum_valid(received_packet, 'IP',
869 ignore_zero_checksum=ignore_zero_checksum)
871 def assert_tcp_checksum_valid(self, received_packet,
872 ignore_zero_checksum=False):
873 self.assert_checksum_valid(received_packet, 'TCP',
874 ignore_zero_checksum=ignore_zero_checksum)
876 def assert_udp_checksum_valid(self, received_packet,
877 ignore_zero_checksum=True):
878 self.assert_checksum_valid(received_packet, 'UDP',
879 ignore_zero_checksum=ignore_zero_checksum)
881 def assert_embedded_icmp_checksum_valid(self, received_packet):
882 if received_packet.haslayer(IPerror):
883 self.assert_checksum_valid(received_packet, 'IPerror')
884 if received_packet.haslayer(TCPerror):
885 self.assert_checksum_valid(received_packet, 'TCPerror')
886 if received_packet.haslayer(UDPerror):
887 self.assert_checksum_valid(received_packet, 'UDPerror',
888 ignore_zero_checksum=True)
889 if received_packet.haslayer(ICMPerror):
890 self.assert_checksum_valid(received_packet, 'ICMPerror')
892 def assert_icmp_checksum_valid(self, received_packet):
893 self.assert_checksum_valid(received_packet, 'ICMP')
894 self.assert_embedded_icmp_checksum_valid(received_packet)
896 def assert_icmpv6_checksum_valid(self, pkt):
897 if pkt.haslayer(ICMPv6DestUnreach):
898 self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
899 self.assert_embedded_icmp_checksum_valid(pkt)
900 if pkt.haslayer(ICMPv6EchoRequest):
901 self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
902 if pkt.haslayer(ICMPv6EchoReply):
903 self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
905 def assert_packet_counter_equal(self, counter, expected_value):
906 counters = self.vapi.cli("sh errors").split('\n')
908 for i in range(1, len(counters)-1):
909 results = counters[i].split()
910 if results[1] == counter:
911 counter_value = int(results[0])
913 self.assert_equal(counter_value, expected_value,
914 "packet counter `%s'" % counter)
917 def sleep(cls, timeout, remark=None):
918 if hasattr(cls, 'logger'):
919 cls.logger.debug("Starting sleep for %ss (%s)" % (timeout, remark))
923 if after - before > 2 * timeout:
924 cls.logger.error("unexpected time.sleep() result - "
925 "slept for %ss instead of ~%ss!" % (
926 after - before, timeout))
927 if hasattr(cls, 'logger'):
929 "Finished sleep (%s) - slept %ss (wanted %ss)" % (
930 remark, after - before, timeout))
932 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
933 self.vapi.cli("clear trace")
934 intf.add_stream(pkts)
935 self.pg_enable_capture(self.pg_interfaces)
939 for i in self.pg_interfaces:
940 i.get_capture(0, timeout=timeout)
941 i.assert_nothing_captured(remark=remark)
944 def send_and_expect(self, input, pkts, output):
945 self.vapi.cli("clear trace")
946 input.add_stream(pkts)
947 self.pg_enable_capture(self.pg_interfaces)
949 if isinstance(object, (list,)):
952 rx.append(output.get_capture(len(pkts)))
954 rx = output.get_capture(len(pkts))
957 def send_and_expect_only(self, input, pkts, output, timeout=None):
958 self.vapi.cli("clear trace")
959 input.add_stream(pkts)
960 self.pg_enable_capture(self.pg_interfaces)
962 if isinstance(object, (list,)):
966 rx.append(output.get_capture(len(pkts)))
968 rx = output.get_capture(len(pkts))
972 for i in self.pg_interfaces:
974 i.get_capture(0, timeout=timeout)
975 i.assert_nothing_captured()
981 def get_testcase_doc_name(test):
982 return getdoc(test.__class__).splitlines()[0]
985 def get_test_description(descriptions, test):
986 short_description = test.shortDescription()
987 if descriptions and short_description:
988 return short_description
993 class TestCaseInfo(object):
994 def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
996 self.tempdir = tempdir
997 self.vpp_pid = vpp_pid
998 self.vpp_bin_path = vpp_bin_path
999 self.core_crash_test = None
1002 class VppTestResult(unittest.TestResult):
1004 @property result_string
1005 String variable to store the test case result string.
1007 List variable containing 2-tuples of TestCase instances and strings
1008 holding formatted tracebacks. Each tuple represents a test which
1009 raised an unexpected exception.
1011 List variable containing 2-tuples of TestCase instances and strings
1012 holding formatted tracebacks. Each tuple represents a test where
1013 a failure was explicitly signalled using the TestCase.assert*()
1017 failed_test_cases_info = set()
1018 core_crash_test_cases_info = set()
1019 current_test_case_info = None
1021 def __init__(self, stream, descriptions, verbosity):
1023 :param stream File descriptor to store where to report test results.
1024 Set to the standard error stream by default.
1025 :param descriptions Boolean variable to store information if to use
1026 test case descriptions.
1027 :param verbosity Integer variable to store required verbosity level.
1029 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
1030 self.stream = stream
1031 self.descriptions = descriptions
1032 self.verbosity = verbosity
1033 self.result_string = None
1035 def addSuccess(self, test):
1037 Record a test succeeded result
1042 if self.current_test_case_info:
1043 self.current_test_case_info.logger.debug(
1044 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1045 test._testMethodName,
1046 test._testMethodDoc))
1047 unittest.TestResult.addSuccess(self, test)
1048 self.result_string = colorize("OK", GREEN)
1050 self.send_result_through_pipe(test, PASS)
1052 def addSkip(self, test, reason):
1054 Record a test skipped.
1060 if self.current_test_case_info:
1061 self.current_test_case_info.logger.debug(
1062 "--- addSkip() %s.%s(%s) called, reason is %s" %
1063 (test.__class__.__name__, test._testMethodName,
1064 test._testMethodDoc, reason))
1065 unittest.TestResult.addSkip(self, test, reason)
1066 self.result_string = colorize("SKIP", YELLOW)
1068 self.send_result_through_pipe(test, SKIP)
1070 def symlink_failed(self):
1071 if self.current_test_case_info:
1073 failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
1074 link_path = os.path.join(
1077 os.path.basename(self.current_test_case_info.tempdir))
1078 if self.current_test_case_info.logger:
1079 self.current_test_case_info.logger.debug(
1080 "creating a link to the failed test")
1081 self.current_test_case_info.logger.debug(
1082 "os.symlink(%s, %s)" %
1083 (self.current_test_case_info.tempdir, link_path))
1084 if os.path.exists(link_path):
1085 if self.current_test_case_info.logger:
1086 self.current_test_case_info.logger.debug(
1087 'symlink already exists')
1089 os.symlink(self.current_test_case_info.tempdir, link_path)
1091 except Exception as e:
1092 if self.current_test_case_info.logger:
1093 self.current_test_case_info.logger.error(e)
1095 def send_result_through_pipe(self, test, result):
1096 if hasattr(self, 'test_framework_result_pipe'):
1097 pipe = self.test_framework_result_pipe
1099 pipe.send((test.id(), result))
1101 def log_error(self, test, err, fn_name):
1102 if self.current_test_case_info:
1103 if isinstance(test, unittest.suite._ErrorHolder):
1104 test_name = test.description
1106 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1107 test._testMethodName,
1108 test._testMethodDoc)
1109 self.current_test_case_info.logger.debug(
1110 "--- %s() %s called, err is %s" %
1111 (fn_name, test_name, err))
1112 self.current_test_case_info.logger.debug(
1113 "formatted exception is:\n%s" %
1114 "".join(format_exception(*err)))
1116 def add_error(self, test, err, unittest_fn, error_type):
1117 if error_type == FAIL:
1118 self.log_error(test, err, 'addFailure')
1119 error_type_str = colorize("FAIL", RED)
1120 elif error_type == ERROR:
1121 self.log_error(test, err, 'addError')
1122 error_type_str = colorize("ERROR", RED)
1124 raise Exception('Error type %s cannot be used to record an '
1125 'error or a failure' % error_type)
1127 unittest_fn(self, test, err)
1128 if self.current_test_case_info:
1129 self.result_string = "%s [ temp dir used by test case: %s ]" % \
1131 self.current_test_case_info.tempdir)
1132 self.symlink_failed()
1133 self.failed_test_cases_info.add(self.current_test_case_info)
1134 if is_core_present(self.current_test_case_info.tempdir):
1135 if not self.current_test_case_info.core_crash_test:
1136 if isinstance(test, unittest.suite._ErrorHolder):
1137 test_name = str(test)
1139 test_name = "'{}' ({})".format(
1140 get_testcase_doc_name(test), test.id())
1141 self.current_test_case_info.core_crash_test = test_name
1142 self.core_crash_test_cases_info.add(
1143 self.current_test_case_info)
1145 self.result_string = '%s [no temp dir]' % error_type_str
1147 self.send_result_through_pipe(test, error_type)
1149 def addFailure(self, test, err):
1151 Record a test failed result
1154 :param err: error message
1157 self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1159 def addError(self, test, err):
1161 Record a test error result
1164 :param err: error message
1167 self.add_error(test, err, unittest.TestResult.addError, ERROR)
1169 def getDescription(self, test):
1171 Get test description
1174 :returns: test description
1177 return get_test_description(self.descriptions, test)
1179 def startTest(self, test):
1186 test.print_header(test.__class__)
1188 unittest.TestResult.startTest(self, test)
1189 if self.verbosity > 0:
1190 self.stream.writeln(
1191 "Starting " + self.getDescription(test) + " ...")
1192 self.stream.writeln(single_line_delim)
1194 def stopTest(self, test):
1196 Called when the given test has been run
1201 unittest.TestResult.stopTest(self, test)
1202 if self.verbosity > 0:
1203 self.stream.writeln(single_line_delim)
1204 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1205 self.result_string))
1206 self.stream.writeln(single_line_delim)
1208 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1209 self.result_string))
1211 self.send_result_through_pipe(test, TEST_RUN)
1213 def printErrors(self):
1215 Print errors from running the test case
1217 self.stream.writeln()
1218 self.printErrorList('ERROR', self.errors)
1219 self.printErrorList('FAIL', self.failures)
1221 def printErrorList(self, flavour, errors):
1223 Print error list to the output stream together with error type
1224 and test case description.
1226 :param flavour: error type
1227 :param errors: iterable errors
1230 for test, err in errors:
1231 self.stream.writeln(double_line_delim)
1232 self.stream.writeln("%s: %s" %
1233 (flavour, self.getDescription(test)))
1234 self.stream.writeln(single_line_delim)
1235 self.stream.writeln("%s" % err)
1238 class VppTestRunner(unittest.TextTestRunner):
1240 A basic test runner implementation which prints results to standard error.
1243 def resultclass(self):
1244 """Class maintaining the results of the tests"""
1245 return VppTestResult
1247 def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1248 result_pipe=None, failfast=False, buffer=False,
1251 # ignore stream setting here, use hard-coded stdout to be in sync
1252 # with prints from VppTestCase methods ...
1253 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1254 verbosity, failfast, buffer,
1256 reporter = KeepAliveReporter()
1257 reporter.pipe = keep_alive_pipe
1259 VppTestResult.test_framework_result_pipe = result_pipe
1261 def run(self, test):
1268 faulthandler.enable() # emit stack trace to stderr if killed by signal
1270 result = super(VppTestRunner, self).run(test)
1274 class Worker(Thread):
1275 def __init__(self, args, logger, env={}):
1276 self.logger = logger
1279 self.env = copy.deepcopy(env)
1280 super(Worker, self).__init__()
1283 executable = self.args[0]
1284 self.logger.debug("Running executable w/args `%s'" % self.args)
1285 env = os.environ.copy()
1286 env.update(self.env)
1287 env["CK_LOG_FILE_NAME"] = "-"
1288 self.process = subprocess.Popen(
1289 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1290 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1291 out, err = self.process.communicate()
1292 self.logger.debug("Finished running `%s'" % executable)
1293 self.logger.info("Return code is `%s'" % self.process.returncode)
1294 self.logger.info(single_line_delim)
1295 self.logger.info("Executable `%s' wrote to stdout:" % executable)
1296 self.logger.info(single_line_delim)
1297 self.logger.info(out)
1298 self.logger.info(single_line_delim)
1299 self.logger.info("Executable `%s' wrote to stderr:" % executable)
1300 self.logger.info(single_line_delim)
1301 self.logger.info(err)
1302 self.logger.info(single_line_delim)
1303 self.result = self.process.returncode