3 from __future__ import print_function
15 from collections import deque
16 from threading import Thread, Event
17 from inspect import getdoc, isclass
18 from traceback import format_exception
19 from logging import FileHandler, DEBUG, Formatter
20 from scapy.packet import Raw
21 from hook import StepHook, PollHook, VppDiedError
22 from vpp_pg_interface import VppPGInterface
23 from vpp_sub_interface import VppSubInterface
24 from vpp_lo_interface import VppLoInterface
25 from vpp_papi_provider import VppPapiProvider
26 from vpp_papi.vpp_stats import VPPStats
27 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
29 from vpp_object import VppObjectRegistry
30 from util import ppp, is_core_present
31 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
32 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
33 from scapy.layers.inet6 import ICMPv6EchoReply
34 if os.name == 'posix' and sys.version_info[0] < 3:
35 # using subprocess32 is recommended by python official documentation
36 # @ https://docs.python.org/2/library/subprocess.html
37 import subprocess32 as subprocess
49 debug_framework = False
50 if os.getenv('TEST_DEBUG', "0") == "1":
51 debug_framework = True
56 Test framework module.
58 The module provides a set of tools for constructing and running tests and
59 representing the results.
63 class _PacketInfo(object):
64 """Private class to create packet info object.
66 Help process information about the next packet.
67 Set variables to default values.
69 #: Store the index of the packet.
71 #: Store the index of the source packet generator interface of the packet.
73 #: Store the index of the destination packet generator interface
76 #: Store expected ip version
78 #: Store expected upper protocol
80 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare this object with another one attribute by attribute.

    All four attributes (index, src, dst, data) are read from both
    objects before the results are combined.

    :param other: object exposing index, src, dst and data attributes
    :returns: result of and-ing the four pairwise comparisons
    """
    compared_fields = ('index', 'src', 'dst', 'data')
    same_index, same_src, same_dst, same_data = [
        getattr(self, field) == getattr(other, field)
        for field in compared_fields
    ]
    return same_index and same_src and same_dst and same_data
