3 from __future__ import print_function
14 from collections import deque
15 from threading import Thread, Event
16 from inspect import getdoc, isclass
17 from traceback import format_exception
18 from logging import FileHandler, DEBUG, Formatter
19 from scapy.packet import Raw
20 from hook import StepHook, PollHook, VppDiedError
21 from vpp_pg_interface import VppPGInterface
22 from vpp_sub_interface import VppSubInterface
23 from vpp_lo_interface import VppLoInterface
24 from vpp_papi_provider import VppPapiProvider
25 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
27 from vpp_object import VppObjectRegistry
29 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
30 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
31 from scapy.layers.inet6 import ICMPv6EchoReply
32 if os.name == 'posix' and sys.version_info[0] < 3:
33 # using subprocess32 is recommended by python official documentation
34 # @ https://docs.python.org/2/library/subprocess.html
35 import subprocess32 as subprocess
40 debug_framework = False
41 if os.getenv('TEST_DEBUG', "0") == "1":
42 debug_framework = True
47 Test framework module.
49 The module provides a set of tools for constructing and running tests and
50 representing the results.
54 class _PacketInfo(object):
55 """Private class to create packet info object.
57 Help process information about the next packet.
58 Set variables to default values.
60 #: Store the index of the packet.
62 #: Store the index of the source packet generator interface of the packet.
64 #: Store the index of the destination packet generator interface
67 #: Store expected ip version
69 #: Store expected upper protocol
71 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare two packet infos by index, source, destination and data.

    :param other: object to compare against
    :returns: True when index, src, dst and data all match, False when
        any of them differs, NotImplemented when `other` does not
        provide all four attributes
    """
    try:
        theirs = (other.index, other.src, other.dst, other.data)
    except AttributeError:
        # `other` is not packet-info-like (e.g. None) - let Python fall
        # back to its default comparison instead of raising from ==
        return NotImplemented
    mine = (self.index, self.src, self.dst, self.data)
    return mine == theirs
