3 from __future__ import print_function
15 from collections import deque
16 from inspect import getdoc, isclass
17 from logging import FileHandler, DEBUG, Formatter
18 from threading import Thread, Event
19 from traceback import format_exception
23 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
24 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
25 from scapy.layers.inet6 import ICMPv6EchoReply
26 from scapy.packet import Raw
27 from vpp_papi.vpp_stats import VPPStats
29 from hook import StepHook, PollHook, VppDiedError
30 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
32 from util import ppp, is_core_present
33 from vpp_lo_interface import VppLoInterface
34 from vpp_object import VppObjectRegistry
35 from vpp_papi_provider import VppPapiProvider
36 from vpp_pg_interface import VppPGInterface
37 from vpp_sub_interface import VppSubInterface
39 if os.name == 'posix' and sys.version_info[0] < 3:
40 # using subprocess32 is recommended by python official documentation
41 # @ https://docs.python.org/2/library/subprocess.html
42 import subprocess32 as subprocess
54 debug_framework = False
55 if os.getenv('TEST_DEBUG', "0") == "1":
56 debug_framework = True
61 Test framework module.
63 The module provides a set of tools for constructing and running tests and
64 representing the results.
68 class _PacketInfo(object):
69 """Private class to create packet info object.
71 Help process information about the next packet.
72 Set variables to default values.
74 #: Store the index of the packet.
76 #: Store the index of the source packet generator interface of the packet.
78 #: Store the index of the destination packet generator interface
81 #: Store expected ip version
83 #: Store expected upper protocol
85 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare two packet infos attribute by attribute.

    The ``index``, ``src``, ``dst`` and ``data`` attributes of both objects
    are all compared first; the individual results are then combined with
    ``and``.  ``other`` therefore has to provide all four attributes.

    :param other: object to compare with
    :returns: combined result of the four attribute comparisons
    """
    compared_fields = ("index", "src", "dst", "data")
    # evaluate every comparison up front, in declaration order
    verdicts = [getattr(self, field) == getattr(other, field)
                for field in compared_fields]
    combined = verdicts[0]
    for verdict in verdicts[1:]:
        combined = combined and verdict
    return combined
