3 from __future__ import print_function
14 from collections import deque
15 from threading import Thread, Event
16 from inspect import getdoc, isclass
17 from traceback import format_exception
18 from logging import FileHandler, DEBUG, Formatter
19 from scapy.packet import Raw
20 from hook import StepHook, PollHook, VppDiedError
21 from vpp_pg_interface import VppPGInterface
22 from vpp_sub_interface import VppSubInterface
23 from vpp_lo_interface import VppLoInterface
24 from vpp_papi_provider import VppPapiProvider
25 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
27 from vpp_object import VppObjectRegistry
29 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
30 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
31 from scapy.layers.inet6 import ICMPv6EchoReply
32 if os.name == 'posix' and sys.version_info[0] < 3:
33 # using subprocess32 is recommended by python official documentation
34 # @ https://docs.python.org/2/library/subprocess.html
35 import subprocess32 as subprocess
40 debug_framework = False
41 if os.getenv('TEST_DEBUG', "0") == "1":
42 debug_framework = True
47 Test framework module.
49 The module provides a set of tools for constructing and running tests and
50 representing the results.
54 class _PacketInfo(object):
55 """Private class to create packet info object.
57 Help process information about the next packet.
58 Set variables to default values.
60 #: Store the index of the packet.
62 #: Store the index of the source packet generator interface of the packet.
64 #: Store the index of the destination packet generator interface
67 #: Store expected ip version
69 #: Store expected upper protocol
71 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare two packet info objects field by field.

    Two objects are equal when their ``index``, ``src``, ``dst`` and
    ``data`` attributes all compare equal.

    :param other: object to compare with
    :returns: result of the field comparison for objects of the same
        type, NotImplemented otherwise so that Python can fall back to
        the reflected comparison
    """
    if not isinstance(other, type(self)):
        # unrelated object - defer to Python instead of raising
        # AttributeError on the attributes it does not have
        return NotImplemented
    return (self.index == other.index and
            self.src == other.src and
            self.dst == other.dst and
            self.data == other.data)