91 def pump_output(testclass):
92 """ pump output from vpp stdout/stderr to proper queues """
95 while not testclass.pump_thread_stop_flag.is_set():
96 readable = select.select([testclass.vpp.stdout.fileno(),
97 testclass.vpp.stderr.fileno(),
98 testclass.pump_thread_wakeup_pipe[0]],
100 if testclass.vpp.stdout.fileno() in readable:
101 read = os.read(testclass.vpp.stdout.fileno(), 102400)
103 split = read.splitlines(True)
104 if len(stdout_fragment) > 0:
105 split[0] = "%s%s" % (stdout_fragment, split[0])
106 if len(split) > 0 and split[-1].endswith("\n"):
110 stdout_fragment = split[-1]
111 testclass.vpp_stdout_deque.extend(split[:limit])
112 if not testclass.cache_vpp_output:
113 for line in split[:limit]:
114 testclass.logger.debug(
115 "VPP STDOUT: %s" % line.rstrip("\n"))
116 if testclass.vpp.stderr.fileno() in readable:
117 read = os.read(testclass.vpp.stderr.fileno(), 102400)
119 split = read.splitlines(True)
120 if len(stderr_fragment) > 0:
121 split[0] = "%s%s" % (stderr_fragment, split[0])
122 if len(split) > 0 and split[-1].endswith("\n"):
126 stderr_fragment = split[-1]
127 testclass.vpp_stderr_deque.extend(split[:limit])
128 if not testclass.cache_vpp_output:
129 for line in split[:limit]:
130 testclass.logger.debug(
131 "VPP STDERR: %s" % line.rstrip("\n"))
132 # ignoring the dummy pipe here intentionally - the flag will take care
133 # of properly terminating the loop
def running_extended_tests():
    """Tell whether the EXTENDED_TESTS environment variable is switched on.

    :returns: True if EXTENDED_TESTS (compared case-insensitively) is
              "y", "yes" or "1"; False otherwise. An unset variable is
              treated as "n".
    """
    enabling_values = ("y", "yes", "1")
    setting = os.getenv("EXTENDED_TESTS", "n").lower()
    return setting in enabling_values
def running_on_centos():
    """Tell whether the OS_ID environment variable mentions centos.

    :returns: True if the lower-cased value of OS_ID contains "centos";
              False otherwise. An unset variable is treated as "".
    """
    reported_os = os.getenv("OS_ID", "").lower()
    return "centos" in reported_os
146 class KeepAliveReporter(object):
148 Singleton object which reports test start to parent process
153 self.__dict__ = self._shared_state
161 def pipe(self, pipe):
162 if self._pipe is not None:
163 raise Exception("Internal error - pipe should only be set once.")
166 def send_keep_alive(self, test, desc=None):
168 Write current test tmpdir & desc to keep-alive pipe to signal liveness
170 if self.pipe is None:
171 # if not running forked..
175 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
179 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
182 class VppTestCase(unittest.TestCase):
183 """This subclass is a base class for VPP test cases that are implemented as
184 classes. It provides methods to create and run test case.
def packet_infos(self):
    """Return the packet info collection kept in ``_packet_infos``."""
    stored_infos = self._packet_infos
    return stored_infos
193 def get_packet_count_for_if_idx(cls, dst_if_index):
194 """Get the number of packet info for specified destination if index"""
195 if dst_if_index in cls._packet_count_for_dst_if_idx:
196 return cls._packet_count_for_dst_if_idx[dst_if_index]
202 """Return the instance of this testcase"""
203 return cls.test_instance
206 def set_debug_flags(cls, d):
207 cls.debug_core = False
208 cls.debug_gdb = False
209 cls.debug_gdbserver = False
214 cls.debug_core = True
217 elif dl == "gdbserver":
218 cls.debug_gdbserver = True
220 raise Exception("Unrecognized DEBUG option: '%s'" % d)
223 def get_least_used_cpu():
224 cpu_usage_list = [set(range(psutil.cpu_count()))]
225 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
226 if 'vpp_main' == p.info['name']]
227 for vpp_process in vpp_processes:
228 for cpu_usage_set in cpu_usage_list:
230 cpu_num = vpp_process.cpu_num()
231 if cpu_num in cpu_usage_set:
232 cpu_usage_set_index = cpu_usage_list.index(
234 if cpu_usage_set_index == len(cpu_usage_list) - 1:
235 cpu_usage_list.append({cpu_num})
237 cpu_usage_list[cpu_usage_set_index + 1].add(
239 cpu_usage_set.remove(cpu_num)
241 except psutil.NoSuchProcess:
244 for cpu_usage_set in cpu_usage_list:
245 if len(cpu_usage_set) > 0:
246 min_usage_set = cpu_usage_set
249 return random.choice(tuple(min_usage_set))
def print_header(cls):
    """Print the banner for this class, at most once.

    The banner is the first line of the class docstring passed through
    colorize() with GREEN, framed by two double_line_delim lines. Once
    printed, ``_header_printed`` is set on the class so that later calls
    return without printing anything.
    """
    if hasattr(cls, '_header_printed'):
        return
    print(double_line_delim)
    title = getdoc(cls).splitlines()[0]
    print(colorize(title, GREEN))
    print(double_line_delim)
    cls._header_printed = True
260 def setUpConstants(cls):
261 """ Set-up the test case class based on environment variables """
262 s = os.getenv("STEP", "n")
263 cls.step = True if s.lower() in ("y", "yes", "1") else False
264 d = os.getenv("DEBUG", None)
265 c = os.getenv("CACHE_OUTPUT", "1")
266 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
267 cls.set_debug_flags(d)
268 cls.vpp_bin = os.getenv('VPP_BIN', "vpp")
269 cls.plugin_path = os.getenv('VPP_PLUGIN_PATH')
270 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
272 if cls.plugin_path is not None:
273 if cls.extern_plugin_path is not None:
274 plugin_path = "%s:%s" % (
275 cls.plugin_path, cls.extern_plugin_path)
277 plugin_path = cls.plugin_path
278 elif cls.extern_plugin_path is not None:
279 plugin_path = cls.extern_plugin_path
281 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
282 debug_cli = "cli-listen localhost:5002"
284 size = os.getenv("COREDUMP_SIZE")
286 coredump_size = "coredump-size %s" % size
287 if coredump_size is None:
288 coredump_size = "coredump-size unlimited"
290 cpu_core_number = cls.get_least_used_cpu()
292 cls.vpp_cmdline = [cls.vpp_bin, "unix",
293 "{", "nodaemon", debug_cli, "full-coredump",
294 coredump_size, "runtime-dir", cls.tempdir, "}",
295 "api-trace", "{", "on", "}", "api-segment", "{",
296 "prefix", cls.shm_prefix, "}", "cpu", "{",
297 "main-core", str(cpu_core_number), "}", "statseg",
298 "{", "socket-name", cls.stats_sock, "}", "plugins",
299 "{", "plugin", "dpdk_plugin.so", "{", "disable",
300 "}", "plugin", "unittest_plugin.so", "{", "enable",
302 if plugin_path is not None:
303 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
304 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
305 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
308 def wait_for_enter(cls):
309 if cls.debug_gdbserver:
310 print(double_line_delim)
311 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
313 print(double_line_delim)
314 print("Spawned VPP with PID: %d" % cls.vpp.pid)
316 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
318 print(single_line_delim)
319 print("You can debug the VPP using e.g.:")
320 if cls.debug_gdbserver:
321 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
322 print("Now is the time to attach a gdb by running the above "
323 "command, set up breakpoints etc. and then resume VPP from "
324 "within gdb by issuing the 'continue' command")
326 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
327 print("Now is the time to attach a gdb by running the above "
328 "command and set up breakpoints etc.")
329 print(single_line_delim)
330 raw_input("Press ENTER to continue running the testcase...")
334 cmdline = cls.vpp_cmdline
336 if cls.debug_gdbserver:
337 gdbserver = '/usr/bin/gdbserver'
338 if not os.path.isfile(gdbserver) or \
339 not os.access(gdbserver, os.X_OK):
340 raise Exception("gdbserver binary '%s' does not exist or is "
341 "not executable" % gdbserver)
343 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
344 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
347 cls.vpp = subprocess.Popen(cmdline,
348 stdout=subprocess.PIPE,
349 stderr=subprocess.PIPE,
351 except subprocess.CalledProcessError as e:
352 cls.logger.critical("Couldn't start vpp: %s" % e)
358 def wait_for_stats_socket(cls):
359 deadline = time.time() + 3
361 while time.time() < deadline or \
362 cls.debug_gdb or cls.debug_gdbserver:
363 if os.path.exists(cls.stats_sock):
368 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
373 Perform class setup before running the testcase
374 Remove shared memory files, start vpp and connect the vpp-api
376 gc.collect() # run garbage collection first
379 cls.logger = get_logger(cls.__name__)
380 if hasattr(cls, 'parallel_handler'):
381 cls.logger.addHandler(cls.parallel_handler)
382 cls.logger.propagate = False
383 cls.tempdir = tempfile.mkdtemp(
384 prefix='vpp-unittest-%s-' % cls.__name__)
385 cls.stats_sock = "%s/stats.sock" % cls.tempdir
386 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
387 cls.file_handler.setFormatter(
388 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
390 cls.file_handler.setLevel(DEBUG)
391 cls.logger.addHandler(cls.file_handler)
392 cls.shm_prefix = os.path.basename(cls.tempdir)
393 os.chdir(cls.tempdir)
394 cls.logger.info("Temporary dir is %s, shm prefix is %s",
395 cls.tempdir, cls.shm_prefix)
397 cls.reset_packet_infos()
399 cls._zombie_captures = []
402 cls.registry = VppObjectRegistry()
403 cls.vpp_startup_failed = False
404 cls.reporter = KeepAliveReporter()
405 # need to catch exceptions here because if we raise, then the cleanup
406 # doesn't get called and we might end with a zombie vpp
409 cls.reporter.send_keep_alive(cls, 'setUpClass')
410 VppTestResult.current_test_case_info = TestCaseInfo(
411 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
412 cls.vpp_stdout_deque = deque()
413 cls.vpp_stderr_deque = deque()
414 cls.pump_thread_stop_flag = Event()
415 cls.pump_thread_wakeup_pipe = os.pipe()
416 cls.pump_thread = Thread(target=pump_output, args=(cls,))
417 cls.pump_thread.daemon = True
418 cls.pump_thread.start()
419 if cls.debug_gdb or cls.debug_gdbserver:
423 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
429 cls.vapi.register_hook(hook)
430 cls.wait_for_stats_socket()
431 cls.statistics = VPPStats(socketname=cls.stats_sock)
435 cls.vpp_startup_failed = True
437 "VPP died shortly after startup, check the"
438 " output to standard error for possible cause")
444 cls.vapi.disconnect()
447 if cls.debug_gdbserver:
448 print(colorize("You're running VPP inside gdbserver but "
449 "VPP-API connection failed, did you forget "
450 "to 'continue' VPP from within gdb?", RED))
462 Disconnect vpp-api, kill vpp and cleanup shared memory files
464 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
466 if cls.vpp.returncode is None:
467 print(double_line_delim)
468 print("VPP or GDB server is still running")
469 print(single_line_delim)
470 raw_input("When done debugging, press ENTER to kill the "
471 "process and finish running the testcase...")
473 # first signal that we want to stop the pump thread, then wake it up
474 if hasattr(cls, 'pump_thread_stop_flag'):
475 cls.pump_thread_stop_flag.set()
476 if hasattr(cls, 'pump_thread_wakeup_pipe'):
477 os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
478 if hasattr(cls, 'pump_thread'):
479 cls.logger.debug("Waiting for pump thread to stop")
480 cls.pump_thread.join()
481 if hasattr(cls, 'vpp_stderr_reader_thread'):
482 cls.logger.debug("Waiting for stdderr pump to stop")
483 cls.vpp_stderr_reader_thread.join()
485 if hasattr(cls, 'vpp'):
486 if hasattr(cls, 'vapi'):
487 cls.vapi.disconnect()
490 if cls.vpp.returncode is None:
491 cls.logger.debug("Sending TERM to vpp")
493 cls.logger.debug("Waiting for vpp to die")
494 cls.vpp.communicate()
497 if cls.vpp_startup_failed:
498 stdout_log = cls.logger.info
499 stderr_log = cls.logger.critical
501 stdout_log = cls.logger.info
502 stderr_log = cls.logger.info
504 if hasattr(cls, 'vpp_stdout_deque'):
505 stdout_log(single_line_delim)
506 stdout_log('VPP output to stdout while running %s:', cls.__name__)
507 stdout_log(single_line_delim)
508 vpp_output = "".join(cls.vpp_stdout_deque)
509 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
511 stdout_log('\n%s', vpp_output)
512 stdout_log(single_line_delim)
514 if hasattr(cls, 'vpp_stderr_deque'):
515 stderr_log(single_line_delim)
516 stderr_log('VPP output to stderr while running %s:', cls.__name__)
517 stderr_log(single_line_delim)
518 vpp_output = "".join(cls.vpp_stderr_deque)
519 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
521 stderr_log('\n%s', vpp_output)
522 stderr_log(single_line_delim)
525 def tearDownClass(cls):
526 """ Perform final cleanup after running all tests in this test-case """
527 cls.reporter.send_keep_alive(cls, 'tearDownClass')
529 cls.file_handler.close()
530 cls.reset_packet_infos()
532 debug_internal.on_tear_down_class(cls)
535 """ Show various debug prints after each test """
536 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
537 (self.__class__.__name__, self._testMethodName,
538 self._testMethodDoc))
539 if not self.vpp_dead:
540 self.logger.debug(self.vapi.cli("show trace"))
541 self.logger.info(self.vapi.ppcli("show interface"))
542 self.logger.info(self.vapi.ppcli("show hardware"))
543 self.logger.info(self.statistics.set_errors_str())
544 self.logger.info(self.vapi.ppcli("show run"))
545 self.logger.info(self.vapi.ppcli("show log"))
546 self.registry.remove_vpp_config(self.logger)
547 # Save/Dump VPP api trace log
548 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
549 tmp_api_trace = "/tmp/%s" % api_trace
550 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
551 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
552 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
554 os.rename(tmp_api_trace, vpp_api_trace_log)
555 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
558 self.registry.unregister_all(self.logger)
561 """ Clear trace before running each test"""
562 self.reporter.send_keep_alive(self)
563 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
564 (self.__class__.__name__, self._testMethodName,
565 self._testMethodDoc))
567 raise Exception("VPP is dead when setting up the test")
568 self.sleep(.1, "during setUp")
569 self.vpp_stdout_deque.append(
570 "--- test setUp() for %s.%s(%s) starts here ---\n" %
571 (self.__class__.__name__, self._testMethodName,
572 self._testMethodDoc))
573 self.vpp_stderr_deque.append(
574 "--- test setUp() for %s.%s(%s) starts here ---\n" %
575 (self.__class__.__name__, self._testMethodName,
576 self._testMethodDoc))
577 self.vapi.cli("clear trace")
578 # store the test instance inside the test class - so that objects
579 # holding the class can access instance methods (like assertEqual)
580 type(self).test_instance = self
583 def pg_enable_capture(cls, interfaces=None):
585 Enable capture on packet-generator interfaces
587 :param interfaces: iterable interface indexes (if None,
588 use self.pg_interfaces)
591 if interfaces is None:
592 interfaces = cls.pg_interfaces
597 def register_capture(cls, cap_name):
598 """ Register a capture in the testclass """
599 # add to the list of captures with current timestamp
600 cls._captures.append((time.time(), cap_name))
601 # filter out from zombies
602 cls._zombie_captures = [(stamp, name)
603 for (stamp, name) in cls._zombie_captures
608 """ Remove any zombie captures and enable the packet generator """
609 # how long before capture is allowed to be deleted - otherwise vpp
610 # crashes - 100ms seems enough (this shouldn't be needed at all)
613 for stamp, cap_name in cls._zombie_captures:
614 wait = stamp + capture_ttl - now
616 cls.sleep(wait, "before deleting capture %s" % cap_name)
618 cls.logger.debug("Removing zombie capture %s" % cap_name)
619 cls.vapi.cli('packet-generator delete %s' % cap_name)
621 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
622 cls.vapi.cli('packet-generator enable')
623 cls._zombie_captures = cls._captures
627 def create_pg_interfaces(cls, interfaces):
629 Create packet-generator interfaces.
631 :param interfaces: iterable indexes of the interfaces.
632 :returns: List of created interfaces.
637 intf = VppPGInterface(cls, i)
638 setattr(cls, intf.name, intf)
640 cls.pg_interfaces = result
644 def create_loopback_interfaces(cls, count):
646 Create loopback interfaces.
648 :param count: number of interfaces created.
649 :returns: List of created interfaces.
651 result = [VppLoInterface(cls) for i in range(count)]
653 setattr(cls, intf.name, intf)
654 cls.lo_interfaces = result
658 def extend_packet(packet, size, padding=' '):
660 Extend packet to given size by padding with spaces or custom padding
661 NOTE: Currently works only when Raw layer is present.
663 :param packet: packet
664 :param size: target size
665 :param padding: padding used to extend the payload
668 packet_len = len(packet) + 4
669 extend = size - packet_len
671 num = (extend / len(padding)) + 1
672 packet[Raw].load += (padding * num)[:extend]
def reset_packet_infos(cls):
    """Drop all stored packet infos and per-destination packet counts.

    Both ``_packet_infos`` and ``_packet_count_for_dst_if_idx`` are
    rebound to new, separate empty dictionaries.
    """
    cls._packet_infos, cls._packet_count_for_dst_if_idx = {}, {}
681 def create_packet_info(cls, src_if, dst_if):
683 Create packet info object containing the source and destination indexes
684 and add it to the testcase's packet info list
686 :param VppInterface src_if: source interface
687 :param VppInterface dst_if: destination interface
689 :returns: _PacketInfo object
693 info.index = len(cls._packet_infos)
694 info.src = src_if.sw_if_index
695 info.dst = dst_if.sw_if_index
696 if isinstance(dst_if, VppSubInterface):
697 dst_idx = dst_if.parent.sw_if_index
699 dst_idx = dst_if.sw_if_index
700 if dst_idx in cls._packet_count_for_dst_if_idx:
701 cls._packet_count_for_dst_if_idx[dst_idx] += 1
703 cls._packet_count_for_dst_if_idx[dst_idx] = 1
704 cls._packet_infos[info.index] = info
708 def info_to_payload(info):
710 Convert _PacketInfo object to packet payload
712 :param info: _PacketInfo object
714 :returns: string containing serialized data from packet info
716 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
720 def payload_to_info(payload):
722 Convert packet payload to _PacketInfo object
724 :param payload: packet payload
726 :returns: _PacketInfo object containing de-serialized data from payload
729 numbers = payload.split()
731 info.index = int(numbers[0])
732 info.src = int(numbers[1])
733 info.dst = int(numbers[2])
734 info.ip = int(numbers[3])
735 info.proto = int(numbers[4])
738 def get_next_packet_info(self, info):
740 Iterate over the packet info list stored in the testcase
741 Start iteration with first element if info is None
742 Continue based on index in info if info is specified
744 :param info: info or None
745 :returns: next info in list or None if no more infos
750 next_index = info.index + 1
751 if next_index == len(self._packet_infos):
754 return self._packet_infos[next_index]
756 def get_next_packet_info_for_interface(self, src_index, info):
758 Search the packet info list for the next packet info with same source
761 :param src_index: source interface index to search for
762 :param info: packet info - where to start the search
763 :returns: packet info or None
767 info = self.get_next_packet_info(info)
770 if info.src == src_index:
773 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
775 Search the packet info list for the next packet info with same source
776 and destination interface indexes
778 :param src_index: source interface index to search for
779 :param dst_index: destination interface index to search for
780 :param info: packet info - where to start the search
781 :returns: packet info or None
785 info = self.get_next_packet_info_for_interface(src_index, info)
788 if info.dst == dst_index:
791 def assert_equal(self, real_value, expected_value, name_or_class=None):
792 if name_or_class is None:
793 self.assertEqual(real_value, expected_value)
796 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
797 msg = msg % (getdoc(name_or_class).strip(),
798 real_value, str(name_or_class(real_value)),
799 expected_value, str(name_or_class(expected_value)))
801 msg = "Invalid %s: %s does not match expected value %s" % (
802 name_or_class, real_value, expected_value)
804 self.assertEqual(real_value, expected_value, msg)
806 def assert_in_range(self,
814 msg = "Invalid %s: %s out of range <%s,%s>" % (
815 name, real_value, expected_min, expected_max)
816 self.assertTrue(expected_min <= real_value <= expected_max, msg)
818 def assert_packet_checksums_valid(self, packet,
819 ignore_zero_udp_checksums=True):
820 received = packet.__class__(str(packet))
822 ppp("Verifying packet checksums for packet:", received))
823 udp_layers = ['UDP', 'UDPerror']
824 checksum_fields = ['cksum', 'chksum']
827 temp = received.__class__(str(received))
829 layer = temp.getlayer(counter)
831 for cf in checksum_fields:
832 if hasattr(layer, cf):
833 if ignore_zero_udp_checksums and \
834 0 == getattr(layer, cf) and \
835 layer.name in udp_layers:
838 checksums.append((counter, cf))
841 counter = counter + 1
842 if 0 == len(checksums):
844 temp = temp.__class__(str(temp))
845 for layer, cf in checksums:
846 calc_sum = getattr(temp[layer], cf)
848 getattr(received[layer], cf), calc_sum,
849 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
851 "Checksum field `%s` on `%s` layer has correct value `%s`" %
852 (cf, temp[layer].name, calc_sum))
854 def assert_checksum_valid(self, received_packet, layer,
856 ignore_zero_checksum=False):
857 """ Check checksum of received packet on given layer """
858 received_packet_checksum = getattr(received_packet[layer], field_name)
859 if ignore_zero_checksum and 0 == received_packet_checksum:
861 recalculated = received_packet.__class__(str(received_packet))
862 delattr(recalculated[layer], field_name)
863 recalculated = recalculated.__class__(str(recalculated))
864 self.assert_equal(received_packet_checksum,
865 getattr(recalculated[layer], field_name),
866 "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """Check the checksum on the 'IP' layer of the received packet.

    Thin wrapper which delegates to assert_checksum_valid().

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded unchanged to
                                 assert_checksum_valid()
    """
    layer_name = 'IP'
    self.assert_checksum_valid(
        received_packet, layer_name,
        ignore_zero_checksum=ignore_zero_checksum)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """Check the checksum on the 'TCP' layer of the received packet.

    Thin wrapper which delegates to assert_checksum_valid().

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded unchanged to
                                 assert_checksum_valid()
    """
    layer_name = 'TCP'
    self.assert_checksum_valid(
        received_packet, layer_name,
        ignore_zero_checksum=ignore_zero_checksum)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """Check the checksum on the 'UDP' layer of the received packet.

    Thin wrapper which delegates to assert_checksum_valid(). Unlike the
    IP and TCP variants, ignore_zero_checksum defaults to True here.

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded unchanged to
                                 assert_checksum_valid()
    """
    layer_name = 'UDP'
    self.assert_checksum_valid(
        received_packet, layer_name,
        ignore_zero_checksum=ignore_zero_checksum)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """Check checksums of the error layers present in the packet.

    The IPerror, TCPerror, UDPerror and ICMPerror layers are probed in
    that order with haslayer(); each one found is passed to
    assert_checksum_valid(). Only the UDPerror check is made with
    ignore_zero_checksum=True.

    :param received_packet: packet to verify
    """
    embedded_layers = (
        (IPerror, 'IPerror', {}),
        (TCPerror, 'TCPerror', {}),
        (UDPerror, 'UDPerror', {'ignore_zero_checksum': True}),
        (ICMPerror, 'ICMPerror', {}),
    )
    for layer_class, layer_name, options in embedded_layers:
        if received_packet.haslayer(layer_class):
            self.assert_checksum_valid(
                received_packet, layer_name, **options)
def assert_icmp_checksum_valid(self, received_packet):
    """Check the 'ICMP' layer checksum, then any embedded error layers.

    :param received_packet: packet to verify
    """
    packet = received_packet
    # outer ICMP layer first, embedded error layers afterwards
    self.assert_checksum_valid(packet, 'ICMP')
    self.assert_embedded_icmp_checksum_valid(packet)
def assert_icmpv6_checksum_valid(self, pkt):
    """Check checksums of the ICMPv6 layers present in the packet.

    ICMPv6DestUnreach, ICMPv6EchoRequest and ICMPv6EchoReply are probed
    in that order with haslayer(). Each layer found is passed to
    assert_checksum_valid() with 'cksum' as the third positional
    argument. For ICMPv6DestUnreach the embedded error layers are
    checked as well.

    :param pkt: packet to verify
    """
    icmpv6_layers = (
        (ICMPv6DestUnreach, 'ICMPv6DestUnreach', True),
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest', False),
        (ICMPv6EchoReply, 'ICMPv6EchoReply', False),
    )
    for layer_class, layer_name, check_embedded in icmpv6_layers:
        if not pkt.haslayer(layer_class):
            continue
        self.assert_checksum_valid(pkt, layer_name, 'cksum')
        if check_embedded:
            self.assert_embedded_icmp_checksum_valid(pkt)
907 def assert_packet_counter_equal(self, counter, expected_value):
908 counters = self.vapi.cli("sh errors").split('\n')
910 for i in range(1, len(counters)-1):
911 results = counters[i].split()
912 if results[1] == counter:
913 counter_value = int(results[0])
915 self.assert_equal(counter_value, expected_value,
916 "packet counter `%s'" % counter)
919 def sleep(cls, timeout, remark=None):
920 if hasattr(cls, 'logger'):
921 cls.logger.debug("Starting sleep for %ss (%s)" % (timeout, remark))
925 if after - before > 2 * timeout:
926 cls.logger.error("unexpected time.sleep() result - "
927 "slept for %ss instead of ~%ss!" % (
928 after - before, timeout))
929 if hasattr(cls, 'logger'):
931 "Finished sleep (%s) - slept %ss (wanted %ss)" % (
932 remark, after - before, timeout))
934 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
935 self.vapi.cli("clear trace")
936 intf.add_stream(pkts)
937 self.pg_enable_capture(self.pg_interfaces)
941 for i in self.pg_interfaces:
942 i.get_capture(0, timeout=timeout)
943 i.assert_nothing_captured(remark=remark)
946 def send_and_expect(self, input, pkts, output):
947 self.vapi.cli("clear trace")
948 input.add_stream(pkts)
949 self.pg_enable_capture(self.pg_interfaces)
951 if isinstance(object, (list,)):
954 rx.append(output.get_capture(len(pkts)))
956 rx = output.get_capture(len(pkts))
959 def send_and_expect_only(self, input, pkts, output, timeout=None):
960 self.vapi.cli("clear trace")
961 input.add_stream(pkts)
962 self.pg_enable_capture(self.pg_interfaces)
964 if isinstance(object, (list,)):
968 rx.append(output.get_capture(len(pkts)))
970 rx = output.get_capture(len(pkts))
974 for i in self.pg_interfaces:
976 i.get_capture(0, timeout=timeout)
977 i.assert_nothing_captured()
def get_testcase_doc_name(test):
    """Return a one-line, human readable name of the test's class.

    :param test: test case instance
    :returns: first line of the class docstring as cleaned up by
              inspect.getdoc(); the class name if the class has no
              docstring or the docstring is empty
    """
    test_class = test.__class__
    # getdoc() yields None for an undocumented class and "" for an empty
    # docstring - neither has a first line, so guard against both
    # instead of failing with AttributeError/IndexError
    doc_lines = (getdoc(test_class) or "").splitlines()
    return doc_lines[0] if doc_lines else test_class.__name__
987 def get_test_description(descriptions, test):
988 short_description = test.shortDescription()
989 if descriptions and short_description:
990 return short_description
995 class TestCaseInfo(object):
996 def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
998 self.tempdir = tempdir
999 self.vpp_pid = vpp_pid
1000 self.vpp_bin_path = vpp_bin_path
1001 self.core_crash_test = None
1004 class VppTestResult(unittest.TestResult):
1006 @property result_string
1007 String variable to store the test case result string.
1009 List variable containing 2-tuples of TestCase instances and strings
1010 holding formatted tracebacks. Each tuple represents a test which
1011 raised an unexpected exception.
1013 List variable containing 2-tuples of TestCase instances and strings
1014 holding formatted tracebacks. Each tuple represents a test where
1015 a failure was explicitly signalled using the TestCase.assert*()
1019 failed_test_cases_info = set()
1020 core_crash_test_cases_info = set()
1021 current_test_case_info = None
1023 def __init__(self, stream, descriptions, verbosity, runner):
1025 :param stream File descriptor to store where to report test results.
1026 Set to the standard error stream by default.
1027 :param descriptions Boolean variable to store information if to use
1028 test case descriptions.
1029 :param verbosity Integer variable to store required verbosity level.
1031 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
1032 self.stream = stream
1033 self.descriptions = descriptions
1034 self.verbosity = verbosity
1035 self.result_string = None
1036 self.runner = runner
1038 def addSuccess(self, test):
1040 Record a test succeeded result
1045 if self.current_test_case_info:
1046 self.current_test_case_info.logger.debug(
1047 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1048 test._testMethodName,
1049 test._testMethodDoc))
1050 unittest.TestResult.addSuccess(self, test)
1051 self.result_string = colorize("OK", GREEN)
1053 self.send_result_through_pipe(test, PASS)
1055 def addSkip(self, test, reason):
1057 Record a test skipped.
1063 if self.current_test_case_info:
1064 self.current_test_case_info.logger.debug(
1065 "--- addSkip() %s.%s(%s) called, reason is %s" %
1066 (test.__class__.__name__, test._testMethodName,
1067 test._testMethodDoc, reason))
1068 unittest.TestResult.addSkip(self, test, reason)
1069 self.result_string = colorize("SKIP", YELLOW)
1071 self.send_result_through_pipe(test, SKIP)
1073 def symlink_failed(self):
1074 if self.current_test_case_info:
1076 failed_dir = os.getenv('FAILED_DIR')
1077 link_path = os.path.join(
1080 os.path.basename(self.current_test_case_info.tempdir))
1081 if self.current_test_case_info.logger:
1082 self.current_test_case_info.logger.debug(
1083 "creating a link to the failed test")
1084 self.current_test_case_info.logger.debug(
1085 "os.symlink(%s, %s)" %
1086 (self.current_test_case_info.tempdir, link_path))
1087 if os.path.exists(link_path):
1088 if self.current_test_case_info.logger:
1089 self.current_test_case_info.logger.debug(
1090 'symlink already exists')
1092 os.symlink(self.current_test_case_info.tempdir, link_path)
1094 except Exception as e:
1095 if self.current_test_case_info.logger:
1096 self.current_test_case_info.logger.error(e)
1098 def send_result_through_pipe(self, test, result):
1099 if hasattr(self, 'test_framework_result_pipe'):
1100 pipe = self.test_framework_result_pipe
1102 pipe.send((test.id(), result))
1104 def log_error(self, test, err, fn_name):
1105 if self.current_test_case_info:
1106 if isinstance(test, unittest.suite._ErrorHolder):
1107 test_name = test.description
1109 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1110 test._testMethodName,
1111 test._testMethodDoc)
1112 self.current_test_case_info.logger.debug(
1113 "--- %s() %s called, err is %s" %
1114 (fn_name, test_name, err))
1115 self.current_test_case_info.logger.debug(
1116 "formatted exception is:\n%s" %
1117 "".join(format_exception(*err)))
# Record a failure or an error for `test` and publish the outcome.
#   test        - test (or unittest.suite._ErrorHolder) being reported
#   err         - error info, forwarded to log_error() and to unittest_fn
#   unittest_fn - recorder invoked as unittest_fn(self, test, err); the
#                 callers in this file pass unittest.TestResult.addFailure
#                 or unittest.TestResult.addError
#   error_type  - FAIL or ERROR; any other value raises Exception
# NOTE(review): the embedded line numbers skip 1126, 1133, 1141 and 1147 -
# presumably `else:` / `(error_type_str,` lines lost when this text was
# extracted, and the indentation is gone too; confirm the nesting against
# the original file before changing any logic here.
1119 def add_error(self, test, err, unittest_fn, error_type):
# log the problem and pick the coloured label used in the result string
1120 if error_type == FAIL:
1121 self.log_error(test, err, 'addFailure')
1122 error_type_str = colorize("FAIL", RED)
1123 elif error_type == ERROR:
1124 self.log_error(test, err, 'addError')
1125 error_type_str = colorize("ERROR", RED)
1127 raise Exception('Error type %s cannot be used to record an '
1128 'error or a failure' % error_type)
# let unittest do its own bookkeeping for this failure/error
1130 unittest_fn(self, test, err)
1131 if self.current_test_case_info:
1132 self.result_string = "%s [ temp dir used by test case: %s ]" % \
1134 self.current_test_case_info.tempdir)
# link the temp dir as failed and remember the test case info as failed
1135 self.symlink_failed()
1136 self.failed_test_cases_info.add(self.current_test_case_info)
# a core file in the temp dir: core_crash_test is only set when it is still
# unset, naming this test as the one associated with the core
1137 if is_core_present(self.current_test_case_info.tempdir):
1138 if not self.current_test_case_info.core_crash_test:
1139 if isinstance(test, unittest.suite._ErrorHolder):
1140 test_name = str(test)
1142 test_name = "'{}' ({})".format(
1143 get_testcase_doc_name(test), test.id())
1144 self.current_test_case_info.core_crash_test = test_name
1145 self.core_crash_test_cases_info.add(
1146 self.current_test_case_info)
# presumably the branch taken when there is no current test case info
1148 self.result_string = '%s [no temp dir]' % error_type_str
# report the outcome to whoever listens on the result pipe
1150 self.send_result_through_pipe(test, error_type)
def addFailure(self, test, err):
    """
    Record a failed result for a test.

    :param test: test that failed
    :param err: error message

    """
    record_failure = unittest.TestResult.addFailure
    self.add_error(test, err, record_failure, FAIL)
def addError(self, test, err):
    """
    Record an error result for a test.

    :param test: test that raised the error
    :param err: error message

    """
    record_error = unittest.TestResult.addError
    self.add_error(test, err, record_error, ERROR)
def getDescription(self, test):
    """
    Build the description of a test.

    :param test: test to describe
    :returns: test description

    """
    description = get_test_description(self.descriptions, test)
    return description
# Called when `test` is about to run: delegates to
# unittest.TestResult.startTest() and, for verbosity above 0, writes a
# "Starting <description> ..." line to self.stream (the single_line_delim
# write presumably belongs to the same verbose branch - indentation is lost).
# NOTE(review): the embedded numbering jumps from 1182 to 1191, which is more
# than the usual docstring needs, so a short statement may be missing from
# this extract - confirm against the original file before editing.
1182 def startTest(self, test):
1191 unittest.TestResult.startTest(self, test)
1192 if self.verbosity > 0:
1193 self.stream.writeln(
1194 "Starting " + self.getDescription(test) + " ...")
1195 self.stream.writeln(single_line_delim)
def stopTest(self, test):
    """
    Called when the given test has been run

    Writes the test description and its result string to the stream
    (framed by delimiter lines when verbose) and reports TEST_RUN
    through the result pipe.

    :param test: test that has just finished

    """
    unittest.TestResult.stopTest(self, test)
    verbose = self.verbosity > 0
    if verbose:
        self.stream.writeln(single_line_delim)
    self.stream.writeln("%-73s%s" % (self.getDescription(test),
                                     self.result_string))
    if verbose:
        self.stream.writeln(single_line_delim)

    self.send_result_through_pipe(test, TEST_RUN)
def printErrors(self):
    """
    Print errors from running the test case
    """
    if self.errors or self.failures:
        self.stream.writeln()
        self.printErrorList('ERROR', self.errors)
        self.printErrorList('FAIL', self.failures)

    # ^^ that is the last output from unittest before summary
    if self.runner.print_summary:
        return

    # summary is not wanted: point both the result and its runner at a
    # devnull-backed stream so that whatever is written next is discarded
    sink = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
    self.stream = sink
    self.runner.stream = sink
def printErrorList(self, flavour, errors):
    """
    Write every recorded error to the output stream, each one preceded
    by its error type and the description of the test case.

    :param flavour: error type
    :param errors: iterable of (test, error text) pairs

    """
    for test, err in errors:
        write_line = self.stream.writeln
        write_line(double_line_delim)
        write_line("%s: %s" % (flavour, self.getDescription(test)))
        write_line(single_line_delim)
        write_line("%s" % err)
class VppTestRunner(unittest.TextTestRunner):
    """
    A basic test runner implementation which prints results to standard
    output and collects them in VppTestResult objects.
    """

    @property
    def resultclass(self):
        """Class maintaining the results of the tests"""
        return VppTestResult

    def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
                 result_pipe=None, failfast=False, buffer=False,
                 resultclass=None, print_summary=True):
        """
        :param keep_alive_pipe: pipe handed over to KeepAliveReporter
        :param descriptions: forwarded to unittest.TextTestRunner
        :param verbosity: forwarded to unittest.TextTestRunner
        :param result_pipe: pipe stored on the result class for reporting
        :param failfast: forwarded to unittest.TextTestRunner
        :param buffer: forwarded to unittest.TextTestRunner
        :param resultclass: forwarded to unittest.TextTestRunner
        :param print_summary: when False, run() restores the original
            stream on the runner and on the result before returning

        """
        # ignore stream setting here, use hard-coded stdout to be in sync
        # with prints from VppTestCase methods ...
        # NOTE(review): resultclass is a read-only property on this class,
        # so a non-None ``resultclass`` argument would make the base class
        # try to assign to it - confirm that no caller passes one.
        base = super(VppTestRunner, self)
        base.__init__(sys.stdout, descriptions, verbosity, failfast,
                      buffer, resultclass)
        KeepAliveReporter.pipe = keep_alive_pipe

        self.orig_stream = self.stream
        self.resultclass.test_framework_result_pipe = result_pipe

        self.print_summary = print_summary

    def _makeResult(self):
        """Create the result object, handing it a reference to this runner"""
        result_factory = self.resultclass
        return result_factory(self.stream, self.descriptions,
                              self.verbosity, self)

    def run(self, test):
        """
        Run the tests

        :param test: test case or suite to run
        :returns: result object produced by the run

        """
        faulthandler.enable()  # emit stack trace to stderr if killed by signal

        result = super(VppTestRunner, self).run(test)
        if self.print_summary:
            return result

        # without a summary the result swaps the streams for a devnull one
        # in printErrors(); put the original stream back on both objects
        self.stream = self.orig_stream
        result.stream = self.orig_stream
        return result
# Thread subclass that launches an external executable, waits for it to
# finish and logs its return code, stdout and stderr.
# NOTE(review): the embedded numbering skips 1298-1299 and 1302-1303, so
# short lines are missing from this extract - presumably attribute
# assignments in __init__ (self.args is read below but never set in the
# visible text) and the `def` line of the method holding the code from 1304
# on; the class may also continue past the end of this chunk. Confirm against
# the original file before changing any code.
1295 class Worker(Thread):
# args: argument list, args[0] is reported as the executable
# logger: logger receiving the debug/info records written below
# env: extra environment variables; deep-copied, so the shared default dict
#      is never handed out for mutation through self.env
1296 def __init__(self, args, logger, env={}):
1297 self.logger = logger
1300 self.env = copy.deepcopy(env)
1301 super(Worker, self).__init__()
1304 executable = self.args[0]
1305 self.logger.debug("Running executable w/args `%s'" % self.args)
# child environment: a copy of os.environ overlaid with self.env, with
# CK_LOG_FILE_NAME forced to "-"
1306 env = os.environ.copy()
1307 env.update(self.env)
1308 env["CK_LOG_FILE_NAME"] = "-"
# no shell is involved; os.setpgrp is called in the child before exec and
# both output streams are captured through pipes
1309 self.process = subprocess.Popen(
1310 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1311 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# communicate() waits for the process to terminate and returns its captured
# stdout and stderr
1312 out, err = self.process.communicate()
1313 self.logger.debug("Finished running `%s'" % executable)
1314 self.logger.info("Return code is `%s'" % self.process.returncode)
1315 self.logger.info(single_line_delim)
1316 self.logger.info("Executable `%s' wrote to stdout:" % executable)
1317 self.logger.info(single_line_delim)
1318 self.logger.info(out)
1319 self.logger.info(single_line_delim)
1320 self.logger.info("Executable `%s' wrote to stderr:" % executable)
1321 self.logger.info(single_line_delim)
1322 self.logger.info(err)
1323 self.logger.info(single_line_delim)
# keep the exit status where the code that started the thread can read it
1324 self.result = self.process.returncode