3 from __future__ import print_function
15 from collections import deque
16 from threading import Thread, Event
17 from inspect import getdoc, isclass
18 from traceback import format_exception
19 from logging import FileHandler, DEBUG, Formatter
20 from scapy.packet import Raw
21 from hook import StepHook, PollHook, VppDiedError
22 from vpp_pg_interface import VppPGInterface
23 from vpp_sub_interface import VppSubInterface
24 from vpp_lo_interface import VppLoInterface
25 from vpp_papi_provider import VppPapiProvider
26 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
28 from vpp_object import VppObjectRegistry
30 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
31 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
32 from scapy.layers.inet6 import ICMPv6EchoReply
33 if os.name == 'posix' and sys.version_info[0] < 3:
34 # using subprocess32 is recommended by python official documentation
35 # @ https://docs.python.org/2/library/subprocess.html
36 import subprocess32 as subprocess
41 debug_framework = False
42 if os.getenv('TEST_DEBUG', "0") == "1":
43 debug_framework = True
48 Test framework module.
50 The module provides a set of tools for constructing and running tests and
51 representing the results.
55 class _PacketInfo(object):
56 """Private class to create packet info object.
58 Help process information about the next packet.
59 Set variables to default values.
61 #: Store the index of the packet.
63 #: Store the index of the source packet generator interface of the packet.
65 #: Store the index of the destination packet generator interface
68 #: Store expected ip version
70 #: Store expected upper protocol
72 #: Store the copy of the former packet.
75 def __eq__(self, other):
76 index = self.index == other.index
77 src = self.src == other.src
78 dst = self.dst == other.dst
79 data = self.data == other.data
80 return index and src and dst and data
83 def pump_output(testclass):
84 """ pump output from vpp stdout/stderr to proper queues """
87 while not testclass.pump_thread_stop_flag.is_set():
88 readable = select.select([testclass.vpp.stdout.fileno(),
89 testclass.vpp.stderr.fileno(),
90 testclass.pump_thread_wakeup_pipe[0]],
92 if testclass.vpp.stdout.fileno() in readable:
93 read = os.read(testclass.vpp.stdout.fileno(), 102400)
95 split = read.splitlines(True)
96 if len(stdout_fragment) > 0:
97 split[0] = "%s%s" % (stdout_fragment, split[0])
98 if len(split) > 0 and split[-1].endswith("\n"):
102 stdout_fragment = split[-1]
103 testclass.vpp_stdout_deque.extend(split[:limit])
104 if not testclass.cache_vpp_output:
105 for line in split[:limit]:
106 testclass.logger.debug(
107 "VPP STDOUT: %s" % line.rstrip("\n"))
108 if testclass.vpp.stderr.fileno() in readable:
109 read = os.read(testclass.vpp.stderr.fileno(), 102400)
111 split = read.splitlines(True)
112 if len(stderr_fragment) > 0:
113 split[0] = "%s%s" % (stderr_fragment, split[0])
114 if len(split) > 0 and split[-1].endswith("\n"):
118 stderr_fragment = split[-1]
119 testclass.vpp_stderr_deque.extend(split[:limit])
120 if not testclass.cache_vpp_output:
121 for line in split[:limit]:
122 testclass.logger.debug(
123 "VPP STDERR: %s" % line.rstrip("\n"))
124 # ignoring the dummy pipe here intentionally - the flag will take care
125 # of properly terminating the loop
128 def running_extended_tests():
129 s = os.getenv("EXTENDED_TESTS", "n")
130 return True if s.lower() in ("y", "yes", "1") else False
133 def running_on_centos():
134 os_id = os.getenv("OS_ID", "")
135 return True if "centos" in os_id.lower() else False
138 class KeepAliveReporter(object):
140 Singleton object which reports test start to parent process
145 self.__dict__ = self._shared_state
152 def pipe(self, pipe):
153 if hasattr(self, '_pipe'):
154 raise Exception("Internal error - pipe should only be set once.")
157 def send_keep_alive(self, test):
159 Write current test tmpdir & desc to keep-alive pipe to signal liveness
161 if self.pipe is None:
162 # if not running forked..
168 desc = test.shortDescription()
172 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
175 class VppTestCase(unittest.TestCase):
176 """This subclass is a base class for VPP test cases that are implemented as
177 classes. It provides methods to create and run test case.
181 def packet_infos(self):
182 """List of packet infos"""
183 return self._packet_infos
186 def get_packet_count_for_if_idx(cls, dst_if_index):
187 """Get the number of packet info for specified destination if index"""
188 if dst_if_index in cls._packet_count_for_dst_if_idx:
189 return cls._packet_count_for_dst_if_idx[dst_if_index]
195 """Return the instance of this testcase"""
196 return cls.test_instance
199 def set_debug_flags(cls, d):
200 cls.debug_core = False
201 cls.debug_gdb = False
202 cls.debug_gdbserver = False
207 cls.debug_core = True
210 elif dl == "gdbserver":
211 cls.debug_gdbserver = True
213 raise Exception("Unrecognized DEBUG option: '%s'" % d)
216 def get_least_used_cpu(self):
217 cpu_usage_list = [set(range(psutil.cpu_count()))]
218 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
219 if 'vpp_main' == p.info['name']]
220 for vpp_process in vpp_processes:
221 for cpu_usage_set in cpu_usage_list:
223 cpu_num = vpp_process.cpu_num()
224 if cpu_num in cpu_usage_set:
225 cpu_usage_set_index = cpu_usage_list.index(
227 if cpu_usage_set_index == len(cpu_usage_list) - 1:
228 cpu_usage_list.append({cpu_num})
230 cpu_usage_list[cpu_usage_set_index + 1].add(
232 cpu_usage_set.remove(cpu_num)
234 except psutil.NoSuchProcess:
237 for cpu_usage_set in cpu_usage_list:
238 if len(cpu_usage_set) > 0:
239 min_usage_set = cpu_usage_set
242 return random.choice(tuple(min_usage_set))
245 def setUpConstants(cls):
246 """ Set-up the test case class based on environment variables """
247 s = os.getenv("STEP", "n")
248 cls.step = True if s.lower() in ("y", "yes", "1") else False
249 d = os.getenv("DEBUG", None)
250 c = os.getenv("CACHE_OUTPUT", "1")
251 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
252 cls.set_debug_flags(d)
253 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
254 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
255 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
257 if cls.plugin_path is not None:
258 if cls.extern_plugin_path is not None:
259 plugin_path = "%s:%s" % (
260 cls.plugin_path, cls.extern_plugin_path)
262 plugin_path = cls.plugin_path
263 elif cls.extern_plugin_path is not None:
264 plugin_path = cls.extern_plugin_path
266 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
267 debug_cli = "cli-listen localhost:5002"
269 size = os.getenv("COREDUMP_SIZE")
271 coredump_size = "coredump-size %s" % size
272 if coredump_size is None:
273 coredump_size = "coredump-size unlimited"
275 cpu_core_number = cls.get_least_used_cpu()
277 cls.vpp_cmdline = [cls.vpp_bin, "unix",
278 "{", "nodaemon", debug_cli, "full-coredump",
279 coredump_size, "}", "api-trace", "{", "on", "}",
280 "api-segment", "{", "prefix", cls.shm_prefix, "}",
281 "cpu", "{", "main-core", str(cpu_core_number), "}",
282 "plugins", "{", "plugin", "dpdk_plugin.so", "{",
283 "disable", "}", "plugin", "unittest_plugin.so",
284 "{", "enable", "}", "}", ]
285 if plugin_path is not None:
286 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
287 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
290 def wait_for_enter(cls):
291 if cls.debug_gdbserver:
292 print(double_line_delim)
293 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
295 print(double_line_delim)
296 print("Spawned VPP with PID: %d" % cls.vpp.pid)
298 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
300 print(single_line_delim)
301 print("You can debug the VPP using e.g.:")
302 if cls.debug_gdbserver:
303 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
304 print("Now is the time to attach a gdb by running the above "
305 "command, set up breakpoints etc. and then resume VPP from "
306 "within gdb by issuing the 'continue' command")
308 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
309 print("Now is the time to attach a gdb by running the above "
310 "command and set up breakpoints etc.")
311 print(single_line_delim)
312 raw_input("Press ENTER to continue running the testcase...")
316 cmdline = cls.vpp_cmdline
318 if cls.debug_gdbserver:
319 gdbserver = '/usr/bin/gdbserver'
320 if not os.path.isfile(gdbserver) or \
321 not os.access(gdbserver, os.X_OK):
322 raise Exception("gdbserver binary '%s' does not exist or is "
323 "not executable" % gdbserver)
325 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
326 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
329 cls.vpp = subprocess.Popen(cmdline,
330 stdout=subprocess.PIPE,
331 stderr=subprocess.PIPE,
333 except Exception as e:
334 cls.logger.critical("Couldn't start vpp: %s" % e)
342 Perform class setup before running the testcase
343 Remove shared memory files, start vpp and connect the vpp-api
345 gc.collect() # run garbage collection first
347 if not hasattr(cls, 'logger'):
348 cls.logger = getLogger(cls.__name__)
350 cls.logger.name = cls.__name__
351 cls.tempdir = tempfile.mkdtemp(
352 prefix='vpp-unittest-%s-' % cls.__name__)
353 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
354 cls.file_handler.setFormatter(
355 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
357 cls.file_handler.setLevel(DEBUG)
358 cls.logger.addHandler(cls.file_handler)
359 cls.shm_prefix = os.path.basename(cls.tempdir)
360 os.chdir(cls.tempdir)
361 cls.logger.info("Temporary dir is %s, shm prefix is %s",
362 cls.tempdir, cls.shm_prefix)
364 cls.reset_packet_infos()
366 cls._zombie_captures = []
369 cls.registry = VppObjectRegistry()
370 cls.vpp_startup_failed = False
371 cls.reporter = KeepAliveReporter()
372 # need to catch exceptions here because if we raise, then the cleanup
373 # doesn't get called and we might end with a zombie vpp
376 cls.reporter.send_keep_alive(cls)
377 cls.vpp_stdout_deque = deque()
378 cls.vpp_stderr_deque = deque()
379 cls.pump_thread_stop_flag = Event()
380 cls.pump_thread_wakeup_pipe = os.pipe()
381 cls.pump_thread = Thread(target=pump_output, args=(cls,))
382 cls.pump_thread.daemon = True
383 cls.pump_thread.start()
384 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
389 cls.vapi.register_hook(hook)
390 cls.sleep(0.1, "after vpp startup, before initial poll")
394 cls.vpp_startup_failed = True
396 "VPP died shortly after startup, check the"
397 " output to standard error for possible cause")
403 cls.vapi.disconnect()
406 if cls.debug_gdbserver:
407 print(colorize("You're running VPP inside gdbserver but "
408 "VPP-API connection failed, did you forget "
409 "to 'continue' VPP from within gdb?", RED))
421 Disconnect vpp-api, kill vpp and cleanup shared memory files
423 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
425 if cls.vpp.returncode is None:
426 print(double_line_delim)
427 print("VPP or GDB server is still running")
428 print(single_line_delim)
429 raw_input("When done debugging, press ENTER to kill the "
430 "process and finish running the testcase...")
432 # first signal that we want to stop the pump thread, then wake it up
433 if hasattr(cls, 'pump_thread_stop_flag'):
434 cls.pump_thread_stop_flag.set()
435 if hasattr(cls, 'pump_thread_wakeup_pipe'):
436 os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
437 if hasattr(cls, 'pump_thread'):
438 cls.logger.debug("Waiting for pump thread to stop")
439 cls.pump_thread.join()
440 if hasattr(cls, 'vpp_stderr_reader_thread'):
441 cls.logger.debug("Waiting for stdderr pump to stop")
442 cls.vpp_stderr_reader_thread.join()
444 if hasattr(cls, 'vpp'):
445 if hasattr(cls, 'vapi'):
446 cls.vapi.disconnect()
449 if cls.vpp.returncode is None:
450 cls.logger.debug("Sending TERM to vpp")
452 cls.logger.debug("Waiting for vpp to die")
453 cls.vpp.communicate()
456 if cls.vpp_startup_failed:
457 stdout_log = cls.logger.info
458 stderr_log = cls.logger.critical
460 stdout_log = cls.logger.info
461 stderr_log = cls.logger.info
463 if hasattr(cls, 'vpp_stdout_deque'):
464 stdout_log(single_line_delim)
465 stdout_log('VPP output to stdout while running %s:', cls.__name__)
466 stdout_log(single_line_delim)
467 vpp_output = "".join(cls.vpp_stdout_deque)
468 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
470 stdout_log('\n%s', vpp_output)
471 stdout_log(single_line_delim)
473 if hasattr(cls, 'vpp_stderr_deque'):
474 stderr_log(single_line_delim)
475 stderr_log('VPP output to stderr while running %s:', cls.__name__)
476 stderr_log(single_line_delim)
477 vpp_output = "".join(cls.vpp_stderr_deque)
478 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
480 stderr_log('\n%s', vpp_output)
481 stderr_log(single_line_delim)
484 def tearDownClass(cls):
485 """ Perform final cleanup after running all tests in this test-case """
487 cls.file_handler.close()
488 cls.reset_packet_infos()
490 debug_internal.on_tear_down_class(cls)
493 """ Show various debug prints after each test """
494 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
495 (self.__class__.__name__, self._testMethodName,
496 self._testMethodDoc))
497 if not self.vpp_dead:
498 self.logger.debug(self.vapi.cli("show trace"))
499 self.logger.info(self.vapi.ppcli("show interface"))
500 self.logger.info(self.vapi.ppcli("show hardware"))
501 self.logger.info(self.vapi.ppcli("show error"))
502 self.logger.info(self.vapi.ppcli("show run"))
503 self.logger.info(self.vapi.ppcli("show log"))
504 self.registry.remove_vpp_config(self.logger)
505 # Save/Dump VPP api trace log
506 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
507 tmp_api_trace = "/tmp/%s" % api_trace
508 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
509 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
510 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
512 os.rename(tmp_api_trace, vpp_api_trace_log)
513 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
516 self.registry.unregister_all(self.logger)
519 """ Clear trace before running each test"""
520 self.reporter.send_keep_alive(self)
521 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
522 (self.__class__.__name__, self._testMethodName,
523 self._testMethodDoc))
525 raise Exception("VPP is dead when setting up the test")
526 self.sleep(.1, "during setUp")
527 self.vpp_stdout_deque.append(
528 "--- test setUp() for %s.%s(%s) starts here ---\n" %
529 (self.__class__.__name__, self._testMethodName,
530 self._testMethodDoc))
531 self.vpp_stderr_deque.append(
532 "--- test setUp() for %s.%s(%s) starts here ---\n" %
533 (self.__class__.__name__, self._testMethodName,
534 self._testMethodDoc))
535 self.vapi.cli("clear trace")
536 # store the test instance inside the test class - so that objects
537 # holding the class can access instance methods (like assertEqual)
538 type(self).test_instance = self
541 def pg_enable_capture(cls, interfaces=None):
543 Enable capture on packet-generator interfaces
545 :param interfaces: iterable interface indexes (if None,
546 use self.pg_interfaces)
549 if interfaces is None:
550 interfaces = cls.pg_interfaces
555 def register_capture(cls, cap_name):
556 """ Register a capture in the testclass """
557 # add to the list of captures with current timestamp
558 cls._captures.append((time.time(), cap_name))
559 # filter out from zombies
560 cls._zombie_captures = [(stamp, name)
561 for (stamp, name) in cls._zombie_captures
566 """ Remove any zombie captures and enable the packet generator """
567 # how long before capture is allowed to be deleted - otherwise vpp
568 # crashes - 100ms seems enough (this shouldn't be needed at all)
571 for stamp, cap_name in cls._zombie_captures:
572 wait = stamp + capture_ttl - now
574 cls.sleep(wait, "before deleting capture %s" % cap_name)
576 cls.logger.debug("Removing zombie capture %s" % cap_name)
577 cls.vapi.cli('packet-generator delete %s' % cap_name)
579 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
580 cls.vapi.cli('packet-generator enable')
581 cls._zombie_captures = cls._captures
585 def create_pg_interfaces(cls, interfaces):
587 Create packet-generator interfaces.
589 :param interfaces: iterable indexes of the interfaces.
590 :returns: List of created interfaces.
595 intf = VppPGInterface(cls, i)
596 setattr(cls, intf.name, intf)
598 cls.pg_interfaces = result
602 def create_loopback_interfaces(cls, count):
604 Create loopback interfaces.
606 :param count: number of interfaces created.
607 :returns: List of created interfaces.
609 result = [VppLoInterface(cls) for i in range(count)]
611 setattr(cls, intf.name, intf)
612 cls.lo_interfaces = result
616 def extend_packet(packet, size, padding=' '):
618 Extend packet to given size by padding with spaces or custom padding
619 NOTE: Currently works only when Raw layer is present.
621 :param packet: packet
622 :param size: target size
623 :param padding: padding used to extend the payload
626 packet_len = len(packet) + 4
627 extend = size - packet_len
629 num = (extend / len(padding)) + 1
630 packet[Raw].load += (padding * num)[:extend]
633 def reset_packet_infos(cls):
634 """ Reset the list of packet info objects and packet counts to zero """
635 cls._packet_infos = {}
636 cls._packet_count_for_dst_if_idx = {}
639 def create_packet_info(cls, src_if, dst_if):
641 Create packet info object containing the source and destination indexes
642 and add it to the testcase's packet info list
644 :param VppInterface src_if: source interface
645 :param VppInterface dst_if: destination interface
647 :returns: _PacketInfo object
651 info.index = len(cls._packet_infos)
652 info.src = src_if.sw_if_index
653 info.dst = dst_if.sw_if_index
654 if isinstance(dst_if, VppSubInterface):
655 dst_idx = dst_if.parent.sw_if_index
657 dst_idx = dst_if.sw_if_index
658 if dst_idx in cls._packet_count_for_dst_if_idx:
659 cls._packet_count_for_dst_if_idx[dst_idx] += 1
661 cls._packet_count_for_dst_if_idx[dst_idx] = 1
662 cls._packet_infos[info.index] = info
666 def info_to_payload(info):
668 Convert _PacketInfo object to packet payload
670 :param info: _PacketInfo object
672 :returns: string containing serialized data from packet info
674 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
678 def payload_to_info(payload):
680 Convert packet payload to _PacketInfo object
682 :param payload: packet payload
684 :returns: _PacketInfo object containing de-serialized data from payload
687 numbers = payload.split()
689 info.index = int(numbers[0])
690 info.src = int(numbers[1])
691 info.dst = int(numbers[2])
692 info.ip = int(numbers[3])
693 info.proto = int(numbers[4])
696 def get_next_packet_info(self, info):
698 Iterate over the packet info list stored in the testcase
699 Start iteration with first element if info is None
700 Continue based on index in info if info is specified
702 :param info: info or None
703 :returns: next info in list or None if no more infos
708 next_index = info.index + 1
709 if next_index == len(self._packet_infos):
712 return self._packet_infos[next_index]
714 def get_next_packet_info_for_interface(self, src_index, info):
716 Search the packet info list for the next packet info with same source
719 :param src_index: source interface index to search for
720 :param info: packet info - where to start the search
721 :returns: packet info or None
725 info = self.get_next_packet_info(info)
728 if info.src == src_index:
731 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
733 Search the packet info list for the next packet info with same source
734 and destination interface indexes
736 :param src_index: source interface index to search for
737 :param dst_index: destination interface index to search for
738 :param info: packet info - where to start the search
739 :returns: packet info or None
743 info = self.get_next_packet_info_for_interface(src_index, info)
746 if info.dst == dst_index:
749 def assert_equal(self, real_value, expected_value, name_or_class=None):
750 if name_or_class is None:
751 self.assertEqual(real_value, expected_value)
754 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
755 msg = msg % (getdoc(name_or_class).strip(),
756 real_value, str(name_or_class(real_value)),
757 expected_value, str(name_or_class(expected_value)))
759 msg = "Invalid %s: %s does not match expected value %s" % (
760 name_or_class, real_value, expected_value)
762 self.assertEqual(real_value, expected_value, msg)
764 def assert_in_range(self,
772 msg = "Invalid %s: %s out of range <%s,%s>" % (
773 name, real_value, expected_min, expected_max)
774 self.assertTrue(expected_min <= real_value <= expected_max, msg)
776 def assert_packet_checksums_valid(self, packet,
777 ignore_zero_udp_checksums=True):
778 received = packet.__class__(str(packet))
780 ppp("Verifying packet checksums for packet:", received))
781 udp_layers = ['UDP', 'UDPerror']
782 checksum_fields = ['cksum', 'chksum']
785 temp = received.__class__(str(received))
787 layer = temp.getlayer(counter)
789 for cf in checksum_fields:
790 if hasattr(layer, cf):
791 if ignore_zero_udp_checksums and \
792 0 == getattr(layer, cf) and \
793 layer.name in udp_layers:
796 checksums.append((counter, cf))
799 counter = counter + 1
800 if 0 == len(checksums):
802 temp = temp.__class__(str(temp))
803 for layer, cf in checksums:
804 calc_sum = getattr(temp[layer], cf)
806 getattr(received[layer], cf), calc_sum,
807 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
809 "Checksum field `%s` on `%s` layer has correct value `%s`" %
810 (cf, temp[layer].name, calc_sum))
812 def assert_checksum_valid(self, received_packet, layer,
814 ignore_zero_checksum=False):
815 """ Check checksum of received packet on given layer """
816 received_packet_checksum = getattr(received_packet[layer], field_name)
817 if ignore_zero_checksum and 0 == received_packet_checksum:
819 recalculated = received_packet.__class__(str(received_packet))
820 delattr(recalculated[layer], field_name)
821 recalculated = recalculated.__class__(str(recalculated))
822 self.assert_equal(received_packet_checksum,
823 getattr(recalculated[layer], field_name),
824 "packet checksum on layer: %s" % layer)
826 def assert_ip_checksum_valid(self, received_packet,
827 ignore_zero_checksum=False):
828 self.assert_checksum_valid(received_packet, 'IP',
829 ignore_zero_checksum=ignore_zero_checksum)
831 def assert_tcp_checksum_valid(self, received_packet,
832 ignore_zero_checksum=False):
833 self.assert_checksum_valid(received_packet, 'TCP',
834 ignore_zero_checksum=ignore_zero_checksum)
836 def assert_udp_checksum_valid(self, received_packet,
837 ignore_zero_checksum=True):
838 self.assert_checksum_valid(received_packet, 'UDP',
839 ignore_zero_checksum=ignore_zero_checksum)
841 def assert_embedded_icmp_checksum_valid(self, received_packet):
842 if received_packet.haslayer(IPerror):
843 self.assert_checksum_valid(received_packet, 'IPerror')
844 if received_packet.haslayer(TCPerror):
845 self.assert_checksum_valid(received_packet, 'TCPerror')
846 if received_packet.haslayer(UDPerror):
847 self.assert_checksum_valid(received_packet, 'UDPerror',
848 ignore_zero_checksum=True)
849 if received_packet.haslayer(ICMPerror):
850 self.assert_checksum_valid(received_packet, 'ICMPerror')
852 def assert_icmp_checksum_valid(self, received_packet):
853 self.assert_checksum_valid(received_packet, 'ICMP')
854 self.assert_embedded_icmp_checksum_valid(received_packet)
856 def assert_icmpv6_checksum_valid(self, pkt):
857 if pkt.haslayer(ICMPv6DestUnreach):
858 self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
859 self.assert_embedded_icmp_checksum_valid(pkt)
860 if pkt.haslayer(ICMPv6EchoRequest):
861 self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
862 if pkt.haslayer(ICMPv6EchoReply):
863 self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
866 def sleep(cls, timeout, remark=None):
867 if hasattr(cls, 'logger'):
868 cls.logger.debug("Starting sleep for %ss (%s)" % (timeout, remark))
872 if after - before > 2 * timeout:
873 cls.logger.error("unexpected time.sleep() result - "
874 "slept for %ss instead of ~%ss!" % (
875 after - before, timeout))
876 if hasattr(cls, 'logger'):
878 "Finished sleep (%s) - slept %ss (wanted %ss)" % (
879 remark, after - before, timeout))
881 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
882 self.vapi.cli("clear trace")
883 intf.add_stream(pkts)
884 self.pg_enable_capture(self.pg_interfaces)
888 for i in self.pg_interfaces:
889 i.get_capture(0, timeout=timeout)
890 i.assert_nothing_captured(remark=remark)
893 def send_and_expect(self, input, pkts, output):
894 self.vapi.cli("clear trace")
895 input.add_stream(pkts)
896 self.pg_enable_capture(self.pg_interfaces)
898 rx = output.get_capture(len(pkts))
902 def get_testcase_doc_name(test):
903 return getdoc(test.__class__).splitlines()[0]
906 def get_test_description(descriptions, test):
907 # TODO: if none print warning not raise exception
908 short_description = test.shortDescription()
909 if descriptions and short_description:
910 return short_description
915 class TestCasePrinter(object):
919 self.__dict__ = self._shared_state
920 if not hasattr(self, "_test_case_set"):
921 self._test_case_set = set()
923 def print_test_case_heading_if_first_time(self, case):
924 if case.__class__ not in self._test_case_set:
925 print(double_line_delim)
926 print(colorize(get_testcase_doc_name(case), GREEN))
927 print(double_line_delim)
928 self._test_case_set.add(case.__class__)
931 class VppTestResult(unittest.TestResult):
933 @property result_string
934 String variable to store the test case result string.
936 List variable containing 2-tuples of TestCase instances and strings
937 holding formatted tracebacks. Each tuple represents a test which
938 raised an unexpected exception.
940 List variable containing 2-tuples of TestCase instances and strings
941 holding formatted tracebacks. Each tuple represents a test where
942 a failure was explicitly signalled using the TestCase.assert*()
946 def __init__(self, stream, descriptions, verbosity):
948 :param stream File descriptor to store where to report test results.
949 Set to the standard error stream by default.
950 :param descriptions Boolean variable to store information if to use
951 test case descriptions.
952 :param verbosity Integer variable to store required verbosity level.
954 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
956 self.descriptions = descriptions
957 self.verbosity = verbosity
958 self.result_string = None
959 self.printer = TestCasePrinter()
962 def addSuccess(self, test):
964 Record a test succeeded result
969 if hasattr(test, 'logger'):
970 test.logger.debug("--- addSuccess() %s.%s(%s) called"
971 % (test.__class__.__name__,
972 test._testMethodName,
973 test._testMethodDoc))
975 unittest.TestResult.addSuccess(self, test)
976 self.result_string = colorize("OK", GREEN)
978 def addSkip(self, test, reason):
980 Record a test skipped.
986 if hasattr(test, 'logger'):
987 test.logger.debug("--- addSkip() %s.%s(%s) called, reason is %s"
988 % (test.__class__.__name__,
989 test._testMethodName,
992 unittest.TestResult.addSkip(self, test, reason)
993 self.result_string = colorize("SKIP", YELLOW)
995 def symlink_failed(self, test):
997 if hasattr(test, 'logger'):
999 if hasattr(test, 'tempdir'):
1001 failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
1002 link_path = os.path.join(failed_dir, '%s-FAILED' %
1003 os.path.basename(test.tempdir))
1005 logger.debug("creating a link to the failed test")
1006 logger.debug("os.symlink(%s, %s)" %
1007 (test.tempdir, link_path))
1008 if os.path.exists(link_path):
1010 logger.debug('symlink already exists')
1012 os.symlink(test.tempdir, link_path)
1014 except Exception as e:
1018 def addFailure(self, test, err):
1020 Record a test failed result
1023 :param err: error message
1026 if hasattr(test, 'logger'):
1027 test.logger.debug("--- addFailure() %s.%s(%s) called, err is %s"
1028 % (test.__class__.__name__,
1029 test._testMethodName,
1030 test._testMethodDoc, err))
1031 test.logger.debug("formatted exception is:\n%s" %
1032 "".join(format_exception(*err)))
1033 unittest.TestResult.addFailure(self, test, err)
1034 if hasattr(test, 'tempdir'):
1035 self.result_string = colorize("FAIL", RED) + \
1036 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
1037 self.symlink_failed(test)
1039 self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
1041 def addError(self, test, err):
1043 Record a test error result
1046 :param err: error message
1049 if hasattr(test, 'logger'):
1050 test.logger.debug("--- addError() %s.%s(%s) called, err is %s"
1051 % (test.__class__.__name__,
1052 test._testMethodName,
1053 test._testMethodDoc, err))
1054 test.logger.debug("formatted exception is:\n%s" %
1055 "".join(format_exception(*err)))
1056 unittest.TestResult.addError(self, test, err)
1057 if hasattr(test, 'tempdir'):
1058 self.result_string = colorize("ERROR", RED) + \
1059 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
1060 self.symlink_failed(test)
1062 self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
1064 def getDescription(self, test):
1066 Get test description
1069 :returns: test description
1072 return get_test_description(self.descriptions, test)
1074 def startTest(self, test):
1081 self.printer.print_test_case_heading_if_first_time(test)
1082 unittest.TestResult.startTest(self, test)
1083 if self.verbosity > 0:
1084 self.stream.writeln(
1085 "Starting " + self.getDescription(test) + " ...")
1086 self.stream.writeln(single_line_delim)
1088 def stopTest(self, test):
1095 unittest.TestResult.stopTest(self, test)
1096 if self.verbosity > 0:
1097 self.stream.writeln(single_line_delim)
1098 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1099 self.result_string))
1100 self.stream.writeln(single_line_delim)
1102 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1103 self.result_string))
1105 def printErrors(self):
1107 Print errors from running the test case
1109 self.stream.writeln()
1110 self.printErrorList('ERROR', self.errors)
1111 self.printErrorList('FAIL', self.failures)
1113 def printErrorList(self, flavour, errors):
1115 Print error list to the output stream together with error type
1116 and test case description.
1118 :param flavour: error type
1119 :param errors: iterable errors
1122 for test, err in errors:
1123 self.stream.writeln(double_line_delim)
1124 self.stream.writeln("%s: %s" %
1125 (flavour, self.getDescription(test)))
1126 self.stream.writeln(single_line_delim)
1127 self.stream.writeln("%s" % err)
1130 class VppTestRunner(unittest.TextTestRunner):
1132 A basic test runner implementation which prints results to standard error.
1135 def resultclass(self):
1136 """Class maintaining the results of the tests"""
1137 return VppTestResult
1139 def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1140 failfast=False, buffer=False, resultclass=None):
1141 # ignore stream setting here, use hard-coded stdout to be in sync
1142 # with prints from VppTestCase methods ...
1143 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1144 verbosity, failfast, buffer,
1146 reporter = KeepAliveReporter()
1147 reporter.pipe = keep_alive_pipe
1149 def run(self, test):
1156 faulthandler.enable() # emit stack trace to stderr if killed by signal
1158 result = super(VppTestRunner, self).run(test)
1162 class Worker(Thread):
1163 def __init__(self, args, logger, env={}):
1164 self.logger = logger
1167 self.env = copy.deepcopy(env)
1168 super(Worker, self).__init__()
1171 executable = self.args[0]
1172 self.logger.debug("Running executable w/args `%s'" % self.args)
1173 env = os.environ.copy()
1174 env.update(self.env)
1175 env["CK_LOG_FILE_NAME"] = "-"
1176 self.process = subprocess.Popen(
1177 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1178 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1179 out, err = self.process.communicate()
1180 self.logger.debug("Finished running `%s'" % executable)
1181 self.logger.info("Return code is `%s'" % self.process.returncode)
1182 self.logger.info(single_line_delim)
1183 self.logger.info("Executable `%s' wrote to stdout:" % executable)
1184 self.logger.info(single_line_delim)
1185 self.logger.info(out)
1186 self.logger.info(single_line_delim)
1187 self.logger.info("Executable `%s' wrote to stderr:" % executable)
1188 self.logger.info(single_line_delim)
1189 self.logger.info(err)
1190 self.logger.info(single_line_delim)
1191 self.result = self.process.returncode