96 def pump_output(testclass):
97 """ pump output from vpp stdout/stderr to proper queues """
100 while not testclass.pump_thread_stop_flag.is_set():
101 readable = select.select([testclass.vpp.stdout.fileno(),
102 testclass.vpp.stderr.fileno(),
103 testclass.pump_thread_wakeup_pipe[0]],
105 if testclass.vpp.stdout.fileno() in readable:
106 read = os.read(testclass.vpp.stdout.fileno(), 102400)
108 split = read.splitlines(True)
109 if len(stdout_fragment) > 0:
110 split[0] = "%s%s" % (stdout_fragment, split[0])
111 if len(split) > 0 and split[-1].endswith("\n"):
115 stdout_fragment = split[-1]
116 testclass.vpp_stdout_deque.extend(split[:limit])
117 if not testclass.cache_vpp_output:
118 for line in split[:limit]:
119 testclass.logger.debug(
120 "VPP STDOUT: %s" % line.rstrip("\n"))
121 if testclass.vpp.stderr.fileno() in readable:
122 read = os.read(testclass.vpp.stderr.fileno(), 102400)
124 split = read.splitlines(True)
125 if len(stderr_fragment) > 0:
126 split[0] = "%s%s" % (stderr_fragment, split[0])
127 if len(split) > 0 and split[-1].endswith("\n"):
131 stderr_fragment = split[-1]
132 testclass.vpp_stderr_deque.extend(split[:limit])
133 if not testclass.cache_vpp_output:
134 for line in split[:limit]:
135 testclass.logger.debug(
136 "VPP STDERR: %s" % line.rstrip("\n"))
137 # ignoring the dummy pipe here intentionally - the flag will take care
138 # of properly terminating the loop
def running_extended_tests():
    """Tell whether the extended tests were requested.

    Reads the ``EXTENDED_TESTS`` environment variable (default ``"n"``).

    :returns: True if the lower-cased value is ``y``, ``yes`` or ``1``,
              False otherwise
    """
    requested = os.getenv("EXTENDED_TESTS", "n").lower()
    return requested in ("y", "yes", "1")
def running_on_centos():
    """Tell whether the ``OS_ID`` environment variable mentions centos.

    :returns: True if the lower-cased ``OS_ID`` value (default: empty
              string) contains ``centos``, False otherwise
    """
    distro_id = os.getenv("OS_ID", "").lower()
    return "centos" in distro_id
151 class KeepAliveReporter(object):
153 Singleton object which reports test start to parent process
158 self.__dict__ = self._shared_state
166 def pipe(self, pipe):
167 if self._pipe is not None:
168 raise Exception("Internal error - pipe should only be set once.")
171 def send_keep_alive(self, test, desc=None):
173 Write current test tmpdir & desc to keep-alive pipe to signal liveness
175 if self.pipe is None:
176 # if not running forked..
180 desc = '%s (%s)' % (desc, unittest.util.strclass(test))
184 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
187 class VppTestCase(unittest.TestCase):
188 """This subclass is a base class for VPP test cases that are implemented as
189 classes. It provides methods to create and run test case.
def packet_infos(self):
    """Return the packet infos stored in ``_packet_infos``"""
    recorded = self._packet_infos
    return recorded
198 def get_packet_count_for_if_idx(cls, dst_if_index):
199 """Get the number of packet info for specified destination if index"""
200 if dst_if_index in cls._packet_count_for_dst_if_idx:
201 return cls._packet_count_for_dst_if_idx[dst_if_index]
207 """Return the instance of this testcase"""
208 return cls.test_instance
211 def set_debug_flags(cls, d):
212 cls.debug_core = False
213 cls.debug_gdb = False
214 cls.debug_gdbserver = False
219 cls.debug_core = True
222 elif dl == "gdbserver":
223 cls.debug_gdbserver = True
225 raise Exception("Unrecognized DEBUG option: '%s'" % d)
228 def get_least_used_cpu():
229 cpu_usage_list = [set(range(psutil.cpu_count()))]
230 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
231 if 'vpp_main' == p.info['name']]
232 for vpp_process in vpp_processes:
233 for cpu_usage_set in cpu_usage_list:
235 cpu_num = vpp_process.cpu_num()
236 if cpu_num in cpu_usage_set:
237 cpu_usage_set_index = cpu_usage_list.index(
239 if cpu_usage_set_index == len(cpu_usage_list) - 1:
240 cpu_usage_list.append({cpu_num})
242 cpu_usage_list[cpu_usage_set_index + 1].add(
244 cpu_usage_set.remove(cpu_num)
246 except psutil.NoSuchProcess:
249 for cpu_usage_set in cpu_usage_list:
250 if len(cpu_usage_set) > 0:
251 min_usage_set = cpu_usage_set
254 return random.choice(tuple(min_usage_set))
def print_header(cls):
    """Print the testcase banner, at most once per class.

    The banner consists of the first line of the class docstring, passed
    through ``colorize`` with ``GREEN`` and framed by ``double_line_delim``.
    Afterwards ``_header_printed`` is set on the class, which makes later
    calls return without printing anything.
    """
    if hasattr(cls, '_header_printed'):
        return
    print(double_line_delim)
    title = getdoc(cls).splitlines()[0]
    print(colorize(title, GREEN))
    print(double_line_delim)
    cls._header_printed = True
265 def setUpConstants(cls):
266 """ Set-up the test case class based on environment variables """
267 s = os.getenv("STEP", "n")
268 cls.step = True if s.lower() in ("y", "yes", "1") else False
269 d = os.getenv("DEBUG", None)
270 c = os.getenv("CACHE_OUTPUT", "1")
271 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
272 cls.set_debug_flags(d)
273 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
274 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
275 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
277 if cls.plugin_path is not None:
278 if cls.extern_plugin_path is not None:
279 plugin_path = "%s:%s" % (
280 cls.plugin_path, cls.extern_plugin_path)
282 plugin_path = cls.plugin_path
283 elif cls.extern_plugin_path is not None:
284 plugin_path = cls.extern_plugin_path
286 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
287 debug_cli = "cli-listen localhost:5002"
289 size = os.getenv("COREDUMP_SIZE")
291 coredump_size = "coredump-size %s" % size
292 if coredump_size is None:
293 coredump_size = "coredump-size unlimited"
295 cpu_core_number = cls.get_least_used_cpu()
297 cls.vpp_cmdline = [cls.vpp_bin, "unix",
298 "{", "nodaemon", debug_cli, "full-coredump",
299 coredump_size, "runtime-dir", cls.tempdir, "}",
300 "api-trace", "{", "on", "}", "api-segment", "{",
301 "prefix", cls.shm_prefix, "}", "cpu", "{",
302 "main-core", str(cpu_core_number), "}", "statseg",
303 "{", "socket-name", cls.stats_sock, "}", "plugins",
304 "{", "plugin", "dpdk_plugin.so", "{", "disable",
305 "}", "plugin", "unittest_plugin.so", "{", "enable",
307 if plugin_path is not None:
308 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
309 cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
310 cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
313 def wait_for_enter(cls):
314 if cls.debug_gdbserver:
315 print(double_line_delim)
316 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
318 print(double_line_delim)
319 print("Spawned VPP with PID: %d" % cls.vpp.pid)
321 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
323 print(single_line_delim)
324 print("You can debug the VPP using e.g.:")
325 if cls.debug_gdbserver:
326 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
327 print("Now is the time to attach a gdb by running the above "
328 "command, set up breakpoints etc. and then resume VPP from "
329 "within gdb by issuing the 'continue' command")
331 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
332 print("Now is the time to attach a gdb by running the above "
333 "command and set up breakpoints etc.")
334 print(single_line_delim)
335 six.input("Press ENTER to continue running the testcase...")
339 cmdline = cls.vpp_cmdline
341 if cls.debug_gdbserver:
342 gdbserver = '/usr/bin/gdbserver'
343 if not os.path.isfile(gdbserver) or \
344 not os.access(gdbserver, os.X_OK):
345 raise Exception("gdbserver binary '%s' does not exist or is "
346 "not executable" % gdbserver)
348 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
349 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
352 cls.vpp = subprocess.Popen(cmdline,
353 stdout=subprocess.PIPE,
354 stderr=subprocess.PIPE,
356 except subprocess.CalledProcessError as e:
357 cls.logger.critical("Couldn't start vpp: %s" % e)
363 def wait_for_stats_socket(cls):
364 deadline = time.time() + 3
366 while time.time() < deadline or \
367 cls.debug_gdb or cls.debug_gdbserver:
368 if os.path.exists(cls.stats_sock):
373 cls.logger.critical("Couldn't stat : {}".format(cls.stats_sock))
378 Perform class setup before running the testcase
379 Remove shared memory files, start vpp and connect the vpp-api
381 gc.collect() # run garbage collection first
384 cls.logger = get_logger(cls.__name__)
385 if hasattr(cls, 'parallel_handler'):
386 cls.logger.addHandler(cls.parallel_handler)
387 cls.logger.propagate = False
388 cls.tempdir = tempfile.mkdtemp(
389 prefix='vpp-unittest-%s-' % cls.__name__)
390 cls.stats_sock = "%s/stats.sock" % cls.tempdir
391 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
392 cls.file_handler.setFormatter(
393 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
395 cls.file_handler.setLevel(DEBUG)
396 cls.logger.addHandler(cls.file_handler)
397 cls.shm_prefix = os.path.basename(cls.tempdir)
398 os.chdir(cls.tempdir)
399 cls.logger.info("Temporary dir is %s, shm prefix is %s",
400 cls.tempdir, cls.shm_prefix)
402 cls.reset_packet_infos()
404 cls._zombie_captures = []
407 cls.registry = VppObjectRegistry()
408 cls.vpp_startup_failed = False
409 cls.reporter = KeepAliveReporter()
410 # need to catch exceptions here because if we raise, then the cleanup
411 # doesn't get called and we might end with a zombie vpp
414 cls.reporter.send_keep_alive(cls, 'setUpClass')
415 VppTestResult.current_test_case_info = TestCaseInfo(
416 cls.logger, cls.tempdir, cls.vpp.pid, cls.vpp_bin)
417 cls.vpp_stdout_deque = deque()
418 cls.vpp_stderr_deque = deque()
419 cls.pump_thread_stop_flag = Event()
420 cls.pump_thread_wakeup_pipe = os.pipe()
421 cls.pump_thread = Thread(target=pump_output, args=(cls,))
422 cls.pump_thread.daemon = True
423 cls.pump_thread.start()
424 if cls.debug_gdb or cls.debug_gdbserver:
428 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls,
434 cls.vapi.register_hook(hook)
435 cls.wait_for_stats_socket()
436 cls.statistics = VPPStats(socketname=cls.stats_sock)
440 cls.vpp_startup_failed = True
442 "VPP died shortly after startup, check the"
443 " output to standard error for possible cause")
449 cls.vapi.disconnect()
452 if cls.debug_gdbserver:
453 print(colorize("You're running VPP inside gdbserver but "
454 "VPP-API connection failed, did you forget "
455 "to 'continue' VPP from within gdb?", RED))
467 Disconnect vpp-api, kill vpp and cleanup shared memory files
469 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
471 if cls.vpp.returncode is None:
472 print(double_line_delim)
473 print("VPP or GDB server is still running")
474 print(single_line_delim)
475 six.input("When done debugging, press ENTER to kill the "
476 "process and finish running the testcase...")
478 # first signal that we want to stop the pump thread, then wake it up
479 if hasattr(cls, 'pump_thread_stop_flag'):
480 cls.pump_thread_stop_flag.set()
481 if hasattr(cls, 'pump_thread_wakeup_pipe'):
482 os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
483 if hasattr(cls, 'pump_thread'):
484 cls.logger.debug("Waiting for pump thread to stop")
485 cls.pump_thread.join()
486 if hasattr(cls, 'vpp_stderr_reader_thread'):
487 cls.logger.debug("Waiting for stdderr pump to stop")
488 cls.vpp_stderr_reader_thread.join()
490 if hasattr(cls, 'vpp'):
491 if hasattr(cls, 'vapi'):
492 cls.vapi.disconnect()
495 if cls.vpp.returncode is None:
496 cls.logger.debug("Sending TERM to vpp")
498 cls.logger.debug("Waiting for vpp to die")
499 cls.vpp.communicate()
502 if cls.vpp_startup_failed:
503 stdout_log = cls.logger.info
504 stderr_log = cls.logger.critical
506 stdout_log = cls.logger.info
507 stderr_log = cls.logger.info
509 if hasattr(cls, 'vpp_stdout_deque'):
510 stdout_log(single_line_delim)
511 stdout_log('VPP output to stdout while running %s:', cls.__name__)
512 stdout_log(single_line_delim)
513 vpp_output = "".join(cls.vpp_stdout_deque)
514 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
516 stdout_log('\n%s', vpp_output)
517 stdout_log(single_line_delim)
519 if hasattr(cls, 'vpp_stderr_deque'):
520 stderr_log(single_line_delim)
521 stderr_log('VPP output to stderr while running %s:', cls.__name__)
522 stderr_log(single_line_delim)
523 vpp_output = "".join(cls.vpp_stderr_deque)
524 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
526 stderr_log('\n%s', vpp_output)
527 stderr_log(single_line_delim)
530 def tearDownClass(cls):
531 """ Perform final cleanup after running all tests in this test-case """
532 cls.reporter.send_keep_alive(cls, 'tearDownClass')
534 cls.file_handler.close()
535 cls.reset_packet_infos()
537 debug_internal.on_tear_down_class(cls)
540 """ Show various debug prints after each test """
541 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
542 (self.__class__.__name__, self._testMethodName,
543 self._testMethodDoc))
544 if not self.vpp_dead:
545 self.logger.debug(self.vapi.cli("show trace"))
546 self.logger.info(self.vapi.ppcli("show interface"))
547 self.logger.info(self.vapi.ppcli("show hardware"))
548 self.logger.info(self.statistics.set_errors_str())
549 self.logger.info(self.vapi.ppcli("show run"))
550 self.logger.info(self.vapi.ppcli("show log"))
551 self.registry.remove_vpp_config(self.logger)
552 # Save/Dump VPP api trace log
553 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
554 tmp_api_trace = "/tmp/%s" % api_trace
555 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
556 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
557 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
559 os.rename(tmp_api_trace, vpp_api_trace_log)
560 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
563 self.registry.unregister_all(self.logger)
566 """ Clear trace before running each test"""
567 self.reporter.send_keep_alive(self)
568 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
569 (self.__class__.__name__, self._testMethodName,
570 self._testMethodDoc))
572 raise Exception("VPP is dead when setting up the test")
573 self.sleep(.1, "during setUp")
574 self.vpp_stdout_deque.append(
575 "--- test setUp() for %s.%s(%s) starts here ---\n" %
576 (self.__class__.__name__, self._testMethodName,
577 self._testMethodDoc))
578 self.vpp_stderr_deque.append(
579 "--- test setUp() for %s.%s(%s) starts here ---\n" %
580 (self.__class__.__name__, self._testMethodName,
581 self._testMethodDoc))
582 self.vapi.cli("clear trace")
583 # store the test instance inside the test class - so that objects
584 # holding the class can access instance methods (like assertEqual)
585 type(self).test_instance = self
588 def pg_enable_capture(cls, interfaces=None):
590 Enable capture on packet-generator interfaces
592 :param interfaces: iterable interface indexes (if None,
593 use self.pg_interfaces)
596 if interfaces is None:
597 interfaces = cls.pg_interfaces
602 def register_capture(cls, cap_name):
603 """ Register a capture in the testclass """
604 # add to the list of captures with current timestamp
605 cls._captures.append((time.time(), cap_name))
606 # filter out from zombies
607 cls._zombie_captures = [(stamp, name)
608 for (stamp, name) in cls._zombie_captures
613 """ Remove any zombie captures and enable the packet generator """
614 # how long before capture is allowed to be deleted - otherwise vpp
615 # crashes - 100ms seems enough (this shouldn't be needed at all)
618 for stamp, cap_name in cls._zombie_captures:
619 wait = stamp + capture_ttl - now
621 cls.sleep(wait, "before deleting capture %s" % cap_name)
623 cls.logger.debug("Removing zombie capture %s" % cap_name)
624 cls.vapi.cli('packet-generator delete %s' % cap_name)
626 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
627 cls.vapi.cli('packet-generator enable')
628 cls._zombie_captures = cls._captures
632 def create_pg_interfaces(cls, interfaces):
634 Create packet-generator interfaces.
636 :param interfaces: iterable indexes of the interfaces.
637 :returns: List of created interfaces.
642 intf = VppPGInterface(cls, i)
643 setattr(cls, intf.name, intf)
645 cls.pg_interfaces = result
649 def create_loopback_interfaces(cls, count):
651 Create loopback interfaces.
653 :param count: number of interfaces created.
654 :returns: List of created interfaces.
656 result = [VppLoInterface(cls) for i in range(count)]
658 setattr(cls, intf.name, intf)
659 cls.lo_interfaces = result
663 def extend_packet(packet, size, padding=' '):
665 Extend packet to given size by padding with spaces or custom padding
666 NOTE: Currently works only when Raw layer is present.
668 :param packet: packet
669 :param size: target size
670 :param padding: padding used to extend the payload
673 packet_len = len(packet) + 4
674 extend = size - packet_len
676 num = (extend / len(padding)) + 1
677 packet[Raw].load += (padding * num)[:extend]
def reset_packet_infos(cls):
    """ Start over with empty packet infos and empty per-interface counts """
    # two independent, fresh dictionaries
    cls._packet_infos, cls._packet_count_for_dst_if_idx = dict(), dict()
686 def create_packet_info(cls, src_if, dst_if):
688 Create packet info object containing the source and destination indexes
689 and add it to the testcase's packet info list
691 :param VppInterface src_if: source interface
692 :param VppInterface dst_if: destination interface
694 :returns: _PacketInfo object
698 info.index = len(cls._packet_infos)
699 info.src = src_if.sw_if_index
700 info.dst = dst_if.sw_if_index
701 if isinstance(dst_if, VppSubInterface):
702 dst_idx = dst_if.parent.sw_if_index
704 dst_idx = dst_if.sw_if_index
705 if dst_idx in cls._packet_count_for_dst_if_idx:
706 cls._packet_count_for_dst_if_idx[dst_idx] += 1
708 cls._packet_count_for_dst_if_idx[dst_idx] = 1
709 cls._packet_infos[info.index] = info
713 def info_to_payload(info):
715 Convert _PacketInfo object to packet payload
717 :param info: _PacketInfo object
719 :returns: string containing serialized data from packet info
721 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
725 def payload_to_info(payload):
727 Convert packet payload to _PacketInfo object
729 :param payload: packet payload
731 :returns: _PacketInfo object containing de-serialized data from payload
734 numbers = payload.split()
736 info.index = int(numbers[0])
737 info.src = int(numbers[1])
738 info.dst = int(numbers[2])
739 info.ip = int(numbers[3])
740 info.proto = int(numbers[4])
743 def get_next_packet_info(self, info):
745 Iterate over the packet info list stored in the testcase
746 Start iteration with first element if info is None
747 Continue based on index in info if info is specified
749 :param info: info or None
750 :returns: next info in list or None if no more infos
755 next_index = info.index + 1
756 if next_index == len(self._packet_infos):
759 return self._packet_infos[next_index]
761 def get_next_packet_info_for_interface(self, src_index, info):
763 Search the packet info list for the next packet info with same source
766 :param src_index: source interface index to search for
767 :param info: packet info - where to start the search
768 :returns: packet info or None
772 info = self.get_next_packet_info(info)
775 if info.src == src_index:
778 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
780 Search the packet info list for the next packet info with same source
781 and destination interface indexes
783 :param src_index: source interface index to search for
784 :param dst_index: destination interface index to search for
785 :param info: packet info - where to start the search
786 :returns: packet info or None
790 info = self.get_next_packet_info_for_interface(src_index, info)
793 if info.dst == dst_index:
796 def assert_equal(self, real_value, expected_value, name_or_class=None):
797 if name_or_class is None:
798 self.assertEqual(real_value, expected_value)
801 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
802 msg = msg % (getdoc(name_or_class).strip(),
803 real_value, str(name_or_class(real_value)),
804 expected_value, str(name_or_class(expected_value)))
806 msg = "Invalid %s: %s does not match expected value %s" % (
807 name_or_class, real_value, expected_value)
809 self.assertEqual(real_value, expected_value, msg)
811 def assert_in_range(self,
819 msg = "Invalid %s: %s out of range <%s,%s>" % (
820 name, real_value, expected_min, expected_max)
821 self.assertTrue(expected_min <= real_value <= expected_max, msg)
823 def assert_packet_checksums_valid(self, packet,
824 ignore_zero_udp_checksums=True):
825 received = packet.__class__(str(packet))
827 ppp("Verifying packet checksums for packet:", received))
828 udp_layers = ['UDP', 'UDPerror']
829 checksum_fields = ['cksum', 'chksum']
832 temp = received.__class__(str(received))
834 layer = temp.getlayer(counter)
836 for cf in checksum_fields:
837 if hasattr(layer, cf):
838 if ignore_zero_udp_checksums and \
839 0 == getattr(layer, cf) and \
840 layer.name in udp_layers:
843 checksums.append((counter, cf))
846 counter = counter + 1
847 if 0 == len(checksums):
849 temp = temp.__class__(str(temp))
850 for layer, cf in checksums:
851 calc_sum = getattr(temp[layer], cf)
853 getattr(received[layer], cf), calc_sum,
854 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
856 "Checksum field `%s` on `%s` layer has correct value `%s`" %
857 (cf, temp[layer].name, calc_sum))
859 def assert_checksum_valid(self, received_packet, layer,
861 ignore_zero_checksum=False):
862 """ Check checksum of received packet on given layer """
863 received_packet_checksum = getattr(received_packet[layer], field_name)
864 if ignore_zero_checksum and 0 == received_packet_checksum:
866 recalculated = received_packet.__class__(str(received_packet))
867 delattr(recalculated[layer], field_name)
868 recalculated = recalculated.__class__(str(recalculated))
869 self.assert_equal(received_packet_checksum,
870 getattr(recalculated[layer], field_name),
871 "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """ Check the checksum of the 'IP' layer of the received packet

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded to assert_checksum_valid
    """
    options = {'ignore_zero_checksum': ignore_zero_checksum}
    self.assert_checksum_valid(received_packet, 'IP', **options)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """ Check the checksum of the 'TCP' layer of the received packet

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded to assert_checksum_valid
    """
    options = {'ignore_zero_checksum': ignore_zero_checksum}
    self.assert_checksum_valid(received_packet, 'TCP', **options)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """ Check the checksum of the 'UDP' layer of the received packet

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded to assert_checksum_valid
                                 (defaults to True for UDP)
    """
    options = {'ignore_zero_checksum': ignore_zero_checksum}
    self.assert_checksum_valid(received_packet, 'UDP', **options)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """ Check checksums of the error layers embedded in the packet

    For each of the IPerror, TCPerror, UDPerror and ICMPerror layers that
    the packet has, assert_checksum_valid is called for that layer.  Only
    the UDPerror check passes ignore_zero_checksum=True.

    :param received_packet: packet to verify
    """
    embedded_layers = (
        (IPerror, 'IPerror', {}),
        (TCPerror, 'TCPerror', {}),
        (UDPerror, 'UDPerror', {'ignore_zero_checksum': True}),
        (ICMPerror, 'ICMPerror', {}),
    )
    for layer_cls, layer_name, extra in embedded_layers:
        if received_packet.haslayer(layer_cls):
            self.assert_checksum_valid(received_packet, layer_name, **extra)
def assert_icmp_checksum_valid(self, received_packet):
    """ Check the 'ICMP' layer checksum, then the embedded error layers

    :param received_packet: packet to verify
    """
    # outer ICMP header first ...
    outer_check = self.assert_checksum_valid
    outer_check(received_packet, 'ICMP')
    # ... followed by whatever error layers are embedded in it
    embedded_check = self.assert_embedded_icmp_checksum_valid
    embedded_check(received_packet)
def assert_icmpv6_checksum_valid(self, pkt):
    """ Check the 'cksum' field of the ICMPv6 layers present in the packet

    A destination-unreachable layer additionally triggers the check of the
    embedded error layers; echo request and echo reply layers are only
    checked themselves.

    :param pkt: packet to verify
    """
    if pkt.haslayer(ICMPv6DestUnreach):
        self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
        self.assert_embedded_icmp_checksum_valid(pkt)
    echo_layers = (
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest'),
        (ICMPv6EchoReply, 'ICMPv6EchoReply'),
    )
    for layer_cls, layer_name in echo_layers:
        if pkt.haslayer(layer_cls):
            self.assert_checksum_valid(pkt, layer_name, 'cksum')
912 def assert_packet_counter_equal(self, counter, expected_value):
913 counters = self.vapi.cli("sh errors").split('\n')
915 for i in range(1, len(counters)-1):
916 results = counters[i].split()
917 if results[1] == counter:
918 counter_value = int(results[0])
920 self.assert_equal(counter_value, expected_value,
921 "packet counter `%s'" % counter)
924 def sleep(cls, timeout, remark=None):
925 if hasattr(cls, 'logger'):
926 cls.logger.debug("Starting sleep for %ss (%s)" % (timeout, remark))
930 if after - before > 2 * timeout:
931 cls.logger.error("unexpected time.sleep() result - "
932 "slept for %ss instead of ~%ss!" % (
933 after - before, timeout))
934 if hasattr(cls, 'logger'):
936 "Finished sleep (%s) - slept %ss (wanted %ss)" % (
937 remark, after - before, timeout))
939 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
940 self.vapi.cli("clear trace")
941 intf.add_stream(pkts)
942 self.pg_enable_capture(self.pg_interfaces)
946 for i in self.pg_interfaces:
947 i.get_capture(0, timeout=timeout)
948 i.assert_nothing_captured(remark=remark)
951 def send_and_expect(self, input, pkts, output):
952 self.vapi.cli("clear trace")
953 input.add_stream(pkts)
954 self.pg_enable_capture(self.pg_interfaces)
956 if isinstance(object, (list,)):
959 rx.append(output.get_capture(len(pkts)))
961 rx = output.get_capture(len(pkts))
964 def send_and_expect_only(self, input, pkts, output, timeout=None):
965 self.vapi.cli("clear trace")
966 input.add_stream(pkts)
967 self.pg_enable_capture(self.pg_interfaces)
969 if isinstance(object, (list,)):
973 rx.append(output.get_capture(len(pkts)))
975 rx = output.get_capture(len(pkts))
979 for i in self.pg_interfaces:
981 i.get_capture(0, timeout=timeout)
982 i.assert_nothing_captured()
def get_testcase_doc_name(test):
    """Return a human readable name of the test case class of ``test``.

    The name is the first line of the docstring of ``test.__class__``.  When
    the class has no docstring (``getdoc`` returns None) or an empty one,
    the class name is returned instead of raising AttributeError/IndexError,
    so that result reporting does not fail on an undocumented test class.

    :param test: test case instance
    :returns: first docstring line of the test class, or the class name
    """
    test_class = test.__class__
    doc_lines = (getdoc(test_class) or "").splitlines()
    if doc_lines:
        return doc_lines[0]
    # no usable docstring - fall back to the class name
    return test_class.__name__
992 def get_test_description(descriptions, test):
993 short_description = test.shortDescription()
994 if descriptions and short_description:
995 return short_description
class TestCaseInfo(object):
    """Record describing one test case run.

    Keeps the logger, temporary directory, VPP process id and VPP binary
    path passed to the constructor.  ``core_crash_test`` starts out as None.
    """

    def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
        """
        :param logger: logger stored as ``logger``
        :param tempdir: stored as ``tempdir``
        :param vpp_pid: stored as ``vpp_pid``
        :param vpp_bin_path: stored as ``vpp_bin_path``
        """
        self.logger, self.tempdir = logger, tempdir
        self.vpp_pid, self.vpp_bin_path = vpp_pid, vpp_bin_path
        # nothing recorded yet
        self.core_crash_test = None
1009 class VppTestResult(unittest.TestResult):
1011 @property result_string
1012 String variable to store the test case result string.
1014 List variable containing 2-tuples of TestCase instances and strings
1015 holding formatted tracebacks. Each tuple represents a test which
1016 raised an unexpected exception.
1018 List variable containing 2-tuples of TestCase instances and strings
1019 holding formatted tracebacks. Each tuple represents a test where
1020 a failure was explicitly signalled using the TestCase.assert*()
1024 failed_test_cases_info = set()
1025 core_crash_test_cases_info = set()
1026 current_test_case_info = None
1028 def __init__(self, stream, descriptions, verbosity):
1030 :param stream File descriptor to store where to report test results.
1031 Set to the standard error stream by default.
1032 :param descriptions Boolean variable to store information if to use
1033 test case descriptions.
1034 :param verbosity Integer variable to store required verbosity level.
1036 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
1037 self.stream = stream
1038 self.descriptions = descriptions
1039 self.verbosity = verbosity
1040 self.result_string = None
1042 def addSuccess(self, test):
1044 Record a test succeeded result
1049 if self.current_test_case_info:
1050 self.current_test_case_info.logger.debug(
1051 "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
1052 test._testMethodName,
1053 test._testMethodDoc))
1054 unittest.TestResult.addSuccess(self, test)
1055 self.result_string = colorize("OK", GREEN)
1057 self.send_result_through_pipe(test, PASS)
1059 def addSkip(self, test, reason):
1061 Record a test skipped.
1067 if self.current_test_case_info:
1068 self.current_test_case_info.logger.debug(
1069 "--- addSkip() %s.%s(%s) called, reason is %s" %
1070 (test.__class__.__name__, test._testMethodName,
1071 test._testMethodDoc, reason))
1072 unittest.TestResult.addSkip(self, test, reason)
1073 self.result_string = colorize("SKIP", YELLOW)
1075 self.send_result_through_pipe(test, SKIP)
1077 def symlink_failed(self):
1078 if self.current_test_case_info:
1080 failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
1081 link_path = os.path.join(
1084 os.path.basename(self.current_test_case_info.tempdir))
1085 if self.current_test_case_info.logger:
1086 self.current_test_case_info.logger.debug(
1087 "creating a link to the failed test")
1088 self.current_test_case_info.logger.debug(
1089 "os.symlink(%s, %s)" %
1090 (self.current_test_case_info.tempdir, link_path))
1091 if os.path.exists(link_path):
1092 if self.current_test_case_info.logger:
1093 self.current_test_case_info.logger.debug(
1094 'symlink already exists')
1096 os.symlink(self.current_test_case_info.tempdir, link_path)
1098 except Exception as e:
1099 if self.current_test_case_info.logger:
1100 self.current_test_case_info.logger.error(e)
1102 def send_result_through_pipe(self, test, result):
1103 if hasattr(self, 'test_framework_result_pipe'):
1104 pipe = self.test_framework_result_pipe
1106 pipe.send((test.id(), result))
1108 def log_error(self, test, err, fn_name):
1109 if self.current_test_case_info:
1110 if isinstance(test, unittest.suite._ErrorHolder):
1111 test_name = test.description
1113 test_name = '%s.%s(%s)' % (test.__class__.__name__,
1114 test._testMethodName,
1115 test._testMethodDoc)
1116 self.current_test_case_info.logger.debug(
1117 "--- %s() %s called, err is %s" %
1118 (fn_name, test_name, err))
1119 self.current_test_case_info.logger.debug(
1120 "formatted exception is:\n%s" %
1121 "".join(format_exception(*err)))
1123 def add_error(self, test, err, unittest_fn, error_type):
1124 if error_type == FAIL:
1125 self.log_error(test, err, 'addFailure')
1126 error_type_str = colorize("FAIL", RED)
1127 elif error_type == ERROR:
1128 self.log_error(test, err, 'addError')
1129 error_type_str = colorize("ERROR", RED)
1131 raise Exception('Error type %s cannot be used to record an '
1132 'error or a failure' % error_type)
1134 unittest_fn(self, test, err)
1135 if self.current_test_case_info:
1136 self.result_string = "%s [ temp dir used by test case: %s ]" % \
1138 self.current_test_case_info.tempdir)
1139 self.symlink_failed()
1140 self.failed_test_cases_info.add(self.current_test_case_info)
1141 if is_core_present(self.current_test_case_info.tempdir):
1142 if not self.current_test_case_info.core_crash_test:
1143 if isinstance(test, unittest.suite._ErrorHolder):
1144 test_name = str(test)
1146 test_name = "'{}' ({})".format(
1147 get_testcase_doc_name(test), test.id())
1148 self.current_test_case_info.core_crash_test = test_name
1149 self.core_crash_test_cases_info.add(
1150 self.current_test_case_info)
1152 self.result_string = '%s [no temp dir]' % error_type_str
1154 self.send_result_through_pipe(test, error_type)
1156 def addFailure(self, test, err):
1158 Record a test failed result
1161 :param err: error message
1164 self.add_error(test, err, unittest.TestResult.addFailure, FAIL)
1166 def addError(self, test, err):
1168 Record a test error result
1171 :param err: error message
1174 self.add_error(test, err, unittest.TestResult.addError, ERROR)
1176 def getDescription(self, test):
1178 Get test description
1181 :returns: test description
1184 return get_test_description(self.descriptions, test)
1186 def startTest(self, test):
1195 unittest.TestResult.startTest(self, test)
1196 if self.verbosity > 0:
1197 self.stream.writeln(
1198 "Starting " + self.getDescription(test) + " ...")
1199 self.stream.writeln(single_line_delim)
1201 def stopTest(self, test):
1203 Called when the given test has been run
1208 unittest.TestResult.stopTest(self, test)
1209 if self.verbosity > 0:
1210 self.stream.writeln(single_line_delim)
1211 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1212 self.result_string))
1213 self.stream.writeln(single_line_delim)
1215 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1216 self.result_string))
1218 self.send_result_through_pipe(test, TEST_RUN)
1220 def printErrors(self):
1222 Print errors from running the test case
1224 self.stream.writeln()
1225 self.printErrorList('ERROR', self.errors)
1226 self.printErrorList('FAIL', self.failures)
1228 def printErrorList(self, flavour, errors):
1230 Print error list to the output stream together with error type
1231 and test case description.
1233 :param flavour: error type
1234 :param errors: iterable errors
1237 for test, err in errors:
1238 self.stream.writeln(double_line_delim)
1239 self.stream.writeln("%s: %s" %
1240 (flavour, self.getDescription(test)))
1241 self.stream.writeln(single_line_delim)
1242 self.stream.writeln("%s" % err)
1245 class VppTestRunner(unittest.TextTestRunner):
1247 A basic test runner implementation which prints results to standard error.
def resultclass(self):
    """Result class used by this runner: VppTestResult"""
    result_cls = VppTestResult
    return result_cls
1254 def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1255 result_pipe=None, failfast=False, buffer=False,
1258 # ignore stream setting here, use hard-coded stdout to be in sync
1259 # with prints from VppTestCase methods ...
1260 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1261 verbosity, failfast, buffer,
1263 KeepAliveReporter.pipe = keep_alive_pipe
1265 VppTestResult.test_framework_result_pipe = result_pipe
1267 def run(self, test):
1274 faulthandler.enable() # emit stack trace to stderr if killed by signal
1276 result = super(VppTestRunner, self).run(test)
1280 class Worker(Thread):
1281 def __init__(self, args, logger, env={}):
1282 self.logger = logger
1285 self.env = copy.deepcopy(env)
1286 super(Worker, self).__init__()
1289 executable = self.args[0]
1290 self.logger.debug("Running executable w/args `%s'" % self.args)
1291 env = os.environ.copy()
1292 env.update(self.env)
1293 env["CK_LOG_FILE_NAME"] = "-"
1294 self.process = subprocess.Popen(
1295 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1296 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1297 out, err = self.process.communicate()
1298 self.logger.debug("Finished running `%s'" % executable)
1299 self.logger.info("Return code is `%s'" % self.process.returncode)
1300 self.logger.info(single_line_delim)
1301 self.logger.info("Executable `%s' wrote to stdout:" % executable)
1302 self.logger.info(single_line_delim)
1303 self.logger.info(out)
1304 self.logger.info(single_line_delim)
1305 self.logger.info("Executable `%s' wrote to stderr:" % executable)
1306 self.logger.info(single_line_delim)
1307 self.logger.info(err)
1308 self.logger.info(single_line_delim)
1309 self.result = self.process.returncode