82 def pump_output(testclass):
83 """ pump output from vpp stdout/stderr to proper queues """
86 while not testclass.pump_thread_stop_flag.wait(0):
87 readable = select.select([testclass.vpp.stdout.fileno(),
88 testclass.vpp.stderr.fileno(),
89 testclass.pump_thread_wakeup_pipe[0]],
91 if testclass.vpp.stdout.fileno() in readable:
92 read = os.read(testclass.vpp.stdout.fileno(), 102400)
94 split = read.splitlines(True)
95 if len(stdout_fragment) > 0:
96 split[0] = "%s%s" % (stdout_fragment, split[0])
97 if len(split) > 0 and split[-1].endswith("\n"):
101 stdout_fragment = split[-1]
102 testclass.vpp_stdout_deque.extend(split[:limit])
103 if not testclass.cache_vpp_output:
104 for line in split[:limit]:
105 testclass.logger.debug(
106 "VPP STDOUT: %s" % line.rstrip("\n"))
107 if testclass.vpp.stderr.fileno() in readable:
108 read = os.read(testclass.vpp.stderr.fileno(), 102400)
110 split = read.splitlines(True)
111 if len(stderr_fragment) > 0:
112 split[0] = "%s%s" % (stderr_fragment, split[0])
113 if len(split) > 0 and split[-1].endswith("\n"):
117 stderr_fragment = split[-1]
118 testclass.vpp_stderr_deque.extend(split[:limit])
119 if not testclass.cache_vpp_output:
120 for line in split[:limit]:
121 testclass.logger.debug(
122 "VPP STDERR: %s" % line.rstrip("\n"))
123 # ignoring the dummy pipe here intentionally - the flag will take care
124 # of properly terminating the loop
def running_extended_tests():
    """Tell whether the EXTENDED_TESTS environment variable is set to
    "y", "yes" or "1" (case-insensitive); unset counts as "n"."""
    enabled_values = ("y", "yes", "1")
    setting = os.getenv("EXTENDED_TESTS", "n").lower()
    return setting in enabled_values
def running_on_centos():
    """Tell whether the OS_ID environment variable contains "centos"
    (case-insensitive); unset counts as an empty string."""
    return "centos" in os.getenv("OS_ID", "").lower()
137 class KeepAliveReporter(object):
139 Singleton object which reports test start to parent process
144 self.__dict__ = self._shared_state
151 def pipe(self, pipe):
152 if hasattr(self, '_pipe'):
153 raise Exception("Internal error - pipe should only be set once.")
156 def send_keep_alive(self, test):
158 Write current test tmpdir & desc to keep-alive pipe to signal liveness
160 if self.pipe is None:
161 # if not running forked..
167 desc = test.shortDescription()
171 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
174 class VppTestCase(unittest.TestCase):
175 """This subclass is a base class for VPP test cases that are implemented as
classes. It provides methods to create and run test cases.
180 def packet_infos(self):
181 """List of packet infos"""
182 return self._packet_infos
185 def get_packet_count_for_if_idx(cls, dst_if_index):
186 """Get the number of packet info for specified destination if index"""
187 if dst_if_index in cls._packet_count_for_dst_if_idx:
188 return cls._packet_count_for_dst_if_idx[dst_if_index]
194 """Return the instance of this testcase"""
195 return cls.test_instance
198 def set_debug_flags(cls, d):
199 cls.debug_core = False
200 cls.debug_gdb = False
201 cls.debug_gdbserver = False
206 cls.debug_core = True
209 elif dl == "gdbserver":
210 cls.debug_gdbserver = True
212 raise Exception("Unrecognized DEBUG option: '%s'" % d)
215 def setUpConstants(cls):
216 """ Set-up the test case class based on environment variables """
217 s = os.getenv("STEP", "n")
218 cls.step = True if s.lower() in ("y", "yes", "1") else False
219 d = os.getenv("DEBUG", None)
220 c = os.getenv("CACHE_OUTPUT", "1")
221 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
222 cls.set_debug_flags(d)
223 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
224 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
225 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
227 if cls.plugin_path is not None:
228 if cls.extern_plugin_path is not None:
229 plugin_path = "%s:%s" % (
230 cls.plugin_path, cls.extern_plugin_path)
232 plugin_path = cls.plugin_path
233 elif cls.extern_plugin_path is not None:
234 plugin_path = cls.extern_plugin_path
236 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
237 debug_cli = "cli-listen localhost:5002"
239 size = os.getenv("COREDUMP_SIZE")
241 coredump_size = "coredump-size %s" % size
242 if coredump_size is None:
243 coredump_size = "coredump-size unlimited"
244 cls.vpp_cmdline = [cls.vpp_bin, "unix",
245 "{", "nodaemon", debug_cli, "full-coredump",
246 coredump_size, "}", "api-trace", "{", "on", "}",
247 "api-segment", "{", "prefix", cls.shm_prefix, "}",
248 "plugins", "{", "plugin", "dpdk_plugin.so", "{",
249 "disable", "}", "}", ]
250 if plugin_path is not None:
251 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
252 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
255 def wait_for_enter(cls):
256 if cls.debug_gdbserver:
257 print(double_line_delim)
258 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
260 print(double_line_delim)
261 print("Spawned VPP with PID: %d" % cls.vpp.pid)
263 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
265 print(single_line_delim)
266 print("You can debug the VPP using e.g.:")
267 if cls.debug_gdbserver:
268 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
269 print("Now is the time to attach a gdb by running the above "
270 "command, set up breakpoints etc. and then resume VPP from "
271 "within gdb by issuing the 'continue' command")
273 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
274 print("Now is the time to attach a gdb by running the above "
275 "command and set up breakpoints etc.")
276 print(single_line_delim)
277 raw_input("Press ENTER to continue running the testcase...")
281 cmdline = cls.vpp_cmdline
283 if cls.debug_gdbserver:
284 gdbserver = '/usr/bin/gdbserver'
285 if not os.path.isfile(gdbserver) or \
286 not os.access(gdbserver, os.X_OK):
287 raise Exception("gdbserver binary '%s' does not exist or is "
288 "not executable" % gdbserver)
290 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
291 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
294 cls.vpp = subprocess.Popen(cmdline,
295 stdout=subprocess.PIPE,
296 stderr=subprocess.PIPE,
298 except Exception as e:
299 cls.logger.critical("Couldn't start vpp: %s" % e)
307 Perform class setup before running the testcase
308 Remove shared memory files, start vpp and connect the vpp-api
310 gc.collect() # run garbage collection first
312 cls.logger = getLogger(cls.__name__)
313 cls.tempdir = tempfile.mkdtemp(
314 prefix='vpp-unittest-%s-' % cls.__name__)
315 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
316 cls.file_handler.setFormatter(
317 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
319 cls.file_handler.setLevel(DEBUG)
320 cls.logger.addHandler(cls.file_handler)
321 cls.shm_prefix = cls.tempdir.split("/")[-1]
322 os.chdir(cls.tempdir)
323 cls.logger.info("Temporary dir is %s, shm prefix is %s",
324 cls.tempdir, cls.shm_prefix)
326 cls.reset_packet_infos()
328 cls._zombie_captures = []
331 cls.registry = VppObjectRegistry()
332 cls.vpp_startup_failed = False
333 cls.reporter = KeepAliveReporter()
334 # need to catch exceptions here because if we raise, then the cleanup
335 # doesn't get called and we might end with a zombie vpp
338 cls.reporter.send_keep_alive(cls)
339 cls.vpp_stdout_deque = deque()
340 cls.vpp_stderr_deque = deque()
341 cls.pump_thread_stop_flag = Event()
342 cls.pump_thread_wakeup_pipe = os.pipe()
343 cls.pump_thread = Thread(target=pump_output, args=(cls,))
344 cls.pump_thread.daemon = True
345 cls.pump_thread.start()
346 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
351 cls.vapi.register_hook(hook)
352 cls.sleep(0.1, "after vpp startup, before initial poll")
356 cls.vpp_startup_failed = True
358 "VPP died shortly after startup, check the"
359 " output to standard error for possible cause")
365 cls.vapi.disconnect()
368 if cls.debug_gdbserver:
369 print(colorize("You're running VPP inside gdbserver but "
370 "VPP-API connection failed, did you forget "
371 "to 'continue' VPP from within gdb?", RED))
383 Disconnect vpp-api, kill vpp and cleanup shared memory files
385 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
387 if cls.vpp.returncode is None:
388 print(double_line_delim)
389 print("VPP or GDB server is still running")
390 print(single_line_delim)
391 raw_input("When done debugging, press ENTER to kill the "
392 "process and finish running the testcase...")
394 os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
395 cls.pump_thread_stop_flag.set()
396 if hasattr(cls, 'pump_thread'):
397 cls.logger.debug("Waiting for pump thread to stop")
398 cls.pump_thread.join()
399 if hasattr(cls, 'vpp_stderr_reader_thread'):
400 cls.logger.debug("Waiting for stdderr pump to stop")
401 cls.vpp_stderr_reader_thread.join()
403 if hasattr(cls, 'vpp'):
404 if hasattr(cls, 'vapi'):
405 cls.vapi.disconnect()
408 if cls.vpp.returncode is None:
409 cls.logger.debug("Sending TERM to vpp")
411 cls.logger.debug("Waiting for vpp to die")
412 cls.vpp.communicate()
415 if cls.vpp_startup_failed:
416 stdout_log = cls.logger.info
417 stderr_log = cls.logger.critical
419 stdout_log = cls.logger.info
420 stderr_log = cls.logger.info
422 if hasattr(cls, 'vpp_stdout_deque'):
423 stdout_log(single_line_delim)
424 stdout_log('VPP output to stdout while running %s:', cls.__name__)
425 stdout_log(single_line_delim)
426 vpp_output = "".join(cls.vpp_stdout_deque)
427 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
429 stdout_log('\n%s', vpp_output)
430 stdout_log(single_line_delim)
432 if hasattr(cls, 'vpp_stderr_deque'):
433 stderr_log(single_line_delim)
434 stderr_log('VPP output to stderr while running %s:', cls.__name__)
435 stderr_log(single_line_delim)
436 vpp_output = "".join(cls.vpp_stderr_deque)
437 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
439 stderr_log('\n%s', vpp_output)
440 stderr_log(single_line_delim)
443 def tearDownClass(cls):
444 """ Perform final cleanup after running all tests in this test-case """
446 cls.file_handler.close()
447 cls.reset_packet_infos()
449 debug_internal.on_tear_down_class(cls)
452 """ Show various debug prints after each test """
453 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
454 (self.__class__.__name__, self._testMethodName,
455 self._testMethodDoc))
456 if not self.vpp_dead:
457 self.logger.debug(self.vapi.cli("show trace"))
458 self.logger.info(self.vapi.ppcli("show interface"))
459 self.logger.info(self.vapi.ppcli("show hardware"))
460 self.logger.info(self.vapi.ppcli("show error"))
461 self.logger.info(self.vapi.ppcli("show run"))
462 self.logger.info(self.vapi.ppcli("show log"))
463 self.registry.remove_vpp_config(self.logger)
464 # Save/Dump VPP api trace log
465 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
466 tmp_api_trace = "/tmp/%s" % api_trace
467 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
468 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
469 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
471 os.rename(tmp_api_trace, vpp_api_trace_log)
472 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
475 self.registry.unregister_all(self.logger)
478 """ Clear trace before running each test"""
479 self.reporter.send_keep_alive(self)
480 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
481 (self.__class__.__name__, self._testMethodName,
482 self._testMethodDoc))
484 raise Exception("VPP is dead when setting up the test")
485 self.sleep(.1, "during setUp")
486 self.vpp_stdout_deque.append(
487 "--- test setUp() for %s.%s(%s) starts here ---\n" %
488 (self.__class__.__name__, self._testMethodName,
489 self._testMethodDoc))
490 self.vpp_stderr_deque.append(
491 "--- test setUp() for %s.%s(%s) starts here ---\n" %
492 (self.__class__.__name__, self._testMethodName,
493 self._testMethodDoc))
494 self.vapi.cli("clear trace")
495 # store the test instance inside the test class - so that objects
496 # holding the class can access instance methods (like assertEqual)
497 type(self).test_instance = self
500 def pg_enable_capture(cls, interfaces=None):
502 Enable capture on packet-generator interfaces
504 :param interfaces: iterable interface indexes (if None,
505 use self.pg_interfaces)
508 if interfaces is None:
509 interfaces = cls.pg_interfaces
514 def register_capture(cls, cap_name):
515 """ Register a capture in the testclass """
516 # add to the list of captures with current timestamp
517 cls._captures.append((time.time(), cap_name))
518 # filter out from zombies
519 cls._zombie_captures = [(stamp, name)
520 for (stamp, name) in cls._zombie_captures
525 """ Remove any zombie captures and enable the packet generator """
526 # how long before capture is allowed to be deleted - otherwise vpp
527 # crashes - 100ms seems enough (this shouldn't be needed at all)
530 for stamp, cap_name in cls._zombie_captures:
531 wait = stamp + capture_ttl - now
533 cls.sleep(wait, "before deleting capture %s" % cap_name)
535 cls.logger.debug("Removing zombie capture %s" % cap_name)
536 cls.vapi.cli('packet-generator delete %s' % cap_name)
538 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
539 cls.vapi.cli('packet-generator enable')
540 cls._zombie_captures = cls._captures
544 def create_pg_interfaces(cls, interfaces):
546 Create packet-generator interfaces.
548 :param interfaces: iterable indexes of the interfaces.
549 :returns: List of created interfaces.
554 intf = VppPGInterface(cls, i)
555 setattr(cls, intf.name, intf)
557 cls.pg_interfaces = result
561 def create_loopback_interfaces(cls, count):
563 Create loopback interfaces.
565 :param count: number of interfaces created.
566 :returns: List of created interfaces.
568 result = [VppLoInterface(cls) for i in range(count)]
570 setattr(cls, intf.name, intf)
571 cls.lo_interfaces = result
575 def extend_packet(packet, size, padding=' '):
577 Extend packet to given size by padding with spaces or custom padding
578 NOTE: Currently works only when Raw layer is present.
580 :param packet: packet
581 :param size: target size
582 :param padding: padding used to extend the payload
585 packet_len = len(packet) + 4
586 extend = size - packet_len
588 num = (extend / len(padding)) + 1
589 packet[Raw].load += (padding * num)[:extend]
592 def reset_packet_infos(cls):
593 """ Reset the list of packet info objects and packet counts to zero """
594 cls._packet_infos = {}
595 cls._packet_count_for_dst_if_idx = {}
598 def create_packet_info(cls, src_if, dst_if):
600 Create packet info object containing the source and destination indexes
601 and add it to the testcase's packet info list
603 :param VppInterface src_if: source interface
604 :param VppInterface dst_if: destination interface
606 :returns: _PacketInfo object
610 info.index = len(cls._packet_infos)
611 info.src = src_if.sw_if_index
612 info.dst = dst_if.sw_if_index
613 if isinstance(dst_if, VppSubInterface):
614 dst_idx = dst_if.parent.sw_if_index
616 dst_idx = dst_if.sw_if_index
617 if dst_idx in cls._packet_count_for_dst_if_idx:
618 cls._packet_count_for_dst_if_idx[dst_idx] += 1
620 cls._packet_count_for_dst_if_idx[dst_idx] = 1
621 cls._packet_infos[info.index] = info
625 def info_to_payload(info):
627 Convert _PacketInfo object to packet payload
629 :param info: _PacketInfo object
631 :returns: string containing serialized data from packet info
633 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
637 def payload_to_info(payload):
639 Convert packet payload to _PacketInfo object
641 :param payload: packet payload
643 :returns: _PacketInfo object containing de-serialized data from payload
646 numbers = payload.split()
648 info.index = int(numbers[0])
649 info.src = int(numbers[1])
650 info.dst = int(numbers[2])
651 info.ip = int(numbers[3])
652 info.proto = int(numbers[4])
655 def get_next_packet_info(self, info):
657 Iterate over the packet info list stored in the testcase
658 Start iteration with first element if info is None
659 Continue based on index in info if info is specified
661 :param info: info or None
662 :returns: next info in list or None if no more infos
667 next_index = info.index + 1
668 if next_index == len(self._packet_infos):
671 return self._packet_infos[next_index]
673 def get_next_packet_info_for_interface(self, src_index, info):
675 Search the packet info list for the next packet info with same source
678 :param src_index: source interface index to search for
679 :param info: packet info - where to start the search
680 :returns: packet info or None
684 info = self.get_next_packet_info(info)
687 if info.src == src_index:
690 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
692 Search the packet info list for the next packet info with same source
693 and destination interface indexes
695 :param src_index: source interface index to search for
696 :param dst_index: destination interface index to search for
697 :param info: packet info - where to start the search
698 :returns: packet info or None
702 info = self.get_next_packet_info_for_interface(src_index, info)
705 if info.dst == dst_index:
708 def assert_equal(self, real_value, expected_value, name_or_class=None):
709 if name_or_class is None:
710 self.assertEqual(real_value, expected_value)
713 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
714 msg = msg % (getdoc(name_or_class).strip(),
715 real_value, str(name_or_class(real_value)),
716 expected_value, str(name_or_class(expected_value)))
718 msg = "Invalid %s: %s does not match expected value %s" % (
719 name_or_class, real_value, expected_value)
721 self.assertEqual(real_value, expected_value, msg)
723 def assert_in_range(self,
731 msg = "Invalid %s: %s out of range <%s,%s>" % (
732 name, real_value, expected_min, expected_max)
733 self.assertTrue(expected_min <= real_value <= expected_max, msg)
735 def assert_packet_checksums_valid(self, packet,
736 ignore_zero_udp_checksums=True):
737 received = packet.__class__(str(packet))
739 ppp("Verifying packet checksums for packet:", received))
740 udp_layers = ['UDP', 'UDPerror']
741 checksum_fields = ['cksum', 'chksum']
744 temp = received.__class__(str(received))
746 layer = temp.getlayer(counter)
748 for cf in checksum_fields:
749 if hasattr(layer, cf):
750 if ignore_zero_udp_checksums and \
751 0 == getattr(layer, cf) and \
752 layer.name in udp_layers:
755 checksums.append((counter, cf))
758 counter = counter + 1
759 if 0 == len(checksums):
761 temp = temp.__class__(str(temp))
762 for layer, cf in checksums:
763 calc_sum = getattr(temp[layer], cf)
765 getattr(received[layer], cf), calc_sum,
766 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
768 "Checksum field `%s` on `%s` layer has correct value `%s`" %
769 (cf, temp[layer].name, calc_sum))
771 def assert_checksum_valid(self, received_packet, layer,
773 ignore_zero_checksum=False):
774 """ Check checksum of received packet on given layer """
775 received_packet_checksum = getattr(received_packet[layer], field_name)
776 if ignore_zero_checksum and 0 == received_packet_checksum:
778 recalculated = received_packet.__class__(str(received_packet))
779 delattr(recalculated[layer], field_name)
780 recalculated = recalculated.__class__(str(recalculated))
781 self.assert_equal(received_packet_checksum,
782 getattr(recalculated[layer], field_name),
783 "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """Check the checksum of the 'IP' layer of the received packet.

    :param received_packet: packet to verify
    :param ignore_zero_checksum: passed through to assert_checksum_valid
    """
    options = {'ignore_zero_checksum': ignore_zero_checksum}
    self.assert_checksum_valid(received_packet, 'IP', **options)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """Check the checksum of the 'TCP' layer of the received packet.

    :param received_packet: packet to verify
    :param ignore_zero_checksum: passed through to assert_checksum_valid
    """
    options = {'ignore_zero_checksum': ignore_zero_checksum}
    self.assert_checksum_valid(received_packet, 'TCP', **options)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """Check the checksum of the 'UDP' layer of the received packet.

    :param received_packet: packet to verify
    :param ignore_zero_checksum: passed through to assert_checksum_valid
        (defaults to True for UDP)
    """
    options = {'ignore_zero_checksum': ignore_zero_checksum}
    self.assert_checksum_valid(received_packet, 'UDP', **options)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """Check the checksums of the error layers present in the packet.

    The IPerror, TCPerror, UDPerror and ICMPerror layers are visited in
    that order; each one found in the packet is passed to
    assert_checksum_valid. For UDPerror a zero checksum is ignored.

    :param received_packet: packet to verify
    """
    embedded_layers = (
        (IPerror, 'IPerror', {}),
        (TCPerror, 'TCPerror', {}),
        (UDPerror, 'UDPerror', {'ignore_zero_checksum': True}),
        (ICMPerror, 'ICMPerror', {}),
    )
    for layer_class, layer_name, options in embedded_layers:
        if received_packet.haslayer(layer_class):
            self.assert_checksum_valid(received_packet, layer_name,
                                       **options)
def assert_icmp_checksum_valid(self, received_packet):
    """Check the 'ICMP' layer checksum of the received packet and then
    the checksums of its embedded error layers.

    :param received_packet: packet to verify
    """
    pkt = received_packet
    self.assert_checksum_valid(pkt, 'ICMP')
    self.assert_embedded_icmp_checksum_valid(pkt)
def assert_icmpv6_checksum_valid(self, pkt):
    """Check the 'cksum' field of the ICMPv6 layers present in pkt.

    For ICMPv6DestUnreach the embedded error layers are checked as
    well; ICMPv6EchoRequest and ICMPv6EchoReply are checked afterwards.

    :param pkt: packet to verify
    """
    if pkt.haslayer(ICMPv6DestUnreach):
        self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
        self.assert_embedded_icmp_checksum_valid(pkt)
    echo_layers = (
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest'),
        (ICMPv6EchoReply, 'ICMPv6EchoReply'),
    )
    for layer_class, layer_name in echo_layers:
        if pkt.haslayer(layer_class):
            self.assert_checksum_valid(pkt, layer_name, 'cksum')
825 def sleep(cls, timeout, remark=None):
826 if hasattr(cls, 'logger'):
827 cls.logger.debug("Starting sleep for %ss (%s)" % (timeout, remark))
831 if after - before > 2 * timeout:
832 cls.logger.error("unexpected time.sleep() result - "
833 "slept for %ss instead of ~%ss!" % (
834 after - before, timeout))
835 if hasattr(cls, 'logger'):
837 "Finished sleep (%s) - slept %ss (wanted %ss)" % (
838 remark, after - before, timeout))
840 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
841 self.vapi.cli("clear trace")
842 intf.add_stream(pkts)
843 self.pg_enable_capture(self.pg_interfaces)
847 for i in self.pg_interfaces:
848 i.get_capture(0, timeout=timeout)
849 i.assert_nothing_captured(remark=remark)
852 def send_and_expect(self, input, pkts, output):
853 self.vapi.cli("clear trace")
854 input.add_stream(pkts)
855 self.pg_enable_capture(self.pg_interfaces)
857 rx = output.get_capture(len(pkts))
861 class TestCasePrinter(object):
865 self.__dict__ = self._shared_state
866 if not hasattr(self, "_test_case_set"):
867 self._test_case_set = set()
def print_test_case_heading_if_first_time(self, case):
    """Print a heading for the class of the test case, once per class.

    The heading is the first line of the class docstring. The class is
    then added to self._test_case_set so that later tests of the same
    class do not print the heading again.

    :param case: test case instance
    """
    case_class = case.__class__
    if case_class in self._test_case_set:
        return
    doc = getdoc(case_class)
    # getdoc() returns None for an undocumented class and an empty
    # docstring has no lines - fall back to the class name instead of
    # raising AttributeError/IndexError
    heading = doc.splitlines()[0] if doc else case_class.__name__
    print(double_line_delim)
    print(colorize(heading, YELLOW))
    print(double_line_delim)
    self._test_case_set.add(case_class)
877 class VppTestResult(unittest.TestResult):
879 @property result_string
880 String variable to store the test case result string.
882 List variable containing 2-tuples of TestCase instances and strings
883 holding formatted tracebacks. Each tuple represents a test which
884 raised an unexpected exception.
886 List variable containing 2-tuples of TestCase instances and strings
887 holding formatted tracebacks. Each tuple represents a test where
888 a failure was explicitly signalled using the TestCase.assert*()
892 def __init__(self, stream, descriptions, verbosity):
894 :param stream File descriptor to store where to report test results.
895 Set to the standard error stream by default.
896 :param descriptions Boolean variable to store information if to use
897 test case descriptions.
898 :param verbosity Integer variable to store required verbosity level.
900 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
902 self.descriptions = descriptions
903 self.verbosity = verbosity
904 self.result_string = None
905 self.printer = TestCasePrinter()
907 def addSuccess(self, test):
909 Record a test succeeded result
914 if hasattr(test, 'logger'):
915 test.logger.debug("--- addSuccess() %s.%s(%s) called"
916 % (test.__class__.__name__,
917 test._testMethodName,
918 test._testMethodDoc))
919 unittest.TestResult.addSuccess(self, test)
920 self.result_string = colorize("OK", GREEN)
922 def addSkip(self, test, reason):
924 Record a test skipped.
930 if hasattr(test, 'logger'):
931 test.logger.debug("--- addSkip() %s.%s(%s) called, reason is %s"
932 % (test.__class__.__name__,
933 test._testMethodName,
936 unittest.TestResult.addSkip(self, test, reason)
937 self.result_string = colorize("SKIP", YELLOW)
939 def symlink_failed(self, test):
941 if hasattr(test, 'logger'):
943 if hasattr(test, 'tempdir'):
945 failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
946 link_path = '%s/%s-FAILED' % (failed_dir,
947 test.tempdir.split("/")[-1])
949 logger.debug("creating a link to the failed test")
950 logger.debug("os.symlink(%s, %s)" %
951 (test.tempdir, link_path))
952 os.symlink(test.tempdir, link_path)
953 except Exception as e:
957 def send_failure_through_pipe(self, test):
958 if hasattr(self, 'test_framework_failed_pipe'):
959 pipe = self.test_framework_failed_pipe
961 if test.__class__.__name__ == "_ErrorHolder":
963 if x.startswith("setUpClass"):
964 # x looks like setUpClass (test_function.test_class)
965 cls = x.split(".")[1].split(")")[0]
966 for t in self.test_suite:
967 if t.__class__.__name__ == cls:
968 pipe.send(t.__class__)
971 raise Exception("Can't find class name `%s' "
972 "(from ErrorHolder) in test suite "
973 "`%s'" % (cls, self.test_suite))
975 raise Exception("FIXME: unexpected special case - "
976 "ErrorHolder description is `%s'" %
979 pipe.send(test.__class__)
981 def addFailure(self, test, err):
983 Record a test failed result
986 :param err: error message
989 if hasattr(test, 'logger'):
990 test.logger.debug("--- addFailure() %s.%s(%s) called, err is %s"
991 % (test.__class__.__name__,
992 test._testMethodName,
993 test._testMethodDoc, err))
994 test.logger.debug("formatted exception is:\n%s" %
995 "".join(format_exception(*err)))
996 unittest.TestResult.addFailure(self, test, err)
997 if hasattr(test, 'tempdir'):
998 self.result_string = colorize("FAIL", RED) + \
999 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
1000 self.symlink_failed(test)
1002 self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
1004 self.send_failure_through_pipe(test)
1006 def addError(self, test, err):
1008 Record a test error result
1011 :param err: error message
1014 if hasattr(test, 'logger'):
1015 test.logger.debug("--- addError() %s.%s(%s) called, err is %s"
1016 % (test.__class__.__name__,
1017 test._testMethodName,
1018 test._testMethodDoc, err))
1019 test.logger.debug("formatted exception is:\n%s" %
1020 "".join(format_exception(*err)))
1021 unittest.TestResult.addError(self, test, err)
1022 if hasattr(test, 'tempdir'):
1023 self.result_string = colorize("ERROR", RED) + \
1024 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
1025 self.symlink_failed(test)
1027 self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
1029 self.send_failure_through_pipe(test)
1031 def getDescription(self, test):
1033 Get test description
1036 :returns: test description
1039 # TODO: if none print warning not raise exception
1040 short_description = test.shortDescription()
1041 if self.descriptions and short_description:
1042 return short_description
1046 def startTest(self, test):
1053 self.printer.print_test_case_heading_if_first_time(test)
1054 unittest.TestResult.startTest(self, test)
1055 if self.verbosity > 0:
1056 self.stream.writeln(
1057 "Starting " + self.getDescription(test) + " ...")
1058 self.stream.writeln(single_line_delim)
1060 def stopTest(self, test):
1067 unittest.TestResult.stopTest(self, test)
1068 if self.verbosity > 0:
1069 self.stream.writeln(single_line_delim)
1070 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1071 self.result_string))
1072 self.stream.writeln(single_line_delim)
1074 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1075 self.result_string))
1077 def printErrors(self):
1079 Print errors from running the test case
1081 self.stream.writeln()
1082 self.printErrorList('ERROR', self.errors)
1083 self.printErrorList('FAIL', self.failures)
1085 def printErrorList(self, flavour, errors):
1087 Print error list to the output stream together with error type
1088 and test case description.
1090 :param flavour: error type
1091 :param errors: iterable errors
1094 for test, err in errors:
1095 self.stream.writeln(double_line_delim)
1096 self.stream.writeln("%s: %s" %
1097 (flavour, self.getDescription(test)))
1098 self.stream.writeln(single_line_delim)
1099 self.stream.writeln("%s" % err)
1102 class Filter_by_test_option:
def __init__(self, filter_file_name, filter_class_name, filter_func_name):
    """Store the file, class and function name filters as attributes.

    :param filter_file_name: value stored in self.filter_file_name
    :param filter_class_name: value stored in self.filter_class_name
    :param filter_func_name: value stored in self.filter_func_name
    """
    (self.filter_file_name,
     self.filter_class_name,
     self.filter_func_name) = (filter_file_name,
                               filter_class_name,
                               filter_func_name)
1108 def __call__(self, file_name, class_name, func_name):
1109 if self.filter_file_name and file_name != self.filter_file_name:
1111 if self.filter_class_name and class_name != self.filter_class_name:
1113 if self.filter_func_name and func_name != self.filter_func_name:
1118 class VppTestRunner(unittest.TextTestRunner):
1120 A basic test runner implementation which prints results to standard error.
1123 def resultclass(self):
1124 """Class maintaining the results of the tests"""
1125 return VppTestResult
1127 def __init__(self, keep_alive_pipe=None, failed_pipe=None,
1128 stream=sys.stderr, descriptions=True,
1129 verbosity=1, failfast=False, buffer=False, resultclass=None):
1130 # ignore stream setting here, use hard-coded stdout to be in sync
1131 # with prints from VppTestCase methods ...
1132 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1133 verbosity, failfast, buffer,
1135 reporter = KeepAliveReporter()
1136 reporter.pipe = keep_alive_pipe
1137 # this is super-ugly, but very simple to implement and works as long
1138 # as we run only one test at the same time
1139 VppTestResult.test_framework_failed_pipe = failed_pipe
1141 test_option = "TEST"
1143 def parse_test_option(self):
1144 f = os.getenv(self.test_option, None)
1145 filter_file_name = None
1146 filter_class_name = None
1147 filter_func_name = None
1150 parts = f.split('.')
1152 raise Exception("Unrecognized %s option: %s" %
1153 (self.test_option, f))
1155 if parts[2] not in ('*', ''):
1156 filter_func_name = parts[2]
1157 if parts[1] not in ('*', ''):
1158 filter_class_name = parts[1]
1159 if parts[0] not in ('*', ''):
1160 if parts[0].startswith('test_'):
1161 filter_file_name = parts[0]
1163 filter_file_name = 'test_%s' % parts[0]
1165 if f.startswith('test_'):
1166 filter_file_name = f
1168 filter_file_name = 'test_%s' % f
1169 return filter_file_name, filter_class_name, filter_func_name
1172 def filter_tests(tests, filter_cb):
1173 result = unittest.suite.TestSuite()
1175 if isinstance(t, unittest.suite.TestSuite):
1176 # this is a bunch of tests, recursively filter...
1177 x = VppTestRunner.filter_tests(t, filter_cb)
1178 if x.countTestCases() > 0:
1180 elif isinstance(t, unittest.TestCase):
1181 # this is a single test
1182 parts = t.id().split('.')
1183 # t.id() for common cases like this:
1184 # test_classifier.TestClassifier.test_acl_ip
1185 # apply filtering only if it is so
1187 if not filter_cb(parts[0], parts[1], parts[2]):
1191 # unexpected object, don't touch it
1195 def run(self, test):
1202 faulthandler.enable() # emit stack trace to stderr if killed by signal
1203 print("Running tests using custom test runner") # debug message
1204 filter_file, filter_class, filter_func = self.parse_test_option()
1205 print("Active filters: file=%s, class=%s, function=%s" % (
1206 filter_file, filter_class, filter_func))
1207 filter_cb = Filter_by_test_option(
1208 filter_file, filter_class, filter_func)
1209 filtered = self.filter_tests(test, filter_cb)
1210 print("%s out of %s tests match specified filters" % (
1211 filtered.countTestCases(), test.countTestCases()))
1212 if not running_extended_tests():
1213 print("Not running extended tests (some tests will be skipped)")
1214 # super-ugly hack #2
1215 VppTestResult.test_suite = filtered
1216 return super(VppTestRunner, self).run(filtered)
1219 class Worker(Thread):
1220 def __init__(self, args, logger, env={}):
1221 self.logger = logger
1224 self.env = copy.deepcopy(env)
1225 super(Worker, self).__init__()
1228 executable = self.args[0]
1229 self.logger.debug("Running executable w/args `%s'" % self.args)
1230 env = os.environ.copy()
1231 env.update(self.env)
1232 env["CK_LOG_FILE_NAME"] = "-"
1233 self.process = subprocess.Popen(
1234 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1235 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1236 out, err = self.process.communicate()
1237 self.logger.debug("Finished running `%s'" % executable)
1238 self.logger.info("Return code is `%s'" % self.process.returncode)
1239 self.logger.info(single_line_delim)
1240 self.logger.info("Executable `%s' wrote to stdout:" % executable)
1241 self.logger.info(single_line_delim)
1242 self.logger.info(out)
1243 self.logger.info(single_line_delim)
1244 self.logger.info("Executable `%s' wrote to stderr:" % executable)
1245 self.logger.info(single_line_delim)
1246 self.logger.info(err)
1247 self.logger.info(single_line_delim)
1248 self.result = self.process.returncode