82 def pump_output(testclass):
83 """ pump output from vpp stdout/stderr to proper queues """
86 while not testclass.pump_thread_stop_flag.is_set():
87 readable = select.select([testclass.vpp.stdout.fileno(),
88 testclass.vpp.stderr.fileno(),
89 testclass.pump_thread_wakeup_pipe[0]],
91 if testclass.vpp.stdout.fileno() in readable:
92 read = os.read(testclass.vpp.stdout.fileno(), 102400)
94 split = read.splitlines(True)
95 if len(stdout_fragment) > 0:
96 split[0] = "%s%s" % (stdout_fragment, split[0])
97 if len(split) > 0 and split[-1].endswith("\n"):
101 stdout_fragment = split[-1]
102 testclass.vpp_stdout_deque.extend(split[:limit])
103 if not testclass.cache_vpp_output:
104 for line in split[:limit]:
105 testclass.logger.debug(
106 "VPP STDOUT: %s" % line.rstrip("\n"))
107 if testclass.vpp.stderr.fileno() in readable:
108 read = os.read(testclass.vpp.stderr.fileno(), 102400)
110 split = read.splitlines(True)
111 if len(stderr_fragment) > 0:
112 split[0] = "%s%s" % (stderr_fragment, split[0])
113 if len(split) > 0 and split[-1].endswith("\n"):
117 stderr_fragment = split[-1]
118 testclass.vpp_stderr_deque.extend(split[:limit])
119 if not testclass.cache_vpp_output:
120 for line in split[:limit]:
121 testclass.logger.debug(
122 "VPP STDERR: %s" % line.rstrip("\n"))
123 # ignoring the dummy pipe here intentionally - the flag will take care
124 # of properly terminating the loop
def running_extended_tests():
    """Tell whether the EXTENDED_TESTS environment variable is switched on.

    :returns: True if EXTENDED_TESTS is "y", "yes" or "1" (compared
        case-insensitively), False otherwise, including when it is unset
    """
    flag = os.getenv("EXTENDED_TESTS", "n").lower()
    return flag in ("y", "yes", "1")
def running_on_centos():
    """Tell whether the OS_ID environment variable mentions centos.

    :returns: True if "centos" occurs (case-insensitively) anywhere in
        OS_ID, False otherwise, including when OS_ID is unset
    """
    return "centos" in os.getenv("OS_ID", "").lower()
137 class KeepAliveReporter(object):
139 Singleton object which reports test start to parent process
144 self.__dict__ = self._shared_state
151 def pipe(self, pipe):
152 if hasattr(self, '_pipe'):
153 raise Exception("Internal error - pipe should only be set once.")
156 def send_keep_alive(self, test):
158 Write current test tmpdir & desc to keep-alive pipe to signal liveness
160 if self.pipe is None:
161 # if not running forked..
167 desc = test.shortDescription()
171 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
174 class VppTestCase(unittest.TestCase):
175 """This subclass is a base class for VPP test cases that are implemented as
176 classes. It provides methods to create and run test case.
180 def packet_infos(self):
181 """List of packet infos"""
182 return self._packet_infos
185 def get_packet_count_for_if_idx(cls, dst_if_index):
186 """Get the number of packet info for specified destination if index"""
187 if dst_if_index in cls._packet_count_for_dst_if_idx:
188 return cls._packet_count_for_dst_if_idx[dst_if_index]
194 """Return the instance of this testcase"""
195 return cls.test_instance
198 def set_debug_flags(cls, d):
199 cls.debug_core = False
200 cls.debug_gdb = False
201 cls.debug_gdbserver = False
206 cls.debug_core = True
209 elif dl == "gdbserver":
210 cls.debug_gdbserver = True
212 raise Exception("Unrecognized DEBUG option: '%s'" % d)
215 def setUpConstants(cls):
216 """ Set-up the test case class based on environment variables """
217 s = os.getenv("STEP", "n")
218 cls.step = True if s.lower() in ("y", "yes", "1") else False
219 d = os.getenv("DEBUG", None)
220 c = os.getenv("CACHE_OUTPUT", "1")
221 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
222 cls.set_debug_flags(d)
223 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
224 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
225 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
227 if cls.plugin_path is not None:
228 if cls.extern_plugin_path is not None:
229 plugin_path = "%s:%s" % (
230 cls.plugin_path, cls.extern_plugin_path)
232 plugin_path = cls.plugin_path
233 elif cls.extern_plugin_path is not None:
234 plugin_path = cls.extern_plugin_path
236 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
237 debug_cli = "cli-listen localhost:5002"
239 size = os.getenv("COREDUMP_SIZE")
241 coredump_size = "coredump-size %s" % size
242 if coredump_size is None:
243 coredump_size = "coredump-size unlimited"
244 cls.vpp_cmdline = [cls.vpp_bin, "unix",
245 "{", "nodaemon", debug_cli, "full-coredump",
246 coredump_size, "}", "api-trace", "{", "on", "}",
247 "api-segment", "{", "prefix", cls.shm_prefix, "}",
248 "plugins", "{", "plugin", "dpdk_plugin.so", "{",
249 "disable", "}", "plugin", "unittest_plugin.so",
250 "{", "enable", "}", "}", ]
251 if plugin_path is not None:
252 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
253 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
256 def wait_for_enter(cls):
257 if cls.debug_gdbserver:
258 print(double_line_delim)
259 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
261 print(double_line_delim)
262 print("Spawned VPP with PID: %d" % cls.vpp.pid)
264 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
266 print(single_line_delim)
267 print("You can debug the VPP using e.g.:")
268 if cls.debug_gdbserver:
269 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
270 print("Now is the time to attach a gdb by running the above "
271 "command, set up breakpoints etc. and then resume VPP from "
272 "within gdb by issuing the 'continue' command")
274 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
275 print("Now is the time to attach a gdb by running the above "
276 "command and set up breakpoints etc.")
277 print(single_line_delim)
278 raw_input("Press ENTER to continue running the testcase...")
282 cmdline = cls.vpp_cmdline
284 if cls.debug_gdbserver:
285 gdbserver = '/usr/bin/gdbserver'
286 if not os.path.isfile(gdbserver) or \
287 not os.access(gdbserver, os.X_OK):
288 raise Exception("gdbserver binary '%s' does not exist or is "
289 "not executable" % gdbserver)
291 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
292 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
295 cls.vpp = subprocess.Popen(cmdline,
296 stdout=subprocess.PIPE,
297 stderr=subprocess.PIPE,
299 except Exception as e:
300 cls.logger.critical("Couldn't start vpp: %s" % e)
308 Perform class setup before running the testcase
309 Remove shared memory files, start vpp and connect the vpp-api
311 gc.collect() # run garbage collection first
313 cls.logger = getLogger(cls.__name__)
314 cls.tempdir = tempfile.mkdtemp(
315 prefix='vpp-unittest-%s-' % cls.__name__)
316 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
317 cls.file_handler.setFormatter(
318 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
320 cls.file_handler.setLevel(DEBUG)
321 cls.logger.addHandler(cls.file_handler)
322 cls.shm_prefix = cls.tempdir.split("/")[-1]
323 os.chdir(cls.tempdir)
324 cls.logger.info("Temporary dir is %s, shm prefix is %s",
325 cls.tempdir, cls.shm_prefix)
327 cls.reset_packet_infos()
329 cls._zombie_captures = []
332 cls.registry = VppObjectRegistry()
333 cls.vpp_startup_failed = False
334 cls.reporter = KeepAliveReporter()
335 # need to catch exceptions here because if we raise, then the cleanup
336 # doesn't get called and we might end with a zombie vpp
339 cls.reporter.send_keep_alive(cls)
340 cls.vpp_stdout_deque = deque()
341 cls.vpp_stderr_deque = deque()
342 cls.pump_thread_stop_flag = Event()
343 cls.pump_thread_wakeup_pipe = os.pipe()
344 cls.pump_thread = Thread(target=pump_output, args=(cls,))
345 cls.pump_thread.daemon = True
346 cls.pump_thread.start()
347 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
352 cls.vapi.register_hook(hook)
353 cls.sleep(0.1, "after vpp startup, before initial poll")
357 cls.vpp_startup_failed = True
359 "VPP died shortly after startup, check the"
360 " output to standard error for possible cause")
366 cls.vapi.disconnect()
369 if cls.debug_gdbserver:
370 print(colorize("You're running VPP inside gdbserver but "
371 "VPP-API connection failed, did you forget "
372 "to 'continue' VPP from within gdb?", RED))
384 Disconnect vpp-api, kill vpp and cleanup shared memory files
386 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
388 if cls.vpp.returncode is None:
389 print(double_line_delim)
390 print("VPP or GDB server is still running")
391 print(single_line_delim)
392 raw_input("When done debugging, press ENTER to kill the "
393 "process and finish running the testcase...")
395 cls.pump_thread_stop_flag.set()
396 os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
397 if hasattr(cls, 'pump_thread'):
398 cls.logger.debug("Waiting for pump thread to stop")
399 cls.pump_thread.join()
400 if hasattr(cls, 'vpp_stderr_reader_thread'):
401 cls.logger.debug("Waiting for stdderr pump to stop")
402 cls.vpp_stderr_reader_thread.join()
404 if hasattr(cls, 'vpp'):
405 if hasattr(cls, 'vapi'):
406 cls.vapi.disconnect()
409 if cls.vpp.returncode is None:
410 cls.logger.debug("Sending TERM to vpp")
412 cls.logger.debug("Waiting for vpp to die")
413 cls.vpp.communicate()
416 if cls.vpp_startup_failed:
417 stdout_log = cls.logger.info
418 stderr_log = cls.logger.critical
420 stdout_log = cls.logger.info
421 stderr_log = cls.logger.info
423 if hasattr(cls, 'vpp_stdout_deque'):
424 stdout_log(single_line_delim)
425 stdout_log('VPP output to stdout while running %s:', cls.__name__)
426 stdout_log(single_line_delim)
427 vpp_output = "".join(cls.vpp_stdout_deque)
428 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
430 stdout_log('\n%s', vpp_output)
431 stdout_log(single_line_delim)
433 if hasattr(cls, 'vpp_stderr_deque'):
434 stderr_log(single_line_delim)
435 stderr_log('VPP output to stderr while running %s:', cls.__name__)
436 stderr_log(single_line_delim)
437 vpp_output = "".join(cls.vpp_stderr_deque)
438 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
440 stderr_log('\n%s', vpp_output)
441 stderr_log(single_line_delim)
444 def tearDownClass(cls):
445 """ Perform final cleanup after running all tests in this test-case """
447 cls.file_handler.close()
448 cls.reset_packet_infos()
450 debug_internal.on_tear_down_class(cls)
453 """ Show various debug prints after each test """
454 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
455 (self.__class__.__name__, self._testMethodName,
456 self._testMethodDoc))
457 if not self.vpp_dead:
458 self.logger.debug(self.vapi.cli("show trace"))
459 self.logger.info(self.vapi.ppcli("show interface"))
460 self.logger.info(self.vapi.ppcli("show hardware"))
461 self.logger.info(self.vapi.ppcli("show error"))
462 self.logger.info(self.vapi.ppcli("show run"))
463 self.logger.info(self.vapi.ppcli("show log"))
464 self.registry.remove_vpp_config(self.logger)
465 # Save/Dump VPP api trace log
466 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
467 tmp_api_trace = "/tmp/%s" % api_trace
468 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
469 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
470 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
472 os.rename(tmp_api_trace, vpp_api_trace_log)
473 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
476 self.registry.unregister_all(self.logger)
479 """ Clear trace before running each test"""
480 self.reporter.send_keep_alive(self)
481 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
482 (self.__class__.__name__, self._testMethodName,
483 self._testMethodDoc))
485 raise Exception("VPP is dead when setting up the test")
486 self.sleep(.1, "during setUp")
487 self.vpp_stdout_deque.append(
488 "--- test setUp() for %s.%s(%s) starts here ---\n" %
489 (self.__class__.__name__, self._testMethodName,
490 self._testMethodDoc))
491 self.vpp_stderr_deque.append(
492 "--- test setUp() for %s.%s(%s) starts here ---\n" %
493 (self.__class__.__name__, self._testMethodName,
494 self._testMethodDoc))
495 self.vapi.cli("clear trace")
496 # store the test instance inside the test class - so that objects
497 # holding the class can access instance methods (like assertEqual)
498 type(self).test_instance = self
501 def pg_enable_capture(cls, interfaces=None):
503 Enable capture on packet-generator interfaces
505 :param interfaces: iterable interface indexes (if None,
506 use self.pg_interfaces)
509 if interfaces is None:
510 interfaces = cls.pg_interfaces
515 def register_capture(cls, cap_name):
516 """ Register a capture in the testclass """
517 # add to the list of captures with current timestamp
518 cls._captures.append((time.time(), cap_name))
519 # filter out from zombies
520 cls._zombie_captures = [(stamp, name)
521 for (stamp, name) in cls._zombie_captures
526 """ Remove any zombie captures and enable the packet generator """
527 # how long before capture is allowed to be deleted - otherwise vpp
528 # crashes - 100ms seems enough (this shouldn't be needed at all)
531 for stamp, cap_name in cls._zombie_captures:
532 wait = stamp + capture_ttl - now
534 cls.sleep(wait, "before deleting capture %s" % cap_name)
536 cls.logger.debug("Removing zombie capture %s" % cap_name)
537 cls.vapi.cli('packet-generator delete %s' % cap_name)
539 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
540 cls.vapi.cli('packet-generator enable')
541 cls._zombie_captures = cls._captures
545 def create_pg_interfaces(cls, interfaces):
547 Create packet-generator interfaces.
549 :param interfaces: iterable indexes of the interfaces.
550 :returns: List of created interfaces.
555 intf = VppPGInterface(cls, i)
556 setattr(cls, intf.name, intf)
558 cls.pg_interfaces = result
562 def create_loopback_interfaces(cls, count):
564 Create loopback interfaces.
566 :param count: number of interfaces created.
567 :returns: List of created interfaces.
569 result = [VppLoInterface(cls) for i in range(count)]
571 setattr(cls, intf.name, intf)
572 cls.lo_interfaces = result
576 def extend_packet(packet, size, padding=' '):
578 Extend packet to given size by padding with spaces or custom padding
579 NOTE: Currently works only when Raw layer is present.
581 :param packet: packet
582 :param size: target size
583 :param padding: padding used to extend the payload
586 packet_len = len(packet) + 4
587 extend = size - packet_len
589 num = (extend / len(padding)) + 1
590 packet[Raw].load += (padding * num)[:extend]
def reset_packet_infos(cls):
    """ Replace the packet info store and the per-destination packet
    counters on the class with fresh, empty dicts """
    for attr in ("_packet_infos", "_packet_count_for_dst_if_idx"):
        setattr(cls, attr, {})
599 def create_packet_info(cls, src_if, dst_if):
601 Create packet info object containing the source and destination indexes
602 and add it to the testcase's packet info list
604 :param VppInterface src_if: source interface
605 :param VppInterface dst_if: destination interface
607 :returns: _PacketInfo object
611 info.index = len(cls._packet_infos)
612 info.src = src_if.sw_if_index
613 info.dst = dst_if.sw_if_index
614 if isinstance(dst_if, VppSubInterface):
615 dst_idx = dst_if.parent.sw_if_index
617 dst_idx = dst_if.sw_if_index
618 if dst_idx in cls._packet_count_for_dst_if_idx:
619 cls._packet_count_for_dst_if_idx[dst_idx] += 1
621 cls._packet_count_for_dst_if_idx[dst_idx] = 1
622 cls._packet_infos[info.index] = info
626 def info_to_payload(info):
628 Convert _PacketInfo object to packet payload
630 :param info: _PacketInfo object
632 :returns: string containing serialized data from packet info
634 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
638 def payload_to_info(payload):
640 Convert packet payload to _PacketInfo object
642 :param payload: packet payload
644 :returns: _PacketInfo object containing de-serialized data from payload
647 numbers = payload.split()
649 info.index = int(numbers[0])
650 info.src = int(numbers[1])
651 info.dst = int(numbers[2])
652 info.ip = int(numbers[3])
653 info.proto = int(numbers[4])
656 def get_next_packet_info(self, info):
658 Iterate over the packet info list stored in the testcase
659 Start iteration with first element if info is None
660 Continue based on index in info if info is specified
662 :param info: info or None
663 :returns: next info in list or None if no more infos
668 next_index = info.index + 1
669 if next_index == len(self._packet_infos):
672 return self._packet_infos[next_index]
674 def get_next_packet_info_for_interface(self, src_index, info):
676 Search the packet info list for the next packet info with same source
679 :param src_index: source interface index to search for
680 :param info: packet info - where to start the search
681 :returns: packet info or None
685 info = self.get_next_packet_info(info)
688 if info.src == src_index:
691 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
693 Search the packet info list for the next packet info with same source
694 and destination interface indexes
696 :param src_index: source interface index to search for
697 :param dst_index: destination interface index to search for
698 :param info: packet info - where to start the search
699 :returns: packet info or None
703 info = self.get_next_packet_info_for_interface(src_index, info)
706 if info.dst == dst_index:
709 def assert_equal(self, real_value, expected_value, name_or_class=None):
710 if name_or_class is None:
711 self.assertEqual(real_value, expected_value)
714 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
715 msg = msg % (getdoc(name_or_class).strip(),
716 real_value, str(name_or_class(real_value)),
717 expected_value, str(name_or_class(expected_value)))
719 msg = "Invalid %s: %s does not match expected value %s" % (
720 name_or_class, real_value, expected_value)
722 self.assertEqual(real_value, expected_value, msg)
724 def assert_in_range(self,
732 msg = "Invalid %s: %s out of range <%s,%s>" % (
733 name, real_value, expected_min, expected_max)
734 self.assertTrue(expected_min <= real_value <= expected_max, msg)
736 def assert_packet_checksums_valid(self, packet,
737 ignore_zero_udp_checksums=True):
738 received = packet.__class__(str(packet))
740 ppp("Verifying packet checksums for packet:", received))
741 udp_layers = ['UDP', 'UDPerror']
742 checksum_fields = ['cksum', 'chksum']
745 temp = received.__class__(str(received))
747 layer = temp.getlayer(counter)
749 for cf in checksum_fields:
750 if hasattr(layer, cf):
751 if ignore_zero_udp_checksums and \
752 0 == getattr(layer, cf) and \
753 layer.name in udp_layers:
756 checksums.append((counter, cf))
759 counter = counter + 1
760 if 0 == len(checksums):
762 temp = temp.__class__(str(temp))
763 for layer, cf in checksums:
764 calc_sum = getattr(temp[layer], cf)
766 getattr(received[layer], cf), calc_sum,
767 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
769 "Checksum field `%s` on `%s` layer has correct value `%s`" %
770 (cf, temp[layer].name, calc_sum))
772 def assert_checksum_valid(self, received_packet, layer,
774 ignore_zero_checksum=False):
775 """ Check checksum of received packet on given layer """
776 received_packet_checksum = getattr(received_packet[layer], field_name)
777 if ignore_zero_checksum and 0 == received_packet_checksum:
779 recalculated = received_packet.__class__(str(received_packet))
780 delattr(recalculated[layer], field_name)
781 recalculated = recalculated.__class__(str(recalculated))
782 self.assert_equal(received_packet_checksum,
783 getattr(recalculated[layer], field_name),
784 "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """ Check checksum of the 'IP' layer of a received packet

    :param received_packet: packet to verify
    :param ignore_zero_checksum: passed on unchanged to
                                 assert_checksum_valid()
    """
    layer = 'IP'
    self.assert_checksum_valid(
        received_packet, layer, ignore_zero_checksum=ignore_zero_checksum)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """ Check checksum of the 'TCP' layer of a received packet

    :param received_packet: packet to verify
    :param ignore_zero_checksum: passed on unchanged to
                                 assert_checksum_valid()
    """
    layer = 'TCP'
    self.assert_checksum_valid(
        received_packet, layer, ignore_zero_checksum=ignore_zero_checksum)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """ Check checksum of the 'UDP' layer of a received packet

    :param received_packet: packet to verify
    :param ignore_zero_checksum: passed on unchanged to
                                 assert_checksum_valid(); unlike the IP
                                 and TCP variants this defaults to True
    """
    layer = 'UDP'
    self.assert_checksum_valid(
        received_packet, layer, ignore_zero_checksum=ignore_zero_checksum)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """ Check checksums of the error layers present in a received packet

    IPerror, TCPerror, UDPerror and ICMPerror are examined in that order
    and each one is checked only if the packet has that layer. The
    UDPerror check is asked to ignore a zero checksum.

    :param received_packet: packet to verify
    """
    # (layer class, layer name, extra keyword arguments for the check)
    embedded_layers = (
        (IPerror, 'IPerror', {}),
        (TCPerror, 'TCPerror', {}),
        (UDPerror, 'UDPerror', {'ignore_zero_checksum': True}),
        (ICMPerror, 'ICMPerror', {}),
    )
    for layer_cls, layer_name, extra in embedded_layers:
        if received_packet.haslayer(layer_cls):
            self.assert_checksum_valid(received_packet, layer_name, **extra)
def assert_icmp_checksum_valid(self, received_packet):
    """ Check checksum of the 'ICMP' layer, then the checksums of any
    embedded error layers, of a received packet

    :param received_packet: packet to verify
    """
    layer = 'ICMP'
    self.assert_checksum_valid(received_packet, layer)
    # the ICMP payload may carry the offending packet - verify that too
    self.assert_embedded_icmp_checksum_valid(received_packet)
def assert_icmpv6_checksum_valid(self, pkt):
    """ Check the 'cksum' field of the ICMPv6 layers present in a packet

    A destination-unreachable layer additionally triggers the check of
    embedded error layers; echo request and echo reply layers are only
    checked themselves.

    :param pkt: packet to verify
    """
    if pkt.haslayer(ICMPv6DestUnreach):
        self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
        self.assert_embedded_icmp_checksum_valid(pkt)
    echo_layers = (
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest'),
        (ICMPv6EchoReply, 'ICMPv6EchoReply'),
    )
    for layer_cls, layer_name in echo_layers:
        if pkt.haslayer(layer_cls):
            self.assert_checksum_valid(pkt, layer_name, 'cksum')
826 def sleep(cls, timeout, remark=None):
827 if hasattr(cls, 'logger'):
828 cls.logger.debug("Starting sleep for %ss (%s)" % (timeout, remark))
832 if after - before > 2 * timeout:
833 cls.logger.error("unexpected time.sleep() result - "
834 "slept for %ss instead of ~%ss!" % (
835 after - before, timeout))
836 if hasattr(cls, 'logger'):
838 "Finished sleep (%s) - slept %ss (wanted %ss)" % (
839 remark, after - before, timeout))
841 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
842 self.vapi.cli("clear trace")
843 intf.add_stream(pkts)
844 self.pg_enable_capture(self.pg_interfaces)
848 for i in self.pg_interfaces:
849 i.get_capture(0, timeout=timeout)
850 i.assert_nothing_captured(remark=remark)
853 def send_and_expect(self, input, pkts, output):
854 self.vapi.cli("clear trace")
855 input.add_stream(pkts)
856 self.pg_enable_capture(self.pg_interfaces)
858 rx = output.get_capture(len(pkts))
862 class TestCasePrinter(object):
866 self.__dict__ = self._shared_state
867 if not hasattr(self, "_test_case_set"):
868 self._test_case_set = set()
def print_test_case_heading_if_first_time(self, case):
    """ Print a heading for the class of `case` the first time it is seen

    The heading is the first line of the test class docstring; the class
    name is used when the class has no (or an empty) docstring.

    :param case: test case instance whose class is tracked
    """
    test_class = case.__class__
    if test_class in self._test_case_set:
        return
    doc = getdoc(test_class)
    # getdoc() may return None or '' - fall back to the class name so an
    # undocumented test class gets a heading instead of aborting the run
    heading = doc.splitlines()[0] if doc else test_class.__name__
    print(double_line_delim)
    print(colorize(heading, YELLOW))
    print(double_line_delim)
    self._test_case_set.add(test_class)
878 class VppTestResult(unittest.TestResult):
880 @property result_string
881 String variable to store the test case result string.
883 List variable containing 2-tuples of TestCase instances and strings
884 holding formatted tracebacks. Each tuple represents a test which
885 raised an unexpected exception.
887 List variable containing 2-tuples of TestCase instances and strings
888 holding formatted tracebacks. Each tuple represents a test where
889 a failure was explicitly signalled using the TestCase.assert*()
893 def __init__(self, stream, descriptions, verbosity):
895 :param stream File descriptor to store where to report test results.
896 Set to the standard error stream by default.
897 :param descriptions Boolean variable to store information if to use
898 test case descriptions.
899 :param verbosity Integer variable to store required verbosity level.
901 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
903 self.descriptions = descriptions
904 self.verbosity = verbosity
905 self.result_string = None
906 self.printer = TestCasePrinter()
def addSuccess(self, test):
    """
    Record a test succeeded result

    :param test: the test that succeeded

    """
    if hasattr(test, 'logger'):
        details = (test.__class__.__name__,
                   test._testMethodName,
                   test._testMethodDoc)
        test.logger.debug("--- addSuccess() %s.%s(%s) called" % details)
    unittest.TestResult.addSuccess(self, test)
    self.result_string = colorize("OK", GREEN)
923 def addSkip(self, test, reason):
925 Record a test skipped.
931 if hasattr(test, 'logger'):
932 test.logger.debug("--- addSkip() %s.%s(%s) called, reason is %s"
933 % (test.__class__.__name__,
934 test._testMethodName,
937 unittest.TestResult.addSkip(self, test, reason)
938 self.result_string = colorize("SKIP", YELLOW)
940 def symlink_failed(self, test):
942 if hasattr(test, 'logger'):
944 if hasattr(test, 'tempdir'):
946 failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
947 link_path = '%s/%s-FAILED' % (failed_dir,
948 test.tempdir.split("/")[-1])
950 logger.debug("creating a link to the failed test")
951 logger.debug("os.symlink(%s, %s)" %
952 (test.tempdir, link_path))
953 os.symlink(test.tempdir, link_path)
954 except Exception as e:
958 def send_failure_through_pipe(self, test):
959 if hasattr(self, 'test_framework_failed_pipe'):
960 pipe = self.test_framework_failed_pipe
962 if test.__class__.__name__ == "_ErrorHolder":
964 if x.startswith("setUpClass"):
965 # x looks like setUpClass (test_function.test_class)
966 cls = x.split(".")[1].split(")")[0]
967 for t in self.test_suite:
968 if t.__class__.__name__ == cls:
969 pipe.send(t.__class__)
972 raise Exception("Can't find class name `%s' "
973 "(from ErrorHolder) in test suite "
974 "`%s'" % (cls, self.test_suite))
976 raise Exception("FIXME: unexpected special case - "
977 "ErrorHolder description is `%s'" %
980 pipe.send(test.__class__)
982 def addFailure(self, test, err):
984 Record a test failed result
987 :param err: error message
990 if hasattr(test, 'logger'):
991 test.logger.debug("--- addFailure() %s.%s(%s) called, err is %s"
992 % (test.__class__.__name__,
993 test._testMethodName,
994 test._testMethodDoc, err))
995 test.logger.debug("formatted exception is:\n%s" %
996 "".join(format_exception(*err)))
997 unittest.TestResult.addFailure(self, test, err)
998 if hasattr(test, 'tempdir'):
999 self.result_string = colorize("FAIL", RED) + \
1000 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
1001 self.symlink_failed(test)
1003 self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
1005 self.send_failure_through_pipe(test)
1007 def addError(self, test, err):
1009 Record a test error result
1012 :param err: error message
1015 if hasattr(test, 'logger'):
1016 test.logger.debug("--- addError() %s.%s(%s) called, err is %s"
1017 % (test.__class__.__name__,
1018 test._testMethodName,
1019 test._testMethodDoc, err))
1020 test.logger.debug("formatted exception is:\n%s" %
1021 "".join(format_exception(*err)))
1022 unittest.TestResult.addError(self, test, err)
1023 if hasattr(test, 'tempdir'):
1024 self.result_string = colorize("ERROR", RED) + \
1025 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
1026 self.symlink_failed(test)
1028 self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
1030 self.send_failure_through_pipe(test)
1032 def getDescription(self, test):
1034 Get test description
1037 :returns: test description
1040 # TODO: if none print warning not raise exception
1041 short_description = test.shortDescription()
1042 if self.descriptions and short_description:
1043 return short_description
def startTest(self, test):
    """
    Start a test: print the test case heading if needed, notify the base
    class and, for verbosity above zero, announce the test on the stream

    :param test: the test being started

    """
    self.printer.print_test_case_heading_if_first_time(test)
    unittest.TestResult.startTest(self, test)
    if self.verbosity <= 0:
        return
    announcement = "Starting " + self.getDescription(test) + " ..."
    self.stream.writeln(announcement)
    self.stream.writeln(single_line_delim)
1061 def stopTest(self, test):
1068 unittest.TestResult.stopTest(self, test)
1069 if self.verbosity > 0:
1070 self.stream.writeln(single_line_delim)
1071 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1072 self.result_string))
1073 self.stream.writeln(single_line_delim)
1075 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1076 self.result_string))
def printErrors(self):
    """
    Print errors from running the test case: an empty line followed by
    the recorded errors and then the recorded failures
    """
    self.stream.writeln()
    # attributes are looked up one at a time, right before each list is
    # printed
    for flavour, attr in (('ERROR', 'errors'), ('FAIL', 'failures')):
        self.printErrorList(flavour, getattr(self, attr))
def printErrorList(self, flavour, errors):
    """
    Print error list to the output stream together with error type
    and test case description.

    :param flavour: error type
    :param errors: iterable of (test, err) pairs

    """
    for test, err in errors:
        emit = self.stream.writeln
        emit(double_line_delim)
        heading = "%s: %s" % (flavour, self.getDescription(test))
        emit(heading)
        emit(single_line_delim)
        emit("%s" % err)
1103 class Filter_by_test_option:
1104 def __init__(self, filter_file_name, filter_class_name, filter_func_name):
1105 self.filter_file_name = filter_file_name
1106 self.filter_class_name = filter_class_name
1107 self.filter_func_name = filter_func_name
1109 def __call__(self, file_name, class_name, func_name):
1110 if self.filter_file_name and file_name != self.filter_file_name:
1112 if self.filter_class_name and class_name != self.filter_class_name:
1114 if self.filter_func_name and func_name != self.filter_func_name:
1119 class VppTestRunner(unittest.TextTestRunner):
1121 A basic test runner implementation which prints results to standard error.
1124 def resultclass(self):
1125 """Class maintaining the results of the tests"""
1126 return VppTestResult
1128 def __init__(self, keep_alive_pipe=None, failed_pipe=None,
1129 stream=sys.stderr, descriptions=True,
1130 verbosity=1, failfast=False, buffer=False, resultclass=None):
1131 # ignore stream setting here, use hard-coded stdout to be in sync
1132 # with prints from VppTestCase methods ...
1133 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1134 verbosity, failfast, buffer,
1136 reporter = KeepAliveReporter()
1137 reporter.pipe = keep_alive_pipe
1138 # this is super-ugly, but very simple to implement and works as long
1139 # as we run only one test at the same time
1140 VppTestResult.test_framework_failed_pipe = failed_pipe
1142 test_option = "TEST"
1144 def parse_test_option(self):
1145 f = os.getenv(self.test_option, None)
1146 filter_file_name = None
1147 filter_class_name = None
1148 filter_func_name = None
1151 parts = f.split('.')
1153 raise Exception("Unrecognized %s option: %s" %
1154 (self.test_option, f))
1156 if parts[2] not in ('*', ''):
1157 filter_func_name = parts[2]
1158 if parts[1] not in ('*', ''):
1159 filter_class_name = parts[1]
1160 if parts[0] not in ('*', ''):
1161 if parts[0].startswith('test_'):
1162 filter_file_name = parts[0]
1164 filter_file_name = 'test_%s' % parts[0]
1166 if f.startswith('test_'):
1167 filter_file_name = f
1169 filter_file_name = 'test_%s' % f
1170 return filter_file_name, filter_class_name, filter_func_name
1173 def filter_tests(tests, filter_cb):
1174 result = unittest.suite.TestSuite()
1176 if isinstance(t, unittest.suite.TestSuite):
1177 # this is a bunch of tests, recursively filter...
1178 x = VppTestRunner.filter_tests(t, filter_cb)
1179 if x.countTestCases() > 0:
1181 elif isinstance(t, unittest.TestCase):
1182 # this is a single test
1183 parts = t.id().split('.')
1184 # t.id() for common cases like this:
1185 # test_classifier.TestClassifier.test_acl_ip
1186 # apply filtering only if it is so
1188 if not filter_cb(parts[0], parts[1], parts[2]):
1192 # unexpected object, don't touch it
def run(self, test):
    """
    Run the tests which match the filters given by the test option

    :param test: test suite to filter and run
    :returns: result of the parent runner's run() on the filtered suite

    """
    faulthandler.enable()  # emit stack trace to stderr if killed by signal
    print("Running tests using custom test runner")  # debug message
    file_filter, class_filter, func_filter = self.parse_test_option()
    print("Active filters: file=%s, class=%s, function=%s" % (
        file_filter, class_filter, func_filter))
    matcher = Filter_by_test_option(file_filter, class_filter, func_filter)
    selected = self.filter_tests(test, matcher)
    match_counts = (selected.countTestCases(), test.countTestCases())
    print("%s out of %s tests match specified filters" % match_counts)
    extended = running_extended_tests()
    if not extended:
        print("Not running extended tests (some tests will be skipped)")
    # super-ugly hack #2 - the result class looks the suite up on itself
    VppTestResult.test_suite = selected
    return super(VppTestRunner, self).run(selected)
1220 class Worker(Thread):
1221 def __init__(self, args, logger, env={}):
1222 self.logger = logger
1225 self.env = copy.deepcopy(env)
1226 super(Worker, self).__init__()
1229 executable = self.args[0]
1230 self.logger.debug("Running executable w/args `%s'" % self.args)
1231 env = os.environ.copy()
1232 env.update(self.env)
1233 env["CK_LOG_FILE_NAME"] = "-"
1234 self.process = subprocess.Popen(
1235 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1236 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1237 out, err = self.process.communicate()
1238 self.logger.debug("Finished running `%s'" % executable)
1239 self.logger.info("Return code is `%s'" % self.process.returncode)
1240 self.logger.info(single_line_delim)
1241 self.logger.info("Executable `%s' wrote to stdout:" % executable)
1242 self.logger.info(single_line_delim)
1243 self.logger.info(out)
1244 self.logger.info(single_line_delim)
1245 self.logger.info("Executable `%s' wrote to stderr:" % executable)
1246 self.logger.info(single_line_delim)
1247 self.logger.info(err)
1248 self.logger.info(single_line_delim)
1249 self.result = self.process.returncode