3 from __future__ import print_function
15 from collections import deque
16 from threading import Thread, Event
17 from inspect import getdoc, isclass
18 from traceback import format_exception
19 from logging import FileHandler, DEBUG, Formatter
20 from scapy.packet import Raw
21 from hook import StepHook, PollHook, VppDiedError
22 from vpp_pg_interface import VppPGInterface
23 from vpp_sub_interface import VppSubInterface
24 from vpp_lo_interface import VppLoInterface
25 from vpp_papi_provider import VppPapiProvider
26 from vpp_papi.vpp_stats import VPPStats
27 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
29 from vpp_object import VppObjectRegistry
31 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
32 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
33 from scapy.layers.inet6 import ICMPv6EchoReply
34 if os.name == 'posix' and sys.version_info[0] < 3:
35 # using subprocess32 is recommended by python official documentation
36 # @ https://docs.python.org/2/library/subprocess.html
37 import subprocess32 as subprocess
42 debug_framework = False
43 if os.getenv('TEST_DEBUG', "0") == "1":
44 debug_framework = True
49 Test framework module.
51 The module provides a set of tools for constructing and running tests and
52 representing the results.
56 class _PacketInfo(object):
57 """Private class to create packet info object.
59 Help process information about the next packet.
60 Set variables to default values.
62 #: Store the index of the packet.
64 #: Store the index of the source packet generator interface of the packet.
66 #: Store the index of the destination packet generator interface
69 #: Store expected ip version
71 #: Store expected upper protocol
73 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare two packet info objects field by field.

    Objects are equal when their ``index``, ``src``, ``dst`` and ``data``
    attributes all compare equal.

    :param other: object to compare with
    :returns: result of the chained comparison, or ``NotImplemented`` when
        *other* does not provide the compared attributes (e.g. ``None``),
        so that Python can fall back to its default comparison instead of
        raising AttributeError.
    """
    try:
        # only the attribute lookups on `other` are guarded - errors raised
        # by the comparisons themselves must still propagate
        other_fields = (other.index, other.src, other.dst, other.data)
    except AttributeError:
        return NotImplemented
    index = self.index == other_fields[0]
    src = self.src == other_fields[1]
    dst = self.dst == other_fields[2]
    data = self.data == other_fields[3]
    return index and src and dst and data
