3 from __future__ import print_function
14 from collections import deque
15 from threading import Thread, Event
16 from inspect import getdoc, isclass
17 from traceback import format_exception
18 from logging import FileHandler, DEBUG, Formatter
19 from scapy.packet import Raw
20 from hook import StepHook, PollHook, VppDiedError
21 from vpp_pg_interface import VppPGInterface
22 from vpp_sub_interface import VppSubInterface
23 from vpp_lo_interface import VppLoInterface
24 from vpp_papi_provider import VppPapiProvider
25 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
27 from vpp_object import VppObjectRegistry
28 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
29 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
30 from scapy.layers.inet6 import ICMPv6EchoReply
31 if os.name == 'posix' and sys.version_info[0] < 3:
32 # using subprocess32 is recommended by python official documentation
33 # @ https://docs.python.org/2/library/subprocess.html
34 import subprocess32 as subprocess
39 debug_framework = False
40 if os.getenv('TEST_DEBUG', "0") == "1":
41 debug_framework = True
46 Test framework module.
48 The module provides a set of tools for constructing and running tests and
49 representing the results.
53 class _PacketInfo(object):
54 """Private class to create packet info object.
56 Help process information about the next packet.
57 Set variables to default values.
59 #: Store the index of the packet.
61 #: Store the index of the source packet generator interface of the packet.
63 #: Store the index of the destination packet generator interface
66 #: Store expected ip version
68 #: Store expected upper protocol
70 #: Store the copy of the former packet.
73 def __eq__(self, other):
74 index = self.index == other.index
75 src = self.src == other.src
76 dst = self.dst == other.dst
77 data = self.data == other.data
78 return index and src and dst and data
81 def pump_output(testclass):
82 """ pump output from vpp stdout/stderr to proper queues """
85 while not testclass.pump_thread_stop_flag.wait(0):
86 readable = select.select([testclass.vpp.stdout.fileno(),
87 testclass.vpp.stderr.fileno(),
88 testclass.pump_thread_wakeup_pipe[0]],
90 if testclass.vpp.stdout.fileno() in readable:
91 read = os.read(testclass.vpp.stdout.fileno(), 102400)
93 split = read.splitlines(True)
94 if len(stdout_fragment) > 0:
95 split[0] = "%s%s" % (stdout_fragment, split[0])
96 if len(split) > 0 and split[-1].endswith("\n"):
100 stdout_fragment = split[-1]
101 testclass.vpp_stdout_deque.extend(split[:limit])
102 if not testclass.cache_vpp_output:
103 for line in split[:limit]:
104 testclass.logger.debug(
105 "VPP STDOUT: %s" % line.rstrip("\n"))
106 if testclass.vpp.stderr.fileno() in readable:
107 read = os.read(testclass.vpp.stderr.fileno(), 102400)
109 split = read.splitlines(True)
110 if len(stderr_fragment) > 0:
111 split[0] = "%s%s" % (stderr_fragment, split[0])
112 if len(split) > 0 and split[-1].endswith("\n"):
116 stderr_fragment = split[-1]
117 testclass.vpp_stderr_deque.extend(split[:limit])
118 if not testclass.cache_vpp_output:
119 for line in split[:limit]:
120 testclass.logger.debug(
121 "VPP STDERR: %s" % line.rstrip("\n"))
122 # ignoring the dummy pipe here intentionally - the flag will take care
123 # of properly terminating the loop
126 def running_extended_tests():
127 s = os.getenv("EXTENDED_TESTS", "n")
128 return True if s.lower() in ("y", "yes", "1") else False
131 def running_on_centos():
132 os_id = os.getenv("OS_ID", "")
133 return True if "centos" in os_id.lower() else False
136 class KeepAliveReporter(object):
138 Singleton object which reports test start to parent process
143 self.__dict__ = self._shared_state
150 def pipe(self, pipe):
151 if hasattr(self, '_pipe'):
152 raise Exception("Internal error - pipe should only be set once.")
155 def send_keep_alive(self, test):
157 Write current test tmpdir & desc to keep-alive pipe to signal liveness
159 if self.pipe is None:
160 # if not running forked..
166 desc = test.shortDescription()
170 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
173 class VppTestCase(unittest.TestCase):
174 """This subclass is a base class for VPP test cases that are implemented as
175 classes. It provides methods to create and run test case.
179 def packet_infos(self):
180 """List of packet infos"""
181 return self._packet_infos
184 def get_packet_count_for_if_idx(cls, dst_if_index):
185 """Get the number of packet info for specified destination if index"""
186 if dst_if_index in cls._packet_count_for_dst_if_idx:
187 return cls._packet_count_for_dst_if_idx[dst_if_index]
193 """Return the instance of this testcase"""
194 return cls.test_instance
197 def set_debug_flags(cls, d):
198 cls.debug_core = False
199 cls.debug_gdb = False
200 cls.debug_gdbserver = False
205 cls.debug_core = True
208 elif dl == "gdbserver":
209 cls.debug_gdbserver = True
211 raise Exception("Unrecognized DEBUG option: '%s'" % d)
214 def setUpConstants(cls):
215 """ Set-up the test case class based on environment variables """
216 s = os.getenv("STEP", "n")
217 cls.step = True if s.lower() in ("y", "yes", "1") else False
218 d = os.getenv("DEBUG", None)
219 c = os.getenv("CACHE_OUTPUT", "1")
220 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
221 cls.set_debug_flags(d)
222 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
223 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
224 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
226 if cls.plugin_path is not None:
227 if cls.extern_plugin_path is not None:
228 plugin_path = "%s:%s" % (
229 cls.plugin_path, cls.extern_plugin_path)
231 plugin_path = cls.plugin_path
232 elif cls.extern_plugin_path is not None:
233 plugin_path = cls.extern_plugin_path
235 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
236 debug_cli = "cli-listen localhost:5002"
238 size = os.getenv("COREDUMP_SIZE")
240 coredump_size = "coredump-size %s" % size
241 if coredump_size is None:
242 coredump_size = "coredump-size unlimited"
243 cls.vpp_cmdline = [cls.vpp_bin, "unix",
244 "{", "nodaemon", debug_cli, "full-coredump",
245 coredump_size, "}", "api-trace", "{", "on", "}",
246 "api-segment", "{", "prefix", cls.shm_prefix, "}",
247 "plugins", "{", "plugin", "dpdk_plugin.so", "{",
248 "disable", "}", "}", ]
249 if plugin_path is not None:
250 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
251 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
254 def wait_for_enter(cls):
255 if cls.debug_gdbserver:
256 print(double_line_delim)
257 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
259 print(double_line_delim)
260 print("Spawned VPP with PID: %d" % cls.vpp.pid)
262 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
264 print(single_line_delim)
265 print("You can debug the VPP using e.g.:")
266 if cls.debug_gdbserver:
267 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
268 print("Now is the time to attach a gdb by running the above "
269 "command, set up breakpoints etc. and then resume VPP from "
270 "within gdb by issuing the 'continue' command")
272 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
273 print("Now is the time to attach a gdb by running the above "
274 "command and set up breakpoints etc.")
275 print(single_line_delim)
276 raw_input("Press ENTER to continue running the testcase...")
280 cmdline = cls.vpp_cmdline
282 if cls.debug_gdbserver:
283 gdbserver = '/usr/bin/gdbserver'
284 if not os.path.isfile(gdbserver) or \
285 not os.access(gdbserver, os.X_OK):
286 raise Exception("gdbserver binary '%s' does not exist or is "
287 "not executable" % gdbserver)
289 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
290 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
293 cls.vpp = subprocess.Popen(cmdline,
294 stdout=subprocess.PIPE,
295 stderr=subprocess.PIPE,
297 except Exception as e:
298 cls.logger.critical("Couldn't start vpp: %s" % e)
306 Perform class setup before running the testcase
307 Remove shared memory files, start vpp and connect the vpp-api
309 gc.collect() # run garbage collection first
311 cls.logger = getLogger(cls.__name__)
312 cls.tempdir = tempfile.mkdtemp(
313 prefix='vpp-unittest-%s-' % cls.__name__)
314 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
315 cls.file_handler.setFormatter(
316 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
318 cls.file_handler.setLevel(DEBUG)
319 cls.logger.addHandler(cls.file_handler)
320 cls.shm_prefix = cls.tempdir.split("/")[-1]
321 os.chdir(cls.tempdir)
322 cls.logger.info("Temporary dir is %s, shm prefix is %s",
323 cls.tempdir, cls.shm_prefix)
325 cls.reset_packet_infos()
327 cls._zombie_captures = []
330 cls.registry = VppObjectRegistry()
331 cls.vpp_startup_failed = False
332 cls.reporter = KeepAliveReporter()
333 # need to catch exceptions here because if we raise, then the cleanup
334 # doesn't get called and we might end with a zombie vpp
337 cls.reporter.send_keep_alive(cls)
338 cls.vpp_stdout_deque = deque()
339 cls.vpp_stderr_deque = deque()
340 cls.pump_thread_stop_flag = Event()
341 cls.pump_thread_wakeup_pipe = os.pipe()
342 cls.pump_thread = Thread(target=pump_output, args=(cls,))
343 cls.pump_thread.daemon = True
344 cls.pump_thread.start()
345 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
350 cls.vapi.register_hook(hook)
351 cls.sleep(0.1, "after vpp startup, before initial poll")
355 cls.vpp_startup_failed = True
357 "VPP died shortly after startup, check the"
358 " output to standard error for possible cause")
364 cls.vapi.disconnect()
367 if cls.debug_gdbserver:
368 print(colorize("You're running VPP inside gdbserver but "
369 "VPP-API connection failed, did you forget "
370 "to 'continue' VPP from within gdb?", RED))
382 Disconnect vpp-api, kill vpp and cleanup shared memory files
384 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
386 if cls.vpp.returncode is None:
387 print(double_line_delim)
388 print("VPP or GDB server is still running")
389 print(single_line_delim)
390 raw_input("When done debugging, press ENTER to kill the "
391 "process and finish running the testcase...")
393 os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
394 cls.pump_thread_stop_flag.set()
395 if hasattr(cls, 'pump_thread'):
396 cls.logger.debug("Waiting for pump thread to stop")
397 cls.pump_thread.join()
398 if hasattr(cls, 'vpp_stderr_reader_thread'):
399 cls.logger.debug("Waiting for stdderr pump to stop")
400 cls.vpp_stderr_reader_thread.join()
402 if hasattr(cls, 'vpp'):
403 if hasattr(cls, 'vapi'):
404 cls.vapi.disconnect()
407 if cls.vpp.returncode is None:
408 cls.logger.debug("Sending TERM to vpp")
410 cls.logger.debug("Waiting for vpp to die")
411 cls.vpp.communicate()
414 if cls.vpp_startup_failed:
415 stdout_log = cls.logger.info
416 stderr_log = cls.logger.critical
418 stdout_log = cls.logger.info
419 stderr_log = cls.logger.info
421 if hasattr(cls, 'vpp_stdout_deque'):
422 stdout_log(single_line_delim)
423 stdout_log('VPP output to stdout while running %s:', cls.__name__)
424 stdout_log(single_line_delim)
425 vpp_output = "".join(cls.vpp_stdout_deque)
426 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
428 stdout_log('\n%s', vpp_output)
429 stdout_log(single_line_delim)
431 if hasattr(cls, 'vpp_stderr_deque'):
432 stderr_log(single_line_delim)
433 stderr_log('VPP output to stderr while running %s:', cls.__name__)
434 stderr_log(single_line_delim)
435 vpp_output = "".join(cls.vpp_stderr_deque)
436 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
438 stderr_log('\n%s', vpp_output)
439 stderr_log(single_line_delim)
442 def tearDownClass(cls):
443 """ Perform final cleanup after running all tests in this test-case """
445 cls.file_handler.close()
446 cls.reset_packet_infos()
448 debug_internal.on_tear_down_class(cls)
451 """ Show various debug prints after each test """
452 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
453 (self.__class__.__name__, self._testMethodName,
454 self._testMethodDoc))
455 if not self.vpp_dead:
456 self.logger.debug(self.vapi.cli("show trace"))
457 self.logger.info(self.vapi.ppcli("show interface"))
458 self.logger.info(self.vapi.ppcli("show hardware"))
459 self.logger.info(self.vapi.ppcli("show error"))
460 self.logger.info(self.vapi.ppcli("show run"))
461 self.logger.info(self.vapi.ppcli("show log"))
462 self.registry.remove_vpp_config(self.logger)
463 # Save/Dump VPP api trace log
464 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
465 tmp_api_trace = "/tmp/%s" % api_trace
466 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
467 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
468 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
470 os.rename(tmp_api_trace, vpp_api_trace_log)
471 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
474 self.registry.unregister_all(self.logger)
477 """ Clear trace before running each test"""
478 self.reporter.send_keep_alive(self)
479 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
480 (self.__class__.__name__, self._testMethodName,
481 self._testMethodDoc))
483 raise Exception("VPP is dead when setting up the test")
484 self.sleep(.1, "during setUp")
485 self.vpp_stdout_deque.append(
486 "--- test setUp() for %s.%s(%s) starts here ---\n" %
487 (self.__class__.__name__, self._testMethodName,
488 self._testMethodDoc))
489 self.vpp_stderr_deque.append(
490 "--- test setUp() for %s.%s(%s) starts here ---\n" %
491 (self.__class__.__name__, self._testMethodName,
492 self._testMethodDoc))
493 self.vapi.cli("clear trace")
494 # store the test instance inside the test class - so that objects
495 # holding the class can access instance methods (like assertEqual)
496 type(self).test_instance = self
499 def pg_enable_capture(cls, interfaces=None):
501 Enable capture on packet-generator interfaces
503 :param interfaces: iterable interface indexes (if None,
504 use self.pg_interfaces)
507 if interfaces is None:
508 interfaces = cls.pg_interfaces
513 def register_capture(cls, cap_name):
514 """ Register a capture in the testclass """
515 # add to the list of captures with current timestamp
516 cls._captures.append((time.time(), cap_name))
517 # filter out from zombies
518 cls._zombie_captures = [(stamp, name)
519 for (stamp, name) in cls._zombie_captures
524 """ Remove any zombie captures and enable the packet generator """
525 # how long before capture is allowed to be deleted - otherwise vpp
526 # crashes - 100ms seems enough (this shouldn't be needed at all)
529 for stamp, cap_name in cls._zombie_captures:
530 wait = stamp + capture_ttl - now
532 cls.sleep(wait, "before deleting capture %s" % cap_name)
534 cls.logger.debug("Removing zombie capture %s" % cap_name)
535 cls.vapi.cli('packet-generator delete %s' % cap_name)
537 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
538 cls.vapi.cli('packet-generator enable')
539 cls._zombie_captures = cls._captures
543 def create_pg_interfaces(cls, interfaces):
545 Create packet-generator interfaces.
547 :param interfaces: iterable indexes of the interfaces.
548 :returns: List of created interfaces.
553 intf = VppPGInterface(cls, i)
554 setattr(cls, intf.name, intf)
556 cls.pg_interfaces = result
560 def create_loopback_interfaces(cls, interfaces):
562 Create loopback interfaces.
564 :param interfaces: iterable indexes of the interfaces.
565 :returns: List of created interfaces.
569 intf = VppLoInterface(cls, i)
570 setattr(cls, intf.name, intf)
572 cls.lo_interfaces = result
576 def extend_packet(packet, size, padding=' '):
578 Extend packet to given size by padding with spaces or custom padding
579 NOTE: Currently works only when Raw layer is present.
581 :param packet: packet
582 :param size: target size
583 :param padding: padding used to extend the payload
586 packet_len = len(packet) + 4
587 extend = size - packet_len
589 num = (extend / len(padding)) + 1
590 packet[Raw].load += (padding * num)[:extend]
593 def reset_packet_infos(cls):
594 """ Reset the list of packet info objects and packet counts to zero """
595 cls._packet_infos = {}
596 cls._packet_count_for_dst_if_idx = {}
599 def create_packet_info(cls, src_if, dst_if):
601 Create packet info object containing the source and destination indexes
602 and add it to the testcase's packet info list
604 :param VppInterface src_if: source interface
605 :param VppInterface dst_if: destination interface
607 :returns: _PacketInfo object
611 info.index = len(cls._packet_infos)
612 info.src = src_if.sw_if_index
613 info.dst = dst_if.sw_if_index
614 if isinstance(dst_if, VppSubInterface):
615 dst_idx = dst_if.parent.sw_if_index
617 dst_idx = dst_if.sw_if_index
618 if dst_idx in cls._packet_count_for_dst_if_idx:
619 cls._packet_count_for_dst_if_idx[dst_idx] += 1
621 cls._packet_count_for_dst_if_idx[dst_idx] = 1
622 cls._packet_infos[info.index] = info
626 def info_to_payload(info):
628 Convert _PacketInfo object to packet payload
630 :param info: _PacketInfo object
632 :returns: string containing serialized data from packet info
634 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
638 def payload_to_info(payload):
640 Convert packet payload to _PacketInfo object
642 :param payload: packet payload
644 :returns: _PacketInfo object containing de-serialized data from payload
647 numbers = payload.split()
649 info.index = int(numbers[0])
650 info.src = int(numbers[1])
651 info.dst = int(numbers[2])
652 info.ip = int(numbers[3])
653 info.proto = int(numbers[4])
656 def get_next_packet_info(self, info):
658 Iterate over the packet info list stored in the testcase
659 Start iteration with first element if info is None
660 Continue based on index in info if info is specified
662 :param info: info or None
663 :returns: next info in list or None if no more infos
668 next_index = info.index + 1
669 if next_index == len(self._packet_infos):
672 return self._packet_infos[next_index]
674 def get_next_packet_info_for_interface(self, src_index, info):
676 Search the packet info list for the next packet info with same source
679 :param src_index: source interface index to search for
680 :param info: packet info - where to start the search
681 :returns: packet info or None
685 info = self.get_next_packet_info(info)
688 if info.src == src_index:
691 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
693 Search the packet info list for the next packet info with same source
694 and destination interface indexes
696 :param src_index: source interface index to search for
697 :param dst_index: destination interface index to search for
698 :param info: packet info - where to start the search
699 :returns: packet info or None
703 info = self.get_next_packet_info_for_interface(src_index, info)
706 if info.dst == dst_index:
709 def assert_equal(self, real_value, expected_value, name_or_class=None):
710 if name_or_class is None:
711 self.assertEqual(real_value, expected_value)
714 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
715 msg = msg % (getdoc(name_or_class).strip(),
716 real_value, str(name_or_class(real_value)),
717 expected_value, str(name_or_class(expected_value)))
719 msg = "Invalid %s: %s does not match expected value %s" % (
720 name_or_class, real_value, expected_value)
722 self.assertEqual(real_value, expected_value, msg)
724 def assert_in_range(self,
732 msg = "Invalid %s: %s out of range <%s,%s>" % (
733 name, real_value, expected_min, expected_max)
734 self.assertTrue(expected_min <= real_value <= expected_max, msg)
736 def assert_packet_checksums_valid(self, packet,
737 ignore_zero_udp_checksums=True):
738 udp_layers = ['UDP', 'UDPerror']
739 checksum_fields = ['cksum', 'chksum']
742 temp = packet.__class__(str(packet))
744 layer = temp.getlayer(counter)
746 for cf in checksum_fields:
747 if hasattr(layer, cf):
748 if ignore_zero_udp_checksums and \
749 0 == getattr(layer, cf) and \
750 layer.name in udp_layers:
753 checksums.append((counter, cf))
756 counter = counter + 1
757 temp = temp.__class__(str(temp))
758 for layer, cf in checksums:
759 self.assert_equal(getattr(packet[layer], cf),
760 getattr(temp[layer], cf),
761 "packet checksum on layer #%d: %s" % (
762 layer, temp[layer].name))
764 def assert_checksum_valid(self, received_packet, layer,
766 ignore_zero_checksum=False):
767 """ Check checksum of received packet on given layer """
768 received_packet_checksum = getattr(received_packet[layer], field_name)
769 if ignore_zero_checksum and 0 == received_packet_checksum:
771 recalculated = received_packet.__class__(str(received_packet))
772 delattr(recalculated[layer], field_name)
773 recalculated = recalculated.__class__(str(recalculated))
774 self.assert_equal(received_packet_checksum,
775 getattr(recalculated[layer], field_name),
776 "packet checksum on layer: %s" % layer)
778 def assert_ip_checksum_valid(self, received_packet,
779 ignore_zero_checksum=False):
780 self.assert_checksum_valid(received_packet, 'IP',
781 ignore_zero_checksum=ignore_zero_checksum)
783 def assert_tcp_checksum_valid(self, received_packet,
784 ignore_zero_checksum=False):
785 self.assert_checksum_valid(received_packet, 'TCP',
786 ignore_zero_checksum=ignore_zero_checksum)
788 def assert_udp_checksum_valid(self, received_packet,
789 ignore_zero_checksum=True):
790 self.assert_checksum_valid(received_packet, 'UDP',
791 ignore_zero_checksum=ignore_zero_checksum)
793 def assert_embedded_icmp_checksum_valid(self, received_packet):
794 if received_packet.haslayer(IPerror):
795 self.assert_checksum_valid(received_packet, 'IPerror')
796 if received_packet.haslayer(TCPerror):
797 self.assert_checksum_valid(received_packet, 'TCPerror')
798 if received_packet.haslayer(UDPerror):
799 self.assert_checksum_valid(received_packet, 'UDPerror',
800 ignore_zero_checksum=True)
801 if received_packet.haslayer(ICMPerror):
802 self.assert_checksum_valid(received_packet, 'ICMPerror')
804 def assert_icmp_checksum_valid(self, received_packet):
805 self.assert_checksum_valid(received_packet, 'ICMP')
806 self.assert_embedded_icmp_checksum_valid(received_packet)
808 def assert_icmpv6_checksum_valid(self, pkt):
809 if pkt.haslayer(ICMPv6DestUnreach):
810 self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
811 self.assert_embedded_icmp_checksum_valid(pkt)
812 if pkt.haslayer(ICMPv6EchoRequest):
813 self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
814 if pkt.haslayer(ICMPv6EchoReply):
815 self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
818 def sleep(cls, timeout, remark=None):
819 if hasattr(cls, 'logger'):
820 cls.logger.debug("Starting sleep for %ss (%s)" % (timeout, remark))
824 if after - before > 2 * timeout:
825 cls.logger.error("unexpected time.sleep() result - "
826 "slept for %ss instead of ~%ss!" % (
827 after - before, timeout))
828 if hasattr(cls, 'logger'):
830 "Finished sleep (%s) - slept %ss (wanted %ss)" % (
831 remark, after - before, timeout))
833 def send_and_assert_no_replies(self, intf, pkts, remark=""):
834 self.vapi.cli("clear trace")
835 intf.add_stream(pkts)
836 self.pg_enable_capture(self.pg_interfaces)
839 for i in self.pg_interfaces:
840 i.get_capture(0, timeout=timeout)
841 i.assert_nothing_captured(remark=remark)
844 def send_and_expect(self, input, pkts, output):
845 self.vapi.cli("clear trace")
846 input.add_stream(pkts)
847 self.pg_enable_capture(self.pg_interfaces)
849 rx = output.get_capture(len(pkts))
853 class TestCasePrinter(object):
857 self.__dict__ = self._shared_state
858 if not hasattr(self, "_test_case_set"):
859 self._test_case_set = set()
861 def print_test_case_heading_if_first_time(self, case):
862 if case.__class__ not in self._test_case_set:
863 print(double_line_delim)
864 print(colorize(getdoc(case.__class__).splitlines()[0], YELLOW))
865 print(double_line_delim)
866 self._test_case_set.add(case.__class__)
869 class VppTestResult(unittest.TestResult):
871 @property result_string
872 String variable to store the test case result string.
874 List variable containing 2-tuples of TestCase instances and strings
875 holding formatted tracebacks. Each tuple represents a test which
876 raised an unexpected exception.
878 List variable containing 2-tuples of TestCase instances and strings
879 holding formatted tracebacks. Each tuple represents a test where
880 a failure was explicitly signalled using the TestCase.assert*()
884 def __init__(self, stream, descriptions, verbosity):
886 :param stream File descriptor to store where to report test results.
887 Set to the standard error stream by default.
888 :param descriptions Boolean variable to store information if to use
889 test case descriptions.
890 :param verbosity Integer variable to store required verbosity level.
892 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
894 self.descriptions = descriptions
895 self.verbosity = verbosity
896 self.result_string = None
897 self.printer = TestCasePrinter()
899 def addSuccess(self, test):
901 Record a test succeeded result
906 if hasattr(test, 'logger'):
907 test.logger.debug("--- addSuccess() %s.%s(%s) called"
908 % (test.__class__.__name__,
909 test._testMethodName,
910 test._testMethodDoc))
911 unittest.TestResult.addSuccess(self, test)
912 self.result_string = colorize("OK", GREEN)
914 def addSkip(self, test, reason):
916 Record a test skipped.
922 if hasattr(test, 'logger'):
923 test.logger.debug("--- addSkip() %s.%s(%s) called, reason is %s"
924 % (test.__class__.__name__,
925 test._testMethodName,
928 unittest.TestResult.addSkip(self, test, reason)
929 self.result_string = colorize("SKIP", YELLOW)
931 def symlink_failed(self, test):
933 if hasattr(test, 'logger'):
935 if hasattr(test, 'tempdir'):
937 failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
938 link_path = '%s/%s-FAILED' % (failed_dir,
939 test.tempdir.split("/")[-1])
941 logger.debug("creating a link to the failed test")
942 logger.debug("os.symlink(%s, %s)" %
943 (test.tempdir, link_path))
944 os.symlink(test.tempdir, link_path)
945 except Exception as e:
949 def send_failure_through_pipe(self, test):
950 if hasattr(self, 'test_framework_failed_pipe'):
951 pipe = self.test_framework_failed_pipe
953 pipe.send(test.__class__)
955 def addFailure(self, test, err):
957 Record a test failed result
960 :param err: error message
963 if hasattr(test, 'logger'):
964 test.logger.debug("--- addFailure() %s.%s(%s) called, err is %s"
965 % (test.__class__.__name__,
966 test._testMethodName,
967 test._testMethodDoc, err))
968 test.logger.debug("formatted exception is:\n%s" %
969 "".join(format_exception(*err)))
970 unittest.TestResult.addFailure(self, test, err)
971 if hasattr(test, 'tempdir'):
972 self.result_string = colorize("FAIL", RED) + \
973 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
974 self.symlink_failed(test)
976 self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
978 self.send_failure_through_pipe(test)
980 def addError(self, test, err):
982 Record a test error result
985 :param err: error message
988 if hasattr(test, 'logger'):
989 test.logger.debug("--- addError() %s.%s(%s) called, err is %s"
990 % (test.__class__.__name__,
991 test._testMethodName,
992 test._testMethodDoc, err))
993 test.logger.debug("formatted exception is:\n%s" %
994 "".join(format_exception(*err)))
995 unittest.TestResult.addError(self, test, err)
996 if hasattr(test, 'tempdir'):
997 self.result_string = colorize("ERROR", RED) + \
998 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
999 self.symlink_failed(test)
1001 self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
1003 self.send_failure_through_pipe(test)
1005 def getDescription(self, test):
1007 Get test description
1010 :returns: test description
1013 # TODO: if none print warning not raise exception
1014 short_description = test.shortDescription()
1015 if self.descriptions and short_description:
1016 return short_description
1020 def startTest(self, test):
1027 self.printer.print_test_case_heading_if_first_time(test)
1028 unittest.TestResult.startTest(self, test)
1029 if self.verbosity > 0:
1030 self.stream.writeln(
1031 "Starting " + self.getDescription(test) + " ...")
1032 self.stream.writeln(single_line_delim)
1034 def stopTest(self, test):
1041 unittest.TestResult.stopTest(self, test)
1042 if self.verbosity > 0:
1043 self.stream.writeln(single_line_delim)
1044 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1045 self.result_string))
1046 self.stream.writeln(single_line_delim)
1048 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1049 self.result_string))
1051 def printErrors(self):
1053 Print errors from running the test case
1055 self.stream.writeln()
1056 self.printErrorList('ERROR', self.errors)
1057 self.printErrorList('FAIL', self.failures)
1059 def printErrorList(self, flavour, errors):
1061 Print error list to the output stream together with error type
1062 and test case description.
1064 :param flavour: error type
1065 :param errors: iterable errors
1068 for test, err in errors:
1069 self.stream.writeln(double_line_delim)
1070 self.stream.writeln("%s: %s" %
1071 (flavour, self.getDescription(test)))
1072 self.stream.writeln(single_line_delim)
1073 self.stream.writeln("%s" % err)
1076 class Filter_by_test_option:
1077 def __init__(self, filter_file_name, filter_class_name, filter_func_name):
1078 self.filter_file_name = filter_file_name
1079 self.filter_class_name = filter_class_name
1080 self.filter_func_name = filter_func_name
1082 def __call__(self, file_name, class_name, func_name):
1083 if self.filter_file_name and file_name != self.filter_file_name:
1085 if self.filter_class_name and class_name != self.filter_class_name:
1087 if self.filter_func_name and func_name != self.filter_func_name:
1092 class VppTestRunner(unittest.TextTestRunner):
1094 A basic test runner implementation which prints results to standard error.
1097 def resultclass(self):
1098 """Class maintaining the results of the tests"""
1099 return VppTestResult
1101 def __init__(self, keep_alive_pipe=None, failed_pipe=None,
1102 stream=sys.stderr, descriptions=True,
1103 verbosity=1, failfast=False, buffer=False, resultclass=None):
1104 # ignore stream setting here, use hard-coded stdout to be in sync
1105 # with prints from VppTestCase methods ...
1106 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1107 verbosity, failfast, buffer,
1109 reporter = KeepAliveReporter()
1110 reporter.pipe = keep_alive_pipe
1111 # this is super-ugly, but very simple to implement and works as long
1112 # as we run only one test at the same time
1113 VppTestResult.test_framework_failed_pipe = failed_pipe
1115 test_option = "TEST"
1117 def parse_test_option(self):
1118 f = os.getenv(self.test_option, None)
1119 filter_file_name = None
1120 filter_class_name = None
1121 filter_func_name = None
1124 parts = f.split('.')
1126 raise Exception("Unrecognized %s option: %s" %
1127 (self.test_option, f))
1129 if parts[2] not in ('*', ''):
1130 filter_func_name = parts[2]
1131 if parts[1] not in ('*', ''):
1132 filter_class_name = parts[1]
1133 if parts[0] not in ('*', ''):
1134 if parts[0].startswith('test_'):
1135 filter_file_name = parts[0]
1137 filter_file_name = 'test_%s' % parts[0]
1139 if f.startswith('test_'):
1140 filter_file_name = f
1142 filter_file_name = 'test_%s' % f
1143 return filter_file_name, filter_class_name, filter_func_name
1146 def filter_tests(tests, filter_cb):
1147 result = unittest.suite.TestSuite()
1149 if isinstance(t, unittest.suite.TestSuite):
1150 # this is a bunch of tests, recursively filter...
1151 x = VppTestRunner.filter_tests(t, filter_cb)
1152 if x.countTestCases() > 0:
1154 elif isinstance(t, unittest.TestCase):
1155 # this is a single test
1156 parts = t.id().split('.')
1157 # t.id() for common cases like this:
1158 # test_classifier.TestClassifier.test_acl_ip
1159 # apply filtering only if it is so
1161 if not filter_cb(parts[0], parts[1], parts[2]):
1165 # unexpected object, don't touch it
1169 def run(self, test):
1176 faulthandler.enable() # emit stack trace to stderr if killed by signal
1177 print("Running tests using custom test runner") # debug message
1178 filter_file, filter_class, filter_func = self.parse_test_option()
1179 print("Active filters: file=%s, class=%s, function=%s" % (
1180 filter_file, filter_class, filter_func))
1181 filter_cb = Filter_by_test_option(
1182 filter_file, filter_class, filter_func)
1183 filtered = self.filter_tests(test, filter_cb)
1184 print("%s out of %s tests match specified filters" % (
1185 filtered.countTestCases(), test.countTestCases()))
1186 if not running_extended_tests():
1187 print("Not running extended tests (some tests will be skipped)")
1188 return super(VppTestRunner, self).run(filtered)
1191 class Worker(Thread):
1192 def __init__(self, args, logger, env={}):
1193 self.logger = logger
1196 self.env = copy.deepcopy(env)
1197 super(Worker, self).__init__()
1200 executable = self.args[0]
1201 self.logger.debug("Running executable w/args `%s'" % self.args)
1202 env = os.environ.copy()
1203 env.update(self.env)
1204 env["CK_LOG_FILE_NAME"] = "-"
1205 self.process = subprocess.Popen(
1206 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1207 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1208 out, err = self.process.communicate()
1209 self.logger.debug("Finished running `%s'" % executable)
1210 self.logger.info("Return code is `%s'" % self.process.returncode)
1211 self.logger.info(single_line_delim)
1212 self.logger.info("Executable `%s' wrote to stdout:" % executable)
1213 self.logger.info(single_line_delim)
1214 self.logger.info(out)
1215 self.logger.info(single_line_delim)
1216 self.logger.info("Executable `%s' wrote to stderr:" % executable)
1217 self.logger.info(single_line_delim)
1218 self.logger.info(err)
1219 self.logger.info(single_line_delim)
1220 self.result = self.process.returncode