3 from __future__ import print_function
15 from collections import deque
16 from threading import Thread, Event
17 from inspect import getdoc, isclass
18 from traceback import format_exception
19 from logging import FileHandler, DEBUG, Formatter
20 from scapy.packet import Raw
21 from hook import StepHook, PollHook, VppDiedError
22 from vpp_pg_interface import VppPGInterface
23 from vpp_sub_interface import VppSubInterface
24 from vpp_lo_interface import VppLoInterface
25 from vpp_papi_provider import VppPapiProvider
26 from vpp_papi.vpp_stats import VPPStats
27 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
29 from vpp_object import VppObjectRegistry
31 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
32 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
33 from scapy.layers.inet6 import ICMPv6EchoReply
34 if os.name == 'posix' and sys.version_info[0] < 3:
35 # using subprocess32 is recommended by python official documentation
36 # @ https://docs.python.org/2/library/subprocess.html
37 import subprocess32 as subprocess
49 debug_framework = False
50 if os.getenv('TEST_DEBUG', "0") == "1":
51 debug_framework = True
56 Test framework module.
58 The module provides a set of tools for constructing and running tests and
59 representing the results.
63 class _PacketInfo(object):
64 """Private class to create packet info object.
66 Help process information about the next packet.
67 Set variables to default values.
69 #: Store the index of the packet.
71 #: Store the index of the source packet generator interface of the packet.
73 #: Store the index of the destination packet generator interface
76 #: Store expected ip version
78 #: Store expected upper protocol
80 #: Store the copy of the former packet.
83 def __eq__(self, other):
84 index = self.index == other.index
85 src = self.src == other.src
86 dst = self.dst == other.dst
87 data = self.data == other.data
88 return index and src and dst and data
91 def pump_output(testclass):
92 """ pump output from vpp stdout/stderr to proper queues """
95 while not testclass.pump_thread_stop_flag.is_set():
96 readable = select.select([testclass.vpp.stdout.fileno(),
97 testclass.vpp.stderr.fileno(),
98 testclass.pump_thread_wakeup_pipe[0]],
100 if testclass.vpp.stdout.fileno() in readable:
101 read = os.read(testclass.vpp.stdout.fileno(), 102400)
103 split = read.splitlines(True)
104 if len(stdout_fragment) > 0:
105 split[0] = "%s%s" % (stdout_fragment, split[0])
106 if len(split) > 0 and split[-1].endswith("\n"):
110 stdout_fragment = split[-1]
111 testclass.vpp_stdout_deque.extend(split[:limit])
112 if not testclass.cache_vpp_output:
113 for line in split[:limit]:
114 testclass.logger.debug(
115 "VPP STDOUT: %s" % line.rstrip("\n"))
116 if testclass.vpp.stderr.fileno() in readable:
117 read = os.read(testclass.vpp.stderr.fileno(), 102400)
119 split = read.splitlines(True)
120 if len(stderr_fragment) > 0:
121 split[0] = "%s%s" % (stderr_fragment, split[0])
122 if len(split) > 0 and split[-1].endswith("\n"):
126 stderr_fragment = split[-1]
127 testclass.vpp_stderr_deque.extend(split[:limit])
128 if not testclass.cache_vpp_output:
129 for line in split[:limit]:
130 testclass.logger.debug(
131 "VPP STDERR: %s" % line.rstrip("\n"))
132 # ignoring the dummy pipe here intentionally - the flag will take care
133 # of properly terminating the loop
136 def running_extended_tests():
137 s = os.getenv("EXTENDED_TESTS", "n")
138 return True if s.lower() in ("y", "yes", "1") else False
141 def running_on_centos():
142 os_id = os.getenv("OS_ID", "")
143 return True if "centos" in os_id.lower() else False
146 class KeepAliveReporter(object):
148 Singleton object which reports test start to parent process
153 self.__dict__ = self._shared_state
160 def pipe(self, pipe):
161 if hasattr(self, '_pipe'):
162 raise Exception("Internal error - pipe should only be set once.")
165 def send_keep_alive(self, test):
167 Write current test tmpdir & desc to keep-alive pipe to signal liveness
169 if self.pipe is None:
170 # if not running forked..
176 desc = test.shortDescription()
180 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
183 class VppTestCase(unittest.TestCase):
184 """This subclass is a base class for VPP test cases that are implemented as
185 classes. It provides methods to create and run test case.
189 def packet_infos(self):
190 """List of packet infos"""
191 return self._packet_infos
194 def get_packet_count_for_if_idx(cls, dst_if_index):
195 """Get the number of packet info for specified destination if index"""
196 if dst_if_index in cls._packet_count_for_dst_if_idx:
197 return cls._packet_count_for_dst_if_idx[dst_if_index]
203 """Return the instance of this testcase"""
204 return cls.test_instance
207 def set_debug_flags(cls, d):
208 cls.debug_core = False
209 cls.debug_gdb = False
210 cls.debug_gdbserver = False
215 cls.debug_core = True
218 elif dl == "gdbserver":
219 cls.debug_gdbserver = True
221 raise Exception("Unrecognized DEBUG option: '%s'" % d)
224 def get_least_used_cpu(self):
225 cpu_usage_list = [set(range(psutil.cpu_count()))]
226 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
227 if 'vpp_main' == p.info['name']]
228 for vpp_process in vpp_processes:
229 for cpu_usage_set in cpu_usage_list:
231 cpu_num = vpp_process.cpu_num()
232 if cpu_num in cpu_usage_set:
233 cpu_usage_set_index = cpu_usage_list.index(
235 if cpu_usage_set_index == len(cpu_usage_list) - 1:
236 cpu_usage_list.append({cpu_num})
238 cpu_usage_list[cpu_usage_set_index + 1].add(
240 cpu_usage_set.remove(cpu_num)
242 except psutil.NoSuchProcess:
245 for cpu_usage_set in cpu_usage_list:
246 if len(cpu_usage_set) > 0:
247 min_usage_set = cpu_usage_set
250 return random.choice(tuple(min_usage_set))
253 def setUpConstants(cls):
254 """ Set-up the test case class based on environment variables """
255 s = os.getenv("STEP", "n")
256 cls.step = True if s.lower() in ("y", "yes", "1") else False
257 d = os.getenv("DEBUG", None)
258 c = os.getenv("CACHE_OUTPUT", "1")
259 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
260 cls.set_debug_flags(d)
261 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
262 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
263 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
265 if cls.plugin_path is not None:
266 if cls.extern_plugin_path is not None:
267 plugin_path = "%s:%s" % (
268 cls.plugin_path, cls.extern_plugin_path)
270 plugin_path = cls.plugin_path
271 elif cls.extern_plugin_path is not None:
272 plugin_path = cls.extern_plugin_path
274 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
275 debug_cli = "cli-listen localhost:5002"
277 size = os.getenv("COREDUMP_SIZE")
279 coredump_size = "coredump-size %s" % size
280 if coredump_size is None:
281 coredump_size = "coredump-size unlimited"
283 cpu_core_number = cls.get_least_used_cpu()
285 cls.vpp_cmdline = [cls.vpp_bin, "unix",
286 "{", "nodaemon", debug_cli, "full-coredump",
287 coredump_size, "runtime-dir", cls.tempdir, "}",
288 "api-trace", "{", "on", "}", "api-segment", "{",
289 "prefix", cls.shm_prefix, "}", "cpu", "{",
290 "main-core", str(cpu_core_number), "}", "statseg",
291 "{", "socket-name", cls.stats_sock, "}", "plugins",
292 "{", "plugin", "dpdk_plugin.so", "{", "disable",
293 "}", "plugin", "unittest_plugin.so", "{", "enable",
295 if plugin_path is not None:
296 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
297 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
300 def wait_for_enter(cls):
301 if cls.debug_gdbserver:
302 print(double_line_delim)
303 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
305 print(double_line_delim)
306 print("Spawned VPP with PID: %d" % cls.vpp.pid)
308 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
310 print(single_line_delim)
311 print("You can debug the VPP using e.g.:")
312 if cls.debug_gdbserver:
313 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
314 print("Now is the time to attach a gdb by running the above "
315 "command, set up breakpoints etc. and then resume VPP from "
316 "within gdb by issuing the 'continue' command")
318 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
319 print("Now is the time to attach a gdb by running the above "
320 "command and set up breakpoints etc.")
321 print(single_line_delim)
322 raw_input("Press ENTER to continue running the testcase...")
326 cmdline = cls.vpp_cmdline
328 if cls.debug_gdbserver:
329 gdbserver = '/usr/bin/gdbserver'
330 if not os.path.isfile(gdbserver) or \
331 not os.access(gdbserver, os.X_OK):
332 raise Exception("gdbserver binary '%s' does not exist or is "
333 "not executable" % gdbserver)
335 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
336 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
339 cls.vpp = subprocess.Popen(cmdline,
340 stdout=subprocess.PIPE,
341 stderr=subprocess.PIPE,
343 except Exception as e:
344 cls.logger.critical("Couldn't start vpp: %s" % e)
350 def wait_for_stats_socket(cls):
351 deadline = time.time() + 3
352 while time.time() < deadline or cls.debug_gdb or cls.debug_gdbserver:
353 if os.path.exists(cls.stats_sock):
359 Perform class setup before running the testcase
360 Remove shared memory files, start vpp and connect the vpp-api
362 gc.collect() # run garbage collection first
364 if not hasattr(cls, 'logger'):
365 cls.logger = getLogger(cls.__name__)
367 cls.logger.name = cls.__name__
368 cls.tempdir = tempfile.mkdtemp(
369 prefix='vpp-unittest-%s-' % cls.__name__)
370 cls.stats_sock = "%s/stats.sock" % cls.tempdir
371 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
372 cls.file_handler.setFormatter(
373 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
375 cls.file_handler.setLevel(DEBUG)
376 cls.logger.addHandler(cls.file_handler)
377 cls.shm_prefix = os.path.basename(cls.tempdir)
378 os.chdir(cls.tempdir)
379 cls.logger.info("Temporary dir is %s, shm prefix is %s",
380 cls.tempdir, cls.shm_prefix)
382 cls.reset_packet_infos()
384 cls._zombie_captures = []
387 cls.registry = VppObjectRegistry()
388 cls.vpp_startup_failed = False
389 cls.reporter = KeepAliveReporter()
390 # need to catch exceptions here because if we raise, then the cleanup
391 # doesn't get called and we might end with a zombie vpp
394 cls.reporter.send_keep_alive(cls)
395 cls.vpp_stdout_deque = deque()
396 cls.vpp_stderr_deque = deque()
397 cls.pump_thread_stop_flag = Event()
398 cls.pump_thread_wakeup_pipe = os.pipe()
399 cls.pump_thread = Thread(target=pump_output, args=(cls,))
400 cls.pump_thread.daemon = True
401 cls.pump_thread.start()
402 if cls.debug_gdb or cls.debug_gdbserver:
406 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
412 cls.vapi.register_hook(hook)
413 cls.wait_for_stats_socket()
414 cls.statistics = VPPStats(socketname=cls.stats_sock)
418 cls.vpp_startup_failed = True
420 "VPP died shortly after startup, check the"
421 " output to standard error for possible cause")
427 cls.vapi.disconnect()
430 if cls.debug_gdbserver:
431 print(colorize("You're running VPP inside gdbserver but "
432 "VPP-API connection failed, did you forget "
433 "to 'continue' VPP from within gdb?", RED))
445 Disconnect vpp-api, kill vpp and cleanup shared memory files
447 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
449 if cls.vpp.returncode is None:
450 print(double_line_delim)
451 print("VPP or GDB server is still running")
452 print(single_line_delim)
453 raw_input("When done debugging, press ENTER to kill the "
454 "process and finish running the testcase...")
456 # first signal that we want to stop the pump thread, then wake it up
457 if hasattr(cls, 'pump_thread_stop_flag'):
458 cls.pump_thread_stop_flag.set()
459 if hasattr(cls, 'pump_thread_wakeup_pipe'):
460 os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
461 if hasattr(cls, 'pump_thread'):
462 cls.logger.debug("Waiting for pump thread to stop")
463 cls.pump_thread.join()
464 if hasattr(cls, 'vpp_stderr_reader_thread'):
465 cls.logger.debug("Waiting for stdderr pump to stop")
466 cls.vpp_stderr_reader_thread.join()
468 if hasattr(cls, 'vpp'):
469 if hasattr(cls, 'vapi'):
470 cls.vapi.disconnect()
473 if cls.vpp.returncode is None:
474 cls.logger.debug("Sending TERM to vpp")
476 cls.logger.debug("Waiting for vpp to die")
477 cls.vpp.communicate()
480 if cls.vpp_startup_failed:
481 stdout_log = cls.logger.info
482 stderr_log = cls.logger.critical
484 stdout_log = cls.logger.info
485 stderr_log = cls.logger.info
487 if hasattr(cls, 'vpp_stdout_deque'):
488 stdout_log(single_line_delim)
489 stdout_log('VPP output to stdout while running %s:', cls.__name__)
490 stdout_log(single_line_delim)
491 vpp_output = "".join(cls.vpp_stdout_deque)
492 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
494 stdout_log('\n%s', vpp_output)
495 stdout_log(single_line_delim)
497 if hasattr(cls, 'vpp_stderr_deque'):
498 stderr_log(single_line_delim)
499 stderr_log('VPP output to stderr while running %s:', cls.__name__)
500 stderr_log(single_line_delim)
501 vpp_output = "".join(cls.vpp_stderr_deque)
502 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
504 stderr_log('\n%s', vpp_output)
505 stderr_log(single_line_delim)
508 def tearDownClass(cls):
509 """ Perform final cleanup after running all tests in this test-case """
511 cls.file_handler.close()
512 cls.reset_packet_infos()
514 debug_internal.on_tear_down_class(cls)
517 """ Show various debug prints after each test """
518 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
519 (self.__class__.__name__, self._testMethodName,
520 self._testMethodDoc))
521 if not self.vpp_dead:
522 self.logger.debug(self.vapi.cli("show trace"))
523 self.logger.info(self.vapi.ppcli("show interface"))
524 self.logger.info(self.vapi.ppcli("show hardware"))
525 self.logger.info(self.statistics.set_errors_str())
526 self.logger.info(self.vapi.ppcli("show run"))
527 self.logger.info(self.vapi.ppcli("show log"))
528 self.registry.remove_vpp_config(self.logger)
529 # Save/Dump VPP api trace log
530 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
531 tmp_api_trace = "/tmp/%s" % api_trace
532 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
533 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
534 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
536 os.rename(tmp_api_trace, vpp_api_trace_log)
537 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
540 self.registry.unregister_all(self.logger)
543 """ Clear trace before running each test"""
544 self.reporter.send_keep_alive(self)
545 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
546 (self.__class__.__name__, self._testMethodName,
547 self._testMethodDoc))
549 raise Exception("VPP is dead when setting up the test")
550 self.sleep(.1, "during setUp")
551 self.vpp_stdout_deque.append(
552 "--- test setUp() for %s.%s(%s) starts here ---\n" %
553 (self.__class__.__name__, self._testMethodName,
554 self._testMethodDoc))
555 self.vpp_stderr_deque.append(
556 "--- test setUp() for %s.%s(%s) starts here ---\n" %
557 (self.__class__.__name__, self._testMethodName,
558 self._testMethodDoc))
559 self.vapi.cli("clear trace")
560 # store the test instance inside the test class - so that objects
561 # holding the class can access instance methods (like assertEqual)
562 type(self).test_instance = self
565 def pg_enable_capture(cls, interfaces=None):
567 Enable capture on packet-generator interfaces
569 :param interfaces: iterable interface indexes (if None,
570 use self.pg_interfaces)
573 if interfaces is None:
574 interfaces = cls.pg_interfaces
579 def register_capture(cls, cap_name):
580 """ Register a capture in the testclass """
581 # add to the list of captures with current timestamp
582 cls._captures.append((time.time(), cap_name))
583 # filter out from zombies
584 cls._zombie_captures = [(stamp, name)
585 for (stamp, name) in cls._zombie_captures
590 """ Remove any zombie captures and enable the packet generator """
591 # how long before capture is allowed to be deleted - otherwise vpp
592 # crashes - 100ms seems enough (this shouldn't be needed at all)
595 for stamp, cap_name in cls._zombie_captures:
596 wait = stamp + capture_ttl - now
598 cls.sleep(wait, "before deleting capture %s" % cap_name)
600 cls.logger.debug("Removing zombie capture %s" % cap_name)
601 cls.vapi.cli('packet-generator delete %s' % cap_name)
603 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
604 cls.vapi.cli('packet-generator enable')
605 cls._zombie_captures = cls._captures
609 def create_pg_interfaces(cls, interfaces):
611 Create packet-generator interfaces.
613 :param interfaces: iterable indexes of the interfaces.
614 :returns: List of created interfaces.
619 intf = VppPGInterface(cls, i)
620 setattr(cls, intf.name, intf)
622 cls.pg_interfaces = result
626 def create_loopback_interfaces(cls, count):
628 Create loopback interfaces.
630 :param count: number of interfaces created.
631 :returns: List of created interfaces.
633 result = [VppLoInterface(cls) for i in range(count)]
635 setattr(cls, intf.name, intf)
636 cls.lo_interfaces = result
640 def extend_packet(packet, size, padding=' '):
642 Extend packet to given size by padding with spaces or custom padding
643 NOTE: Currently works only when Raw layer is present.
645 :param packet: packet
646 :param size: target size
647 :param padding: padding used to extend the payload
650 packet_len = len(packet) + 4
651 extend = size - packet_len
653 num = (extend / len(padding)) + 1
654 packet[Raw].load += (padding * num)[:extend]
657 def reset_packet_infos(cls):
658 """ Reset the list of packet info objects and packet counts to zero """
659 cls._packet_infos = {}
660 cls._packet_count_for_dst_if_idx = {}
663 def create_packet_info(cls, src_if, dst_if):
665 Create packet info object containing the source and destination indexes
666 and add it to the testcase's packet info list
668 :param VppInterface src_if: source interface
669 :param VppInterface dst_if: destination interface
671 :returns: _PacketInfo object
675 info.index = len(cls._packet_infos)
676 info.src = src_if.sw_if_index
677 info.dst = dst_if.sw_if_index
678 if isinstance(dst_if, VppSubInterface):
679 dst_idx = dst_if.parent.sw_if_index
681 dst_idx = dst_if.sw_if_index
682 if dst_idx in cls._packet_count_for_dst_if_idx:
683 cls._packet_count_for_dst_if_idx[dst_idx] += 1
685 cls._packet_count_for_dst_if_idx[dst_idx] = 1
686 cls._packet_infos[info.index] = info
690 def info_to_payload(info):
692 Convert _PacketInfo object to packet payload
694 :param info: _PacketInfo object
696 :returns: string containing serialized data from packet info
698 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
702 def payload_to_info(payload):
704 Convert packet payload to _PacketInfo object
706 :param payload: packet payload
708 :returns: _PacketInfo object containing de-serialized data from payload
711 numbers = payload.split()
713 info.index = int(numbers[0])
714 info.src = int(numbers[1])
715 info.dst = int(numbers[2])
716 info.ip = int(numbers[3])
717 info.proto = int(numbers[4])
720 def get_next_packet_info(self, info):
722 Iterate over the packet info list stored in the testcase
723 Start iteration with first element if info is None
724 Continue based on index in info if info is specified
726 :param info: info or None
727 :returns: next info in list or None if no more infos
732 next_index = info.index + 1
733 if next_index == len(self._packet_infos):
736 return self._packet_infos[next_index]
738 def get_next_packet_info_for_interface(self, src_index, info):
740 Search the packet info list for the next packet info with same source
743 :param src_index: source interface index to search for
744 :param info: packet info - where to start the search
745 :returns: packet info or None
749 info = self.get_next_packet_info(info)
752 if info.src == src_index:
755 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
757 Search the packet info list for the next packet info with same source
758 and destination interface indexes
760 :param src_index: source interface index to search for
761 :param dst_index: destination interface index to search for
762 :param info: packet info - where to start the search
763 :returns: packet info or None
767 info = self.get_next_packet_info_for_interface(src_index, info)
770 if info.dst == dst_index:
773 def assert_equal(self, real_value, expected_value, name_or_class=None):
774 if name_or_class is None:
775 self.assertEqual(real_value, expected_value)
778 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
779 msg = msg % (getdoc(name_or_class).strip(),
780 real_value, str(name_or_class(real_value)),
781 expected_value, str(name_or_class(expected_value)))
783 msg = "Invalid %s: %s does not match expected value %s" % (
784 name_or_class, real_value, expected_value)
786 self.assertEqual(real_value, expected_value, msg)
788 def assert_in_range(self,
796 msg = "Invalid %s: %s out of range <%s,%s>" % (
797 name, real_value, expected_min, expected_max)
798 self.assertTrue(expected_min <= real_value <= expected_max, msg)
800 def assert_packet_checksums_valid(self, packet,
801 ignore_zero_udp_checksums=True):
802 received = packet.__class__(str(packet))
804 ppp("Verifying packet checksums for packet:", received))
805 udp_layers = ['UDP', 'UDPerror']
806 checksum_fields = ['cksum', 'chksum']
809 temp = received.__class__(str(received))
811 layer = temp.getlayer(counter)
813 for cf in checksum_fields:
814 if hasattr(layer, cf):
815 if ignore_zero_udp_checksums and \
816 0 == getattr(layer, cf) and \
817 layer.name in udp_layers:
820 checksums.append((counter, cf))
823 counter = counter + 1
824 if 0 == len(checksums):
826 temp = temp.__class__(str(temp))
827 for layer, cf in checksums:
828 calc_sum = getattr(temp[layer], cf)
830 getattr(received[layer], cf), calc_sum,
831 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
833 "Checksum field `%s` on `%s` layer has correct value `%s`" %
834 (cf, temp[layer].name, calc_sum))
836 def assert_checksum_valid(self, received_packet, layer,
838 ignore_zero_checksum=False):
839 """ Check checksum of received packet on given layer """
840 received_packet_checksum = getattr(received_packet[layer], field_name)
841 if ignore_zero_checksum and 0 == received_packet_checksum:
843 recalculated = received_packet.__class__(str(received_packet))
844 delattr(recalculated[layer], field_name)
845 recalculated = recalculated.__class__(str(recalculated))
846 self.assert_equal(received_packet_checksum,
847 getattr(recalculated[layer], field_name),
848 "packet checksum on layer: %s" % layer)
850 def assert_ip_checksum_valid(self, received_packet,
851 ignore_zero_checksum=False):
852 self.assert_checksum_valid(received_packet, 'IP',
853 ignore_zero_checksum=ignore_zero_checksum)
855 def assert_tcp_checksum_valid(self, received_packet,
856 ignore_zero_checksum=False):
857 self.assert_checksum_valid(received_packet, 'TCP',
858 ignore_zero_checksum=ignore_zero_checksum)
860 def assert_udp_checksum_valid(self, received_packet,
861 ignore_zero_checksum=True):
862 self.assert_checksum_valid(received_packet, 'UDP',
863 ignore_zero_checksum=ignore_zero_checksum)
865 def assert_embedded_icmp_checksum_valid(self, received_packet):
866 if received_packet.haslayer(IPerror):
867 self.assert_checksum_valid(received_packet, 'IPerror')
868 if received_packet.haslayer(TCPerror):
869 self.assert_checksum_valid(received_packet, 'TCPerror')
870 if received_packet.haslayer(UDPerror):
871 self.assert_checksum_valid(received_packet, 'UDPerror',
872 ignore_zero_checksum=True)
873 if received_packet.haslayer(ICMPerror):
874 self.assert_checksum_valid(received_packet, 'ICMPerror')
876 def assert_icmp_checksum_valid(self, received_packet):
877 self.assert_checksum_valid(received_packet, 'ICMP')
878 self.assert_embedded_icmp_checksum_valid(received_packet)
880 def assert_icmpv6_checksum_valid(self, pkt):
881 if pkt.haslayer(ICMPv6DestUnreach):
882 self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
883 self.assert_embedded_icmp_checksum_valid(pkt)
884 if pkt.haslayer(ICMPv6EchoRequest):
885 self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
886 if pkt.haslayer(ICMPv6EchoReply):
887 self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
890 def sleep(cls, timeout, remark=None):
891 if hasattr(cls, 'logger'):
892 cls.logger.debug("Starting sleep for %ss (%s)" % (timeout, remark))
896 if after - before > 2 * timeout:
897 cls.logger.error("unexpected time.sleep() result - "
898 "slept for %ss instead of ~%ss!" % (
899 after - before, timeout))
900 if hasattr(cls, 'logger'):
902 "Finished sleep (%s) - slept %ss (wanted %ss)" % (
903 remark, after - before, timeout))
905 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
906 self.vapi.cli("clear trace")
907 intf.add_stream(pkts)
908 self.pg_enable_capture(self.pg_interfaces)
912 for i in self.pg_interfaces:
913 i.get_capture(0, timeout=timeout)
914 i.assert_nothing_captured(remark=remark)
917 def send_and_expect(self, input, pkts, output):
918 self.vapi.cli("clear trace")
919 input.add_stream(pkts)
920 self.pg_enable_capture(self.pg_interfaces)
922 rx = output.get_capture(len(pkts))
926 def get_testcase_doc_name(test):
927 return getdoc(test.__class__).splitlines()[0]
930 def get_test_description(descriptions, test):
931 # TODO: if none print warning not raise exception
932 short_description = test.shortDescription()
933 if descriptions and short_description:
934 return short_description
939 class TestCasePrinter(object):
943 self.__dict__ = self._shared_state
944 if not hasattr(self, "_test_case_set"):
945 self._test_case_set = set()
947 def print_test_case_heading_if_first_time(self, case):
948 if case.__class__ not in self._test_case_set:
949 print(double_line_delim)
950 print(colorize(get_testcase_doc_name(case), GREEN))
951 print(double_line_delim)
952 self._test_case_set.add(case.__class__)
955 class VppTestResult(unittest.TestResult):
957 @property result_string
958 String variable to store the test case result string.
960 List variable containing 2-tuples of TestCase instances and strings
961 holding formatted tracebacks. Each tuple represents a test which
962 raised an unexpected exception.
964 List variable containing 2-tuples of TestCase instances and strings
965 holding formatted tracebacks. Each tuple represents a test where
966 a failure was explicitly signalled using the TestCase.assert*()
970 def __init__(self, stream, descriptions, verbosity):
972 :param stream File descriptor to store where to report test results.
973 Set to the standard error stream by default.
974 :param descriptions Boolean variable to store information if to use
975 test case descriptions.
976 :param verbosity Integer variable to store required verbosity level.
978 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
980 self.descriptions = descriptions
981 self.verbosity = verbosity
982 self.result_string = None
983 self.printer = TestCasePrinter()
985 def addSuccess(self, test):
987 Record a test succeeded result
992 if hasattr(test, 'logger'):
993 test.logger.debug("--- addSuccess() %s.%s(%s) called"
994 % (test.__class__.__name__,
995 test._testMethodName,
996 test._testMethodDoc))
997 unittest.TestResult.addSuccess(self, test)
998 self.result_string = colorize("OK", GREEN)
1000 self.send_result_through_pipe(test, PASS)
1002 def addSkip(self, test, reason):
1004 Record a test skipped.
1010 if hasattr(test, 'logger'):
1011 test.logger.debug("--- addSkip() %s.%s(%s) called, reason is %s"
1012 % (test.__class__.__name__,
1013 test._testMethodName,
1014 test._testMethodDoc,
1016 unittest.TestResult.addSkip(self, test, reason)
1017 self.result_string = colorize("SKIP", YELLOW)
1019 self.send_result_through_pipe(test, SKIP)
1021 def symlink_failed(self, test):
1023 if hasattr(test, 'logger'):
1024 logger = test.logger
1025 if hasattr(test, 'tempdir'):
1027 failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
1028 link_path = os.path.join(failed_dir, '%s-FAILED' %
1029 os.path.basename(test.tempdir))
1031 logger.debug("creating a link to the failed test")
1032 logger.debug("os.symlink(%s, %s)" %
1033 (test.tempdir, link_path))
1034 if os.path.exists(link_path):
1036 logger.debug('symlink already exists')
1038 os.symlink(test.tempdir, link_path)
1040 except Exception as e:
1044 def send_result_through_pipe(self, test, result):
1045 if hasattr(self, 'test_framework_result_pipe'):
1046 pipe = self.test_framework_result_pipe
1048 pipe.send((test.id(), result))
1050 def addFailure(self, test, err):
1052 Record a test failed result
1055 :param err: error message
1058 if hasattr(test, 'logger'):
1059 test.logger.debug("--- addFailure() %s.%s(%s) called, err is %s"
1060 % (test.__class__.__name__,
1061 test._testMethodName,
1062 test._testMethodDoc, err))
1063 test.logger.debug("formatted exception is:\n%s" %
1064 "".join(format_exception(*err)))
1065 unittest.TestResult.addFailure(self, test, err)
1066 if hasattr(test, 'tempdir'):
1067 self.result_string = colorize("FAIL", RED) + \
1068 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
1069 self.symlink_failed(test)
1071 self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
1073 self.send_result_through_pipe(test, FAIL)
1075 def addError(self, test, err):
1077 Record a test error result
1080 :param err: error message
1083 if hasattr(test, 'logger'):
1084 test.logger.debug("--- addError() %s.%s(%s) called, err is %s"
1085 % (test.__class__.__name__,
1086 test._testMethodName,
1087 test._testMethodDoc, err))
1088 test.logger.debug("formatted exception is:\n%s" %
1089 "".join(format_exception(*err)))
1090 unittest.TestResult.addError(self, test, err)
1091 if hasattr(test, 'tempdir'):
1092 self.result_string = colorize("ERROR", RED) + \
1093 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
1094 self.symlink_failed(test)
1096 self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
1098 self.send_result_through_pipe(test, ERROR)
1100 def getDescription(self, test):
1102 Get test description
1105 :returns: test description
1108 return get_test_description(self.descriptions, test)
1110 def startTest(self, test):
1117 self.printer.print_test_case_heading_if_first_time(test)
1118 unittest.TestResult.startTest(self, test)
1119 if self.verbosity > 0:
1120 self.stream.writeln(
1121 "Starting " + self.getDescription(test) + " ...")
1122 self.stream.writeln(single_line_delim)
1124 def stopTest(self, test):
1126 Called when the given test has been run
1131 unittest.TestResult.stopTest(self, test)
1132 if self.verbosity > 0:
1133 self.stream.writeln(single_line_delim)
1134 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1135 self.result_string))
1136 self.stream.writeln(single_line_delim)
1138 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1139 self.result_string))
1141 self.send_result_through_pipe(test, TEST_RUN)
1143 def printErrors(self):
1145 Print errors from running the test case
1147 self.stream.writeln()
1148 self.printErrorList('ERROR', self.errors)
1149 self.printErrorList('FAIL', self.failures)
1151 def printErrorList(self, flavour, errors):
1153 Print error list to the output stream together with error type
1154 and test case description.
1156 :param flavour: error type
1157 :param errors: iterable errors
1160 for test, err in errors:
1161 self.stream.writeln(double_line_delim)
1162 self.stream.writeln("%s: %s" %
1163 (flavour, self.getDescription(test)))
1164 self.stream.writeln(single_line_delim)
1165 self.stream.writeln("%s" % err)
1168 class VppTestRunner(unittest.TextTestRunner):
1170 A basic test runner implementation which prints results to standard error.
1173 def resultclass(self):
1174 """Class maintaining the results of the tests"""
1175 return VppTestResult
1177 def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1178 result_pipe=None, failfast=False, buffer=False,
1180 # ignore stream setting here, use hard-coded stdout to be in sync
1181 # with prints from VppTestCase methods ...
1182 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1183 verbosity, failfast, buffer,
1185 reporter = KeepAliveReporter()
1186 reporter.pipe = keep_alive_pipe
1188 VppTestResult.test_framework_result_pipe = result_pipe
1190 def run(self, test):
1197 faulthandler.enable() # emit stack trace to stderr if killed by signal
1199 result = super(VppTestRunner, self).run(test)
1203 class Worker(Thread):
1204 def __init__(self, args, logger, env={}):
1205 self.logger = logger
1208 self.env = copy.deepcopy(env)
1209 super(Worker, self).__init__()
1212 executable = self.args[0]
1213 self.logger.debug("Running executable w/args `%s'" % self.args)
1214 env = os.environ.copy()
1215 env.update(self.env)
1216 env["CK_LOG_FILE_NAME"] = "-"
1217 self.process = subprocess.Popen(
1218 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1219 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1220 out, err = self.process.communicate()
1221 self.logger.debug("Finished running `%s'" % executable)
1222 self.logger.info("Return code is `%s'" % self.process.returncode)
1223 self.logger.info(single_line_delim)
1224 self.logger.info("Executable `%s' wrote to stdout:" % executable)
1225 self.logger.info(single_line_delim)
1226 self.logger.info(out)
1227 self.logger.info(single_line_delim)
1228 self.logger.info("Executable `%s' wrote to stderr:" % executable)
1229 self.logger.info(single_line_delim)
1230 self.logger.info(err)
1231 self.logger.info(single_line_delim)
1232 self.result = self.process.returncode