84 def pump_output(testclass):
85 """ pump output from vpp stdout/stderr to proper queues """
88 while not testclass.pump_thread_stop_flag.is_set():
89 readable = select.select([testclass.vpp.stdout.fileno(),
90 testclass.vpp.stderr.fileno(),
91 testclass.pump_thread_wakeup_pipe[0]],
93 if testclass.vpp.stdout.fileno() in readable:
94 read = os.read(testclass.vpp.stdout.fileno(), 102400)
96 split = read.splitlines(True)
97 if len(stdout_fragment) > 0:
98 split[0] = "%s%s" % (stdout_fragment, split[0])
99 if len(split) > 0 and split[-1].endswith("\n"):
103 stdout_fragment = split[-1]
104 testclass.vpp_stdout_deque.extend(split[:limit])
105 if not testclass.cache_vpp_output:
106 for line in split[:limit]:
107 testclass.logger.debug(
108 "VPP STDOUT: %s" % line.rstrip("\n"))
109 if testclass.vpp.stderr.fileno() in readable:
110 read = os.read(testclass.vpp.stderr.fileno(), 102400)
112 split = read.splitlines(True)
113 if len(stderr_fragment) > 0:
114 split[0] = "%s%s" % (stderr_fragment, split[0])
115 if len(split) > 0 and split[-1].endswith("\n"):
119 stderr_fragment = split[-1]
120 testclass.vpp_stderr_deque.extend(split[:limit])
121 if not testclass.cache_vpp_output:
122 for line in split[:limit]:
123 testclass.logger.debug(
124 "VPP STDERR: %s" % line.rstrip("\n"))
125 # ignoring the dummy pipe here intentionally - the flag will take care
126 # of properly terminating the loop
def running_extended_tests():
    """Tell whether extended tests were requested through the environment.

    :returns: True when the ``EXTENDED_TESTS`` environment variable is set
        to ``y``, ``yes`` or ``1`` (case-insensitive); False otherwise,
        including when the variable is not set at all.
    """
    # the membership test already yields a bool - no conditional needed
    return os.getenv("EXTENDED_TESTS", "n").lower() in ("y", "yes", "1")
def running_on_centos():
    """Tell whether the environment identifies the host OS as CentOS.

    :returns: True when the ``OS_ID`` environment variable contains
        ``centos`` (case-insensitive); False otherwise, including when the
        variable is not set.
    """
    # the substring test already yields a bool - no conditional needed
    return "centos" in os.getenv("OS_ID", "").lower()
139 class KeepAliveReporter(object):
141 Singleton object which reports test start to parent process
146 self.__dict__ = self._shared_state
153 def pipe(self, pipe):
154 if hasattr(self, '_pipe'):
155 raise Exception("Internal error - pipe should only be set once.")
158 def send_keep_alive(self, test):
160 Write current test tmpdir & desc to keep-alive pipe to signal liveness
162 if self.pipe is None:
163 # if not running forked..
169 desc = test.shortDescription()
173 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
176 class VppTestCase(unittest.TestCase):
177 """This subclass is a base class for VPP test cases that are implemented as
178 classes. It provides methods to create and run test case.
def packet_infos(self):
    """Return the packet info objects recorded for this test case.

    NOTE: despite the historical "list" wording, reset_packet_infos()
    initialises ``_packet_infos`` as a dict and create_packet_info() stores
    each object under its ``index``.
    """
    return self._packet_infos
187 def get_packet_count_for_if_idx(cls, dst_if_index):
188 """Get the number of packet info for specified destination if index"""
189 if dst_if_index in cls._packet_count_for_dst_if_idx:
190 return cls._packet_count_for_dst_if_idx[dst_if_index]
196 """Return the instance of this testcase"""
197 return cls.test_instance
200 def set_debug_flags(cls, d):
201 cls.debug_core = False
202 cls.debug_gdb = False
203 cls.debug_gdbserver = False
208 cls.debug_core = True
211 elif dl == "gdbserver":
212 cls.debug_gdbserver = True
214 raise Exception("Unrecognized DEBUG option: '%s'" % d)
217 def get_least_used_cpu(self):
218 cpu_usage_list = [set(range(psutil.cpu_count()))]
219 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
220 if 'vpp_main' == p.info['name']]
221 for vpp_process in vpp_processes:
222 for cpu_usage_set in cpu_usage_list:
224 cpu_num = vpp_process.cpu_num()
225 if cpu_num in cpu_usage_set:
226 cpu_usage_set_index = cpu_usage_list.index(
228 if cpu_usage_set_index == len(cpu_usage_list) - 1:
229 cpu_usage_list.append({cpu_num})
231 cpu_usage_list[cpu_usage_set_index + 1].add(
233 cpu_usage_set.remove(cpu_num)
235 except psutil.NoSuchProcess:
238 for cpu_usage_set in cpu_usage_list:
239 if len(cpu_usage_set) > 0:
240 min_usage_set = cpu_usage_set
243 return random.choice(tuple(min_usage_set))
246 def setUpConstants(cls):
247 """ Set-up the test case class based on environment variables """
248 s = os.getenv("STEP", "n")
249 cls.step = True if s.lower() in ("y", "yes", "1") else False
250 d = os.getenv("DEBUG", None)
251 c = os.getenv("CACHE_OUTPUT", "1")
252 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
253 cls.set_debug_flags(d)
254 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
255 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
256 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
258 if cls.plugin_path is not None:
259 if cls.extern_plugin_path is not None:
260 plugin_path = "%s:%s" % (
261 cls.plugin_path, cls.extern_plugin_path)
263 plugin_path = cls.plugin_path
264 elif cls.extern_plugin_path is not None:
265 plugin_path = cls.extern_plugin_path
267 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
268 debug_cli = "cli-listen localhost:5002"
270 size = os.getenv("COREDUMP_SIZE")
272 coredump_size = "coredump-size %s" % size
273 if coredump_size is None:
274 coredump_size = "coredump-size unlimited"
276 cpu_core_number = cls.get_least_used_cpu()
278 cls.vpp_cmdline = [cls.vpp_bin, "unix",
279 "{", "nodaemon", debug_cli, "full-coredump",
280 coredump_size, "}", "api-trace", "{", "on", "}",
281 "api-segment", "{", "prefix", cls.shm_prefix, "}",
282 "cpu", "{", "main-core", str(cpu_core_number), "}",
283 "stats", "{", "socket-name",
284 cls.tempdir + "/stats.sock", "}",
285 "plugins", "{", "plugin", "dpdk_plugin.so", "{",
286 "disable", "}", "plugin", "unittest_plugin.so",
287 "{", "enable", "}", "}", ]
288 if plugin_path is not None:
289 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
290 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
293 def wait_for_enter(cls):
294 if cls.debug_gdbserver:
295 print(double_line_delim)
296 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
298 print(double_line_delim)
299 print("Spawned VPP with PID: %d" % cls.vpp.pid)
301 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
303 print(single_line_delim)
304 print("You can debug the VPP using e.g.:")
305 if cls.debug_gdbserver:
306 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
307 print("Now is the time to attach a gdb by running the above "
308 "command, set up breakpoints etc. and then resume VPP from "
309 "within gdb by issuing the 'continue' command")
311 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
312 print("Now is the time to attach a gdb by running the above "
313 "command and set up breakpoints etc.")
314 print(single_line_delim)
315 raw_input("Press ENTER to continue running the testcase...")
319 cmdline = cls.vpp_cmdline
321 if cls.debug_gdbserver:
322 gdbserver = '/usr/bin/gdbserver'
323 if not os.path.isfile(gdbserver) or \
324 not os.access(gdbserver, os.X_OK):
325 raise Exception("gdbserver binary '%s' does not exist or is "
326 "not executable" % gdbserver)
328 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
329 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
332 cls.vpp = subprocess.Popen(cmdline,
333 stdout=subprocess.PIPE,
334 stderr=subprocess.PIPE,
336 except Exception as e:
337 cls.logger.critical("Couldn't start vpp: %s" % e)
345 Perform class setup before running the testcase
346 Remove shared memory files, start vpp and connect the vpp-api
348 gc.collect() # run garbage collection first
350 if not hasattr(cls, 'logger'):
351 cls.logger = getLogger(cls.__name__)
353 cls.logger.name = cls.__name__
354 cls.tempdir = tempfile.mkdtemp(
355 prefix='vpp-unittest-%s-' % cls.__name__)
356 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
357 cls.file_handler.setFormatter(
358 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
360 cls.file_handler.setLevel(DEBUG)
361 cls.logger.addHandler(cls.file_handler)
362 cls.shm_prefix = os.path.basename(cls.tempdir)
363 os.chdir(cls.tempdir)
364 cls.logger.info("Temporary dir is %s, shm prefix is %s",
365 cls.tempdir, cls.shm_prefix)
367 cls.reset_packet_infos()
369 cls._zombie_captures = []
372 cls.registry = VppObjectRegistry()
373 cls.vpp_startup_failed = False
374 cls.reporter = KeepAliveReporter()
375 # need to catch exceptions here because if we raise, then the cleanup
376 # doesn't get called and we might end with a zombie vpp
379 cls.reporter.send_keep_alive(cls)
380 cls.vpp_stdout_deque = deque()
381 cls.vpp_stderr_deque = deque()
382 cls.pump_thread_stop_flag = Event()
383 cls.pump_thread_wakeup_pipe = os.pipe()
384 cls.pump_thread = Thread(target=pump_output, args=(cls,))
385 cls.pump_thread.daemon = True
386 cls.pump_thread.start()
387 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
392 cls.vapi.register_hook(hook)
393 cls.sleep(0.1, "after vpp startup, before initial poll")
394 cls.statistics = VPPStats(socketname=cls.tempdir+'/stats.sock')
398 cls.vpp_startup_failed = True
400 "VPP died shortly after startup, check the"
401 " output to standard error for possible cause")
407 cls.vapi.disconnect()
410 if cls.debug_gdbserver:
411 print(colorize("You're running VPP inside gdbserver but "
412 "VPP-API connection failed, did you forget "
413 "to 'continue' VPP from within gdb?", RED))
425 Disconnect vpp-api, kill vpp and cleanup shared memory files
427 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
429 if cls.vpp.returncode is None:
430 print(double_line_delim)
431 print("VPP or GDB server is still running")
432 print(single_line_delim)
433 raw_input("When done debugging, press ENTER to kill the "
434 "process and finish running the testcase...")
436 # first signal that we want to stop the pump thread, then wake it up
437 if hasattr(cls, 'pump_thread_stop_flag'):
438 cls.pump_thread_stop_flag.set()
439 if hasattr(cls, 'pump_thread_wakeup_pipe'):
440 os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
441 if hasattr(cls, 'pump_thread'):
442 cls.logger.debug("Waiting for pump thread to stop")
443 cls.pump_thread.join()
444 if hasattr(cls, 'vpp_stderr_reader_thread'):
445 cls.logger.debug("Waiting for stdderr pump to stop")
446 cls.vpp_stderr_reader_thread.join()
448 if hasattr(cls, 'vpp'):
449 if hasattr(cls, 'vapi'):
450 cls.vapi.disconnect()
453 if cls.vpp.returncode is None:
454 cls.logger.debug("Sending TERM to vpp")
456 cls.logger.debug("Waiting for vpp to die")
457 cls.vpp.communicate()
460 if cls.vpp_startup_failed:
461 stdout_log = cls.logger.info
462 stderr_log = cls.logger.critical
464 stdout_log = cls.logger.info
465 stderr_log = cls.logger.info
467 if hasattr(cls, 'vpp_stdout_deque'):
468 stdout_log(single_line_delim)
469 stdout_log('VPP output to stdout while running %s:', cls.__name__)
470 stdout_log(single_line_delim)
471 vpp_output = "".join(cls.vpp_stdout_deque)
472 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
474 stdout_log('\n%s', vpp_output)
475 stdout_log(single_line_delim)
477 if hasattr(cls, 'vpp_stderr_deque'):
478 stderr_log(single_line_delim)
479 stderr_log('VPP output to stderr while running %s:', cls.__name__)
480 stderr_log(single_line_delim)
481 vpp_output = "".join(cls.vpp_stderr_deque)
482 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
484 stderr_log('\n%s', vpp_output)
485 stderr_log(single_line_delim)
488 def tearDownClass(cls):
489 """ Perform final cleanup after running all tests in this test-case """
491 cls.file_handler.close()
492 cls.reset_packet_infos()
494 debug_internal.on_tear_down_class(cls)
497 """ Show various debug prints after each test """
498 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
499 (self.__class__.__name__, self._testMethodName,
500 self._testMethodDoc))
501 if not self.vpp_dead:
502 self.logger.debug(self.vapi.cli("show trace"))
503 self.logger.info(self.vapi.ppcli("show interface"))
504 self.logger.info(self.vapi.ppcli("show hardware"))
505 self.logger.info(self.statistics.set_errors_str())
506 self.logger.info(self.vapi.ppcli("show run"))
507 self.logger.info(self.vapi.ppcli("show log"))
508 self.registry.remove_vpp_config(self.logger)
509 # Save/Dump VPP api trace log
510 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
511 tmp_api_trace = "/tmp/%s" % api_trace
512 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
513 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
514 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
516 os.rename(tmp_api_trace, vpp_api_trace_log)
517 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
520 self.registry.unregister_all(self.logger)
523 """ Clear trace before running each test"""
524 self.reporter.send_keep_alive(self)
525 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
526 (self.__class__.__name__, self._testMethodName,
527 self._testMethodDoc))
529 raise Exception("VPP is dead when setting up the test")
530 self.sleep(.1, "during setUp")
531 self.vpp_stdout_deque.append(
532 "--- test setUp() for %s.%s(%s) starts here ---\n" %
533 (self.__class__.__name__, self._testMethodName,
534 self._testMethodDoc))
535 self.vpp_stderr_deque.append(
536 "--- test setUp() for %s.%s(%s) starts here ---\n" %
537 (self.__class__.__name__, self._testMethodName,
538 self._testMethodDoc))
539 self.vapi.cli("clear trace")
540 # store the test instance inside the test class - so that objects
541 # holding the class can access instance methods (like assertEqual)
542 type(self).test_instance = self
545 def pg_enable_capture(cls, interfaces=None):
547 Enable capture on packet-generator interfaces
549 :param interfaces: iterable interface indexes (if None,
550 use self.pg_interfaces)
553 if interfaces is None:
554 interfaces = cls.pg_interfaces
559 def register_capture(cls, cap_name):
560 """ Register a capture in the testclass """
561 # add to the list of captures with current timestamp
562 cls._captures.append((time.time(), cap_name))
563 # filter out from zombies
564 cls._zombie_captures = [(stamp, name)
565 for (stamp, name) in cls._zombie_captures
570 """ Remove any zombie captures and enable the packet generator """
571 # how long before capture is allowed to be deleted - otherwise vpp
572 # crashes - 100ms seems enough (this shouldn't be needed at all)
575 for stamp, cap_name in cls._zombie_captures:
576 wait = stamp + capture_ttl - now
578 cls.sleep(wait, "before deleting capture %s" % cap_name)
580 cls.logger.debug("Removing zombie capture %s" % cap_name)
581 cls.vapi.cli('packet-generator delete %s' % cap_name)
583 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
584 cls.vapi.cli('packet-generator enable')
585 cls._zombie_captures = cls._captures
589 def create_pg_interfaces(cls, interfaces):
591 Create packet-generator interfaces.
593 :param interfaces: iterable indexes of the interfaces.
594 :returns: List of created interfaces.
599 intf = VppPGInterface(cls, i)
600 setattr(cls, intf.name, intf)
602 cls.pg_interfaces = result
606 def create_loopback_interfaces(cls, count):
608 Create loopback interfaces.
610 :param count: number of interfaces created.
611 :returns: List of created interfaces.
613 result = [VppLoInterface(cls) for i in range(count)]
615 setattr(cls, intf.name, intf)
616 cls.lo_interfaces = result
620 def extend_packet(packet, size, padding=' '):
622 Extend packet to given size by padding with spaces or custom padding
623 NOTE: Currently works only when Raw layer is present.
625 :param packet: packet
626 :param size: target size
627 :param padding: padding used to extend the payload
630 packet_len = len(packet) + 4
631 extend = size - packet_len
633 num = (extend / len(padding)) + 1
634 packet[Raw].load += (padding * num)[:extend]
def reset_packet_infos(cls):
    """Drop all recorded packet infos and per-destination packet counts."""
    for attr_name in ("_packet_infos", "_packet_count_for_dst_if_idx"):
        # every attribute gets its own fresh, empty dict
        setattr(cls, attr_name, {})
643 def create_packet_info(cls, src_if, dst_if):
645 Create packet info object containing the source and destination indexes
646 and add it to the testcase's packet info list
648 :param VppInterface src_if: source interface
649 :param VppInterface dst_if: destination interface
651 :returns: _PacketInfo object
655 info.index = len(cls._packet_infos)
656 info.src = src_if.sw_if_index
657 info.dst = dst_if.sw_if_index
658 if isinstance(dst_if, VppSubInterface):
659 dst_idx = dst_if.parent.sw_if_index
661 dst_idx = dst_if.sw_if_index
662 if dst_idx in cls._packet_count_for_dst_if_idx:
663 cls._packet_count_for_dst_if_idx[dst_idx] += 1
665 cls._packet_count_for_dst_if_idx[dst_idx] = 1
666 cls._packet_infos[info.index] = info
670 def info_to_payload(info):
672 Convert _PacketInfo object to packet payload
674 :param info: _PacketInfo object
676 :returns: string containing serialized data from packet info
678 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
682 def payload_to_info(payload):
684 Convert packet payload to _PacketInfo object
686 :param payload: packet payload
688 :returns: _PacketInfo object containing de-serialized data from payload
691 numbers = payload.split()
693 info.index = int(numbers[0])
694 info.src = int(numbers[1])
695 info.dst = int(numbers[2])
696 info.ip = int(numbers[3])
697 info.proto = int(numbers[4])
700 def get_next_packet_info(self, info):
702 Iterate over the packet info list stored in the testcase
703 Start iteration with first element if info is None
704 Continue based on index in info if info is specified
706 :param info: info or None
707 :returns: next info in list or None if no more infos
712 next_index = info.index + 1
713 if next_index == len(self._packet_infos):
716 return self._packet_infos[next_index]
718 def get_next_packet_info_for_interface(self, src_index, info):
720 Search the packet info list for the next packet info with same source
723 :param src_index: source interface index to search for
724 :param info: packet info - where to start the search
725 :returns: packet info or None
729 info = self.get_next_packet_info(info)
732 if info.src == src_index:
735 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
737 Search the packet info list for the next packet info with same source
738 and destination interface indexes
740 :param src_index: source interface index to search for
741 :param dst_index: destination interface index to search for
742 :param info: packet info - where to start the search
743 :returns: packet info or None
747 info = self.get_next_packet_info_for_interface(src_index, info)
750 if info.dst == dst_index:
753 def assert_equal(self, real_value, expected_value, name_or_class=None):
754 if name_or_class is None:
755 self.assertEqual(real_value, expected_value)
758 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
759 msg = msg % (getdoc(name_or_class).strip(),
760 real_value, str(name_or_class(real_value)),
761 expected_value, str(name_or_class(expected_value)))
763 msg = "Invalid %s: %s does not match expected value %s" % (
764 name_or_class, real_value, expected_value)
766 self.assertEqual(real_value, expected_value, msg)
768 def assert_in_range(self,
776 msg = "Invalid %s: %s out of range <%s,%s>" % (
777 name, real_value, expected_min, expected_max)
778 self.assertTrue(expected_min <= real_value <= expected_max, msg)
780 def assert_packet_checksums_valid(self, packet,
781 ignore_zero_udp_checksums=True):
782 received = packet.__class__(str(packet))
784 ppp("Verifying packet checksums for packet:", received))
785 udp_layers = ['UDP', 'UDPerror']
786 checksum_fields = ['cksum', 'chksum']
789 temp = received.__class__(str(received))
791 layer = temp.getlayer(counter)
793 for cf in checksum_fields:
794 if hasattr(layer, cf):
795 if ignore_zero_udp_checksums and \
796 0 == getattr(layer, cf) and \
797 layer.name in udp_layers:
800 checksums.append((counter, cf))
803 counter = counter + 1
804 if 0 == len(checksums):
806 temp = temp.__class__(str(temp))
807 for layer, cf in checksums:
808 calc_sum = getattr(temp[layer], cf)
810 getattr(received[layer], cf), calc_sum,
811 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
813 "Checksum field `%s` on `%s` layer has correct value `%s`" %
814 (cf, temp[layer].name, calc_sum))
816 def assert_checksum_valid(self, received_packet, layer,
818 ignore_zero_checksum=False):
819 """ Check checksum of received packet on given layer """
820 received_packet_checksum = getattr(received_packet[layer], field_name)
821 if ignore_zero_checksum and 0 == received_packet_checksum:
823 recalculated = received_packet.__class__(str(received_packet))
824 delattr(recalculated[layer], field_name)
825 recalculated = recalculated.__class__(str(recalculated))
826 self.assert_equal(received_packet_checksum,
827 getattr(recalculated[layer], field_name),
828 "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """Verify the checksum of the 'IP' layer of a received packet.

    Thin wrapper around assert_checksum_valid().

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded unchanged to
        assert_checksum_valid()
    """
    options = {'ignore_zero_checksum': ignore_zero_checksum}
    self.assert_checksum_valid(received_packet, 'IP', **options)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """Verify the checksum of the 'TCP' layer of a received packet.

    Thin wrapper around assert_checksum_valid().

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded unchanged to
        assert_checksum_valid()
    """
    options = {'ignore_zero_checksum': ignore_zero_checksum}
    self.assert_checksum_valid(received_packet, 'TCP', **options)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """Verify the checksum of the 'UDP' layer of a received packet.

    Thin wrapper around assert_checksum_valid(). Unlike the IP and TCP
    variants, a zero checksum is ignored by default here.

    :param received_packet: packet to verify
    :param ignore_zero_checksum: forwarded unchanged to
        assert_checksum_valid()
    """
    options = {'ignore_zero_checksum': ignore_zero_checksum}
    self.assert_checksum_valid(received_packet, 'UDP', **options)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """Verify checksums of the *error layers present in a received packet.

    For each of the IPerror, TCPerror, UDPerror and ICMPerror layers found
    in the packet, assert_checksum_valid() is called for that layer. A zero
    checksum is ignored only for UDPerror.

    :param received_packet: packet to verify
    """
    # (layer class to look for, layer name to verify, extra keyword args)
    embedded_checks = (
        (IPerror, 'IPerror', {}),
        (TCPerror, 'TCPerror', {}),
        (UDPerror, 'UDPerror', {'ignore_zero_checksum': True}),
        (ICMPerror, 'ICMPerror', {}),
    )
    for layer_cls, layer_name, options in embedded_checks:
        if received_packet.haslayer(layer_cls):
            self.assert_checksum_valid(received_packet, layer_name,
                                       **options)
def assert_icmp_checksum_valid(self, received_packet):
    """Verify the 'ICMP' layer checksum and any embedded error layers.

    :param received_packet: packet to verify
    """
    pkt = received_packet
    # outer ICMP layer first ...
    self.assert_checksum_valid(pkt, 'ICMP')
    # ... then whatever *error layers the packet carries
    self.assert_embedded_icmp_checksum_valid(pkt)
def assert_icmpv6_checksum_valid(self, pkt):
    """Verify the 'cksum' field of the ICMPv6 layers present in a packet.

    Handles ICMPv6DestUnreach, ICMPv6EchoRequest and ICMPv6EchoReply. For
    ICMPv6DestUnreach the embedded error layers are verified as well.

    :param pkt: packet to verify
    """
    # (layer class to look for, layer name to verify, check embedded layers)
    icmpv6_checks = (
        (ICMPv6DestUnreach, 'ICMPv6DestUnreach', True),
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest', False),
        (ICMPv6EchoReply, 'ICMPv6EchoReply', False),
    )
    for layer_cls, layer_name, has_embedded in icmpv6_checks:
        if not pkt.haslayer(layer_cls):
            continue
        self.assert_checksum_valid(pkt, layer_name, 'cksum')
        if has_embedded:
            self.assert_embedded_icmp_checksum_valid(pkt)
870 def sleep(cls, timeout, remark=None):
871 if hasattr(cls, 'logger'):
872 cls.logger.debug("Starting sleep for %ss (%s)" % (timeout, remark))
876 if after - before > 2 * timeout:
877 cls.logger.error("unexpected time.sleep() result - "
878 "slept for %ss instead of ~%ss!" % (
879 after - before, timeout))
880 if hasattr(cls, 'logger'):
882 "Finished sleep (%s) - slept %ss (wanted %ss)" % (
883 remark, after - before, timeout))
885 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
886 self.vapi.cli("clear trace")
887 intf.add_stream(pkts)
888 self.pg_enable_capture(self.pg_interfaces)
892 for i in self.pg_interfaces:
893 i.get_capture(0, timeout=timeout)
894 i.assert_nothing_captured(remark=remark)
897 def send_and_expect(self, input, pkts, output):
898 self.vapi.cli("clear trace")
899 input.add_stream(pkts)
900 self.pg_enable_capture(self.pg_interfaces)
902 rx = output.get_capture(len(pkts))
def get_testcase_doc_name(test):
    """Return a one-line name for the test case class of *test*.

    :param test: test case instance
    :returns: first line of the class docstring; the class name when the
        class has no docstring or an empty one (previously this raised
        AttributeError / IndexError respectively)
    """
    doc_lines = (getdoc(test.__class__) or "").splitlines()
    if not doc_lines:
        return test.__class__.__name__
    return doc_lines[0]
910 def get_test_description(descriptions, test):
911 # TODO: if none print warning not raise exception
912 short_description = test.shortDescription()
913 if descriptions and short_description:
914 return short_description
919 class TestCasePrinter(object):
923 self.__dict__ = self._shared_state
924 if not hasattr(self, "_test_case_set"):
925 self._test_case_set = set()
def print_test_case_heading_if_first_time(self, case):
    """Print a heading for the class of *case*, at most once per class.

    :param case: test case instance about to be run
    """
    case_class = case.__class__
    if case_class in self._test_case_set:
        # heading for this test case class was already printed
        return
    heading = colorize(get_testcase_doc_name(case), GREEN)
    for line in (double_line_delim, heading, double_line_delim):
        print(line)
    self._test_case_set.add(case_class)
935 class VppTestResult(unittest.TestResult):
937 @property result_string
938 String variable to store the test case result string.
940 List variable containing 2-tuples of TestCase instances and strings
941 holding formatted tracebacks. Each tuple represents a test which
942 raised an unexpected exception.
944 List variable containing 2-tuples of TestCase instances and strings
945 holding formatted tracebacks. Each tuple represents a test where
946 a failure was explicitly signalled using the TestCase.assert*()
950 def __init__(self, stream, descriptions, verbosity):
952 :param stream File descriptor to store where to report test results.
953 Set to the standard error stream by default.
954 :param descriptions Boolean variable to store information if to use
955 test case descriptions.
956 :param verbosity Integer variable to store required verbosity level.
958 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
960 self.descriptions = descriptions
961 self.verbosity = verbosity
962 self.result_string = None
963 self.printer = TestCasePrinter()
966 def addSuccess(self, test):
968 Record a test succeeded result
973 if hasattr(test, 'logger'):
974 test.logger.debug("--- addSuccess() %s.%s(%s) called"
975 % (test.__class__.__name__,
976 test._testMethodName,
977 test._testMethodDoc))
978 self.passed.append(test.id())
979 unittest.TestResult.addSuccess(self, test)
980 self.result_string = colorize("OK", GREEN)
982 def addSkip(self, test, reason):
984 Record a test skipped.
990 if hasattr(test, 'logger'):
991 test.logger.debug("--- addSkip() %s.%s(%s) called, reason is %s"
992 % (test.__class__.__name__,
993 test._testMethodName,
996 unittest.TestResult.addSkip(self, test, reason)
997 self.result_string = colorize("SKIP", YELLOW)
999 def symlink_failed(self, test):
1001 if hasattr(test, 'logger'):
1002 logger = test.logger
1003 if hasattr(test, 'tempdir'):
1005 failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
1006 link_path = os.path.join(failed_dir, '%s-FAILED' %
1007 os.path.basename(test.tempdir))
1009 logger.debug("creating a link to the failed test")
1010 logger.debug("os.symlink(%s, %s)" %
1011 (test.tempdir, link_path))
1012 if os.path.exists(link_path):
1014 logger.debug('symlink already exists')
1016 os.symlink(test.tempdir, link_path)
1018 except Exception as e:
1022 def send_results_through_pipe(self):
1023 if hasattr(self, 'test_framework_results_pipe'):
1024 pipe = self.test_framework_results_pipe
1028 def addFailure(self, test, err):
1030 Record a test failed result
1033 :param err: error message
1036 if hasattr(test, 'logger'):
1037 test.logger.debug("--- addFailure() %s.%s(%s) called, err is %s"
1038 % (test.__class__.__name__,
1039 test._testMethodName,
1040 test._testMethodDoc, err))
1041 test.logger.debug("formatted exception is:\n%s" %
1042 "".join(format_exception(*err)))
1043 unittest.TestResult.addFailure(self, test, err)
1044 if hasattr(test, 'tempdir'):
1045 self.result_string = colorize("FAIL", RED) + \
1046 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
1047 self.symlink_failed(test)
1049 self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
1051 def addError(self, test, err):
1053 Record a test error result
1056 :param err: error message
1059 if hasattr(test, 'logger'):
1060 test.logger.debug("--- addError() %s.%s(%s) called, err is %s"
1061 % (test.__class__.__name__,
1062 test._testMethodName,
1063 test._testMethodDoc, err))
1064 test.logger.debug("formatted exception is:\n%s" %
1065 "".join(format_exception(*err)))
1066 unittest.TestResult.addError(self, test, err)
1067 if hasattr(test, 'tempdir'):
1068 self.result_string = colorize("ERROR", RED) + \
1069 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
1070 self.symlink_failed(test)
1072 self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
1074 def getDescription(self, test):
1076 Get test description
1079 :returns: test description
1082 return get_test_description(self.descriptions, test)
1084 def startTest(self, test):
1091 self.printer.print_test_case_heading_if_first_time(test)
1092 unittest.TestResult.startTest(self, test)
1093 if self.verbosity > 0:
1094 self.stream.writeln(
1095 "Starting " + self.getDescription(test) + " ...")
1096 self.stream.writeln(single_line_delim)
1098 def stopTest(self, test):
1100 Called when the given test has been run
1105 unittest.TestResult.stopTest(self, test)
1106 if self.verbosity > 0:
1107 self.stream.writeln(single_line_delim)
1108 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1109 self.result_string))
1110 self.stream.writeln(single_line_delim)
1112 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1113 self.result_string))
1114 self.send_results_through_pipe()
1116 def printErrors(self):
1118 Print errors from running the test case
1120 self.stream.writeln()
1121 self.printErrorList('ERROR', self.errors)
1122 self.printErrorList('FAIL', self.failures)
1124 def printErrorList(self, flavour, errors):
1126 Print error list to the output stream together with error type
1127 and test case description.
1129 :param flavour: error type
1130 :param errors: iterable errors
1133 for test, err in errors:
1134 self.stream.writeln(double_line_delim)
1135 self.stream.writeln("%s: %s" %
1136 (flavour, self.getDescription(test)))
1137 self.stream.writeln(single_line_delim)
1138 self.stream.writeln("%s" % err)
1141 class VppTestRunner(unittest.TextTestRunner):
1143 A basic test runner implementation which prints results to standard error.
def resultclass(self):
    """Return VppTestResult, the class maintaining the results of the tests.

    :returns: the VppTestResult class itself (not an instance)
    """
    return VppTestResult
1150 def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1151 results_pipe=None, failfast=False, buffer=False,
1153 # ignore stream setting here, use hard-coded stdout to be in sync
1154 # with prints from VppTestCase methods ...
1155 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1156 verbosity, failfast, buffer,
1158 reporter = KeepAliveReporter()
1159 reporter.pipe = keep_alive_pipe
1161 VppTestResult.test_framework_results_pipe = results_pipe
1163 def run(self, test):
1170 faulthandler.enable() # emit stack trace to stderr if killed by signal
1172 result = super(VppTestRunner, self).run(test)
1176 class Worker(Thread):
1177 def __init__(self, args, logger, env={}):
1178 self.logger = logger
1181 self.env = copy.deepcopy(env)
1182 super(Worker, self).__init__()
1185 executable = self.args[0]
1186 self.logger.debug("Running executable w/args `%s'" % self.args)
1187 env = os.environ.copy()
1188 env.update(self.env)
1189 env["CK_LOG_FILE_NAME"] = "-"
1190 self.process = subprocess.Popen(
1191 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1192 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1193 out, err = self.process.communicate()
1194 self.logger.debug("Finished running `%s'" % executable)
1195 self.logger.info("Return code is `%s'" % self.process.returncode)
1196 self.logger.info(single_line_delim)
1197 self.logger.info("Executable `%s' wrote to stdout:" % executable)
1198 self.logger.info(single_line_delim)
1199 self.logger.info(out)
1200 self.logger.info(single_line_delim)
1201 self.logger.info("Executable `%s' wrote to stderr:" % executable)
1202 self.logger.info(single_line_delim)
1203 self.logger.info(err)
1204 self.logger.info(single_line_delim)
1205 self.result = self.process.returncode