3 from __future__ import print_function
15 from collections import deque
16 from threading import Thread, Event
17 from inspect import getdoc, isclass
18 from traceback import format_exception
19 from logging import FileHandler, DEBUG, Formatter
20 from scapy.packet import Raw
21 from hook import StepHook, PollHook, VppDiedError
22 from vpp_pg_interface import VppPGInterface
23 from vpp_sub_interface import VppSubInterface
24 from vpp_lo_interface import VppLoInterface
25 from vpp_papi_provider import VppPapiProvider
26 from vpp_papi.vpp_stats import VPPStats
27 from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
29 from vpp_object import VppObjectRegistry
31 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
32 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
33 from scapy.layers.inet6 import ICMPv6EchoReply
34 if os.name == 'posix' and sys.version_info[0] < 3:
35 # using subprocess32 is recommended by python official documentation
36 # @ https://docs.python.org/2/library/subprocess.html
37 import subprocess32 as subprocess
# Framework self-debugging is enabled only when the TEST_DEBUG environment
# variable is set to exactly "1"; unset or any other value leaves it off.
debug_framework = os.getenv('TEST_DEBUG', "0") == "1"
56 Test framework module.
58 The module provides a set of tools for constructing and running tests and
59 representing the results.
63 class _PacketInfo(object):
64 """Private class to create packet info object.
66 Help process information about the next packet.
67 Set variables to default values.
69 #: Store the index of the packet.
71 #: Store the index of the source packet generator interface of the packet.
73 #: Store the index of the destination packet generator interface
76 #: Store expected ip version
78 #: Store expected upper protocol
80 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare two packet infos by index, source, destination and data.

    All four attribute pairs are compared before the results are combined.

    :param other: object to compare with; must provide ``index``, ``src``,
        ``dst`` and ``data`` attributes
    :returns: result of and-ing the four attribute comparisons
    """
    same_index, same_src, same_dst, same_data = [
        getattr(self, field) == getattr(other, field)
        for field in ("index", "src", "dst", "data")]
    return same_index and same_src and same_dst and same_data
91 def pump_output(testclass):
92 """ pump output from vpp stdout/stderr to proper queues """
95 while not testclass.pump_thread_stop_flag.is_set():
96 readable = select.select([testclass.vpp.stdout.fileno(),
97 testclass.vpp.stderr.fileno(),
98 testclass.pump_thread_wakeup_pipe[0]],
100 if testclass.vpp.stdout.fileno() in readable:
101 read = os.read(testclass.vpp.stdout.fileno(), 102400)
103 split = read.splitlines(True)
104 if len(stdout_fragment) > 0:
105 split[0] = "%s%s" % (stdout_fragment, split[0])
106 if len(split) > 0 and split[-1].endswith("\n"):
110 stdout_fragment = split[-1]
111 testclass.vpp_stdout_deque.extend(split[:limit])
112 if not testclass.cache_vpp_output:
113 for line in split[:limit]:
114 testclass.logger.debug(
115 "VPP STDOUT: %s" % line.rstrip("\n"))
116 if testclass.vpp.stderr.fileno() in readable:
117 read = os.read(testclass.vpp.stderr.fileno(), 102400)
119 split = read.splitlines(True)
120 if len(stderr_fragment) > 0:
121 split[0] = "%s%s" % (stderr_fragment, split[0])
122 if len(split) > 0 and split[-1].endswith("\n"):
126 stderr_fragment = split[-1]
127 testclass.vpp_stderr_deque.extend(split[:limit])
128 if not testclass.cache_vpp_output:
129 for line in split[:limit]:
130 testclass.logger.debug(
131 "VPP STDERR: %s" % line.rstrip("\n"))
132 # ignoring the dummy pipe here intentionally - the flag will take care
133 # of properly terminating the loop
def running_extended_tests():
    """Tell whether extended tests were requested through the environment.

    :returns: True when the EXTENDED_TESTS environment variable is "y",
        "yes" or "1" (case-insensitive); False otherwise, including when
        the variable is not set
    """
    choice = os.getenv("EXTENDED_TESTS", "n").lower()
    return choice in ("y", "yes", "1")
def running_on_centos():
    """Tell whether the OS_ID environment variable mentions CentOS.

    :returns: True when "centos" occurs anywhere in OS_ID (compared
        case-insensitively); False otherwise, including when OS_ID is unset
    """
    lowered_id = os.getenv("OS_ID", "").lower()
    return "centos" in lowered_id
146 class KeepAliveReporter(object):
148 Singleton object which reports test start to parent process
153 self.__dict__ = self._shared_state
160 def pipe(self, pipe):
161 if hasattr(self, '_pipe'):
162 raise Exception("Internal error - pipe should only be set once.")
165 def send_keep_alive(self, test):
167 Write current test tmpdir & desc to keep-alive pipe to signal liveness
169 if self.pipe is None:
170 # if not running forked..
176 desc = test.shortDescription()
180 self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
183 class VppTestCase(unittest.TestCase):
184 """This subclass is a base class for VPP test cases that are implemented as
185 classes. It provides methods to create and run test case.
def packet_infos(self):
    """Packet infos registered so far.

    :returns: the ``_packet_infos`` container, which maps a packet index
        to its packet info object
    """
    infos = self._packet_infos
    return infos
194 def get_packet_count_for_if_idx(cls, dst_if_index):
195 """Get the number of packet info for specified destination if index"""
196 if dst_if_index in cls._packet_count_for_dst_if_idx:
197 return cls._packet_count_for_dst_if_idx[dst_if_index]
203 """Return the instance of this testcase"""
204 return cls.test_instance
207 def set_debug_flags(cls, d):
208 cls.debug_core = False
209 cls.debug_gdb = False
210 cls.debug_gdbserver = False
215 cls.debug_core = True
218 elif dl == "gdbserver":
219 cls.debug_gdbserver = True
221 raise Exception("Unrecognized DEBUG option: '%s'" % d)
224 def get_least_used_cpu(self):
225 cpu_usage_list = [set(range(psutil.cpu_count()))]
226 vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
227 if 'vpp_main' == p.info['name']]
228 for vpp_process in vpp_processes:
229 for cpu_usage_set in cpu_usage_list:
231 cpu_num = vpp_process.cpu_num()
232 if cpu_num in cpu_usage_set:
233 cpu_usage_set_index = cpu_usage_list.index(
235 if cpu_usage_set_index == len(cpu_usage_list) - 1:
236 cpu_usage_list.append({cpu_num})
238 cpu_usage_list[cpu_usage_set_index + 1].add(
240 cpu_usage_set.remove(cpu_num)
242 except psutil.NoSuchProcess:
245 for cpu_usage_set in cpu_usage_list:
246 if len(cpu_usage_set) > 0:
247 min_usage_set = cpu_usage_set
250 return random.choice(tuple(min_usage_set))
253 def setUpConstants(cls):
254 """ Set-up the test case class based on environment variables """
255 s = os.getenv("STEP", "n")
256 cls.step = True if s.lower() in ("y", "yes", "1") else False
257 d = os.getenv("DEBUG", None)
258 c = os.getenv("CACHE_OUTPUT", "1")
259 cls.cache_vpp_output = False if c.lower() in ("n", "no", "0") else True
260 cls.set_debug_flags(d)
261 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
262 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
263 cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
265 if cls.plugin_path is not None:
266 if cls.extern_plugin_path is not None:
267 plugin_path = "%s:%s" % (
268 cls.plugin_path, cls.extern_plugin_path)
270 plugin_path = cls.plugin_path
271 elif cls.extern_plugin_path is not None:
272 plugin_path = cls.extern_plugin_path
274 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
275 debug_cli = "cli-listen localhost:5002"
277 size = os.getenv("COREDUMP_SIZE")
279 coredump_size = "coredump-size %s" % size
280 if coredump_size is None:
281 coredump_size = "coredump-size unlimited"
283 cpu_core_number = cls.get_least_used_cpu()
285 cls.vpp_cmdline = [cls.vpp_bin, "unix",
286 "{", "nodaemon", debug_cli, "full-coredump",
287 coredump_size, "}", "api-trace", "{", "on", "}",
288 "api-segment", "{", "prefix", cls.shm_prefix, "}",
289 "cpu", "{", "main-core", str(cpu_core_number), "}",
290 "statseg", "{", "socket-name",
291 cls.tempdir + "/stats.sock", "}",
292 "plugins", "{", "plugin", "dpdk_plugin.so", "{",
293 "disable", "}", "plugin", "unittest_plugin.so",
294 "{", "enable", "}", "}", ]
295 if plugin_path is not None:
296 cls.vpp_cmdline.extend(["plugin_path", plugin_path])
297 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
300 def wait_for_enter(cls):
301 if cls.debug_gdbserver:
302 print(double_line_delim)
303 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
305 print(double_line_delim)
306 print("Spawned VPP with PID: %d" % cls.vpp.pid)
308 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
310 print(single_line_delim)
311 print("You can debug the VPP using e.g.:")
312 if cls.debug_gdbserver:
313 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
314 print("Now is the time to attach a gdb by running the above "
315 "command, set up breakpoints etc. and then resume VPP from "
316 "within gdb by issuing the 'continue' command")
318 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
319 print("Now is the time to attach a gdb by running the above "
320 "command and set up breakpoints etc.")
321 print(single_line_delim)
322 raw_input("Press ENTER to continue running the testcase...")
326 cmdline = cls.vpp_cmdline
328 if cls.debug_gdbserver:
329 gdbserver = '/usr/bin/gdbserver'
330 if not os.path.isfile(gdbserver) or \
331 not os.access(gdbserver, os.X_OK):
332 raise Exception("gdbserver binary '%s' does not exist or is "
333 "not executable" % gdbserver)
335 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
336 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
339 cls.vpp = subprocess.Popen(cmdline,
340 stdout=subprocess.PIPE,
341 stderr=subprocess.PIPE,
343 except Exception as e:
344 cls.logger.critical("Couldn't start vpp: %s" % e)
352 Perform class setup before running the testcase
353 Remove shared memory files, start vpp and connect the vpp-api
355 gc.collect() # run garbage collection first
357 if not hasattr(cls, 'logger'):
358 cls.logger = getLogger(cls.__name__)
360 cls.logger.name = cls.__name__
361 cls.tempdir = tempfile.mkdtemp(
362 prefix='vpp-unittest-%s-' % cls.__name__)
363 cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
364 cls.file_handler.setFormatter(
365 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
367 cls.file_handler.setLevel(DEBUG)
368 cls.logger.addHandler(cls.file_handler)
369 cls.shm_prefix = os.path.basename(cls.tempdir)
370 os.chdir(cls.tempdir)
371 cls.logger.info("Temporary dir is %s, shm prefix is %s",
372 cls.tempdir, cls.shm_prefix)
374 cls.reset_packet_infos()
376 cls._zombie_captures = []
379 cls.registry = VppObjectRegistry()
380 cls.vpp_startup_failed = False
381 cls.reporter = KeepAliveReporter()
382 # need to catch exceptions here because if we raise, then the cleanup
383 # doesn't get called and we might end with a zombie vpp
386 cls.reporter.send_keep_alive(cls)
387 cls.vpp_stdout_deque = deque()
388 cls.vpp_stderr_deque = deque()
389 cls.pump_thread_stop_flag = Event()
390 cls.pump_thread_wakeup_pipe = os.pipe()
391 cls.pump_thread = Thread(target=pump_output, args=(cls,))
392 cls.pump_thread.daemon = True
393 cls.pump_thread.start()
394 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
399 cls.vapi.register_hook(hook)
400 cls.sleep(0.1, "after vpp startup, before initial poll")
401 cls.statistics = VPPStats(socketname=cls.tempdir+'/stats.sock')
405 cls.vpp_startup_failed = True
407 "VPP died shortly after startup, check the"
408 " output to standard error for possible cause")
414 cls.vapi.disconnect()
417 if cls.debug_gdbserver:
418 print(colorize("You're running VPP inside gdbserver but "
419 "VPP-API connection failed, did you forget "
420 "to 'continue' VPP from within gdb?", RED))
432 Disconnect vpp-api, kill vpp and cleanup shared memory files
434 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
436 if cls.vpp.returncode is None:
437 print(double_line_delim)
438 print("VPP or GDB server is still running")
439 print(single_line_delim)
440 raw_input("When done debugging, press ENTER to kill the "
441 "process and finish running the testcase...")
443 # first signal that we want to stop the pump thread, then wake it up
444 if hasattr(cls, 'pump_thread_stop_flag'):
445 cls.pump_thread_stop_flag.set()
446 if hasattr(cls, 'pump_thread_wakeup_pipe'):
447 os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
448 if hasattr(cls, 'pump_thread'):
449 cls.logger.debug("Waiting for pump thread to stop")
450 cls.pump_thread.join()
451 if hasattr(cls, 'vpp_stderr_reader_thread'):
452 cls.logger.debug("Waiting for stdderr pump to stop")
453 cls.vpp_stderr_reader_thread.join()
455 if hasattr(cls, 'vpp'):
456 if hasattr(cls, 'vapi'):
457 cls.vapi.disconnect()
460 if cls.vpp.returncode is None:
461 cls.logger.debug("Sending TERM to vpp")
463 cls.logger.debug("Waiting for vpp to die")
464 cls.vpp.communicate()
467 if cls.vpp_startup_failed:
468 stdout_log = cls.logger.info
469 stderr_log = cls.logger.critical
471 stdout_log = cls.logger.info
472 stderr_log = cls.logger.info
474 if hasattr(cls, 'vpp_stdout_deque'):
475 stdout_log(single_line_delim)
476 stdout_log('VPP output to stdout while running %s:', cls.__name__)
477 stdout_log(single_line_delim)
478 vpp_output = "".join(cls.vpp_stdout_deque)
479 with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
481 stdout_log('\n%s', vpp_output)
482 stdout_log(single_line_delim)
484 if hasattr(cls, 'vpp_stderr_deque'):
485 stderr_log(single_line_delim)
486 stderr_log('VPP output to stderr while running %s:', cls.__name__)
487 stderr_log(single_line_delim)
488 vpp_output = "".join(cls.vpp_stderr_deque)
489 with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
491 stderr_log('\n%s', vpp_output)
492 stderr_log(single_line_delim)
495 def tearDownClass(cls):
496 """ Perform final cleanup after running all tests in this test-case """
498 cls.file_handler.close()
499 cls.reset_packet_infos()
501 debug_internal.on_tear_down_class(cls)
504 """ Show various debug prints after each test """
505 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
506 (self.__class__.__name__, self._testMethodName,
507 self._testMethodDoc))
508 if not self.vpp_dead:
509 self.logger.debug(self.vapi.cli("show trace"))
510 self.logger.info(self.vapi.ppcli("show interface"))
511 self.logger.info(self.vapi.ppcli("show hardware"))
512 self.logger.info(self.statistics.set_errors_str())
513 self.logger.info(self.vapi.ppcli("show run"))
514 self.logger.info(self.vapi.ppcli("show log"))
515 self.registry.remove_vpp_config(self.logger)
516 # Save/Dump VPP api trace log
517 api_trace = "vpp_api_trace.%s.log" % self._testMethodName
518 tmp_api_trace = "/tmp/%s" % api_trace
519 vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
520 self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
521 self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
523 os.rename(tmp_api_trace, vpp_api_trace_log)
524 self.logger.info(self.vapi.ppcli("api trace custom-dump %s" %
527 self.registry.unregister_all(self.logger)
530 """ Clear trace before running each test"""
531 self.reporter.send_keep_alive(self)
532 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
533 (self.__class__.__name__, self._testMethodName,
534 self._testMethodDoc))
536 raise Exception("VPP is dead when setting up the test")
537 self.sleep(.1, "during setUp")
538 self.vpp_stdout_deque.append(
539 "--- test setUp() for %s.%s(%s) starts here ---\n" %
540 (self.__class__.__name__, self._testMethodName,
541 self._testMethodDoc))
542 self.vpp_stderr_deque.append(
543 "--- test setUp() for %s.%s(%s) starts here ---\n" %
544 (self.__class__.__name__, self._testMethodName,
545 self._testMethodDoc))
546 self.vapi.cli("clear trace")
547 # store the test instance inside the test class - so that objects
548 # holding the class can access instance methods (like assertEqual)
549 type(self).test_instance = self
552 def pg_enable_capture(cls, interfaces=None):
554 Enable capture on packet-generator interfaces
556 :param interfaces: iterable interface indexes (if None,
557 use self.pg_interfaces)
560 if interfaces is None:
561 interfaces = cls.pg_interfaces
566 def register_capture(cls, cap_name):
567 """ Register a capture in the testclass """
568 # add to the list of captures with current timestamp
569 cls._captures.append((time.time(), cap_name))
570 # filter out from zombies
571 cls._zombie_captures = [(stamp, name)
572 for (stamp, name) in cls._zombie_captures
577 """ Remove any zombie captures and enable the packet generator """
578 # how long before capture is allowed to be deleted - otherwise vpp
579 # crashes - 100ms seems enough (this shouldn't be needed at all)
582 for stamp, cap_name in cls._zombie_captures:
583 wait = stamp + capture_ttl - now
585 cls.sleep(wait, "before deleting capture %s" % cap_name)
587 cls.logger.debug("Removing zombie capture %s" % cap_name)
588 cls.vapi.cli('packet-generator delete %s' % cap_name)
590 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
591 cls.vapi.cli('packet-generator enable')
592 cls._zombie_captures = cls._captures
596 def create_pg_interfaces(cls, interfaces):
598 Create packet-generator interfaces.
600 :param interfaces: iterable indexes of the interfaces.
601 :returns: List of created interfaces.
606 intf = VppPGInterface(cls, i)
607 setattr(cls, intf.name, intf)
609 cls.pg_interfaces = result
613 def create_loopback_interfaces(cls, count):
615 Create loopback interfaces.
617 :param count: number of interfaces created.
618 :returns: List of created interfaces.
620 result = [VppLoInterface(cls) for i in range(count)]
622 setattr(cls, intf.name, intf)
623 cls.lo_interfaces = result
627 def extend_packet(packet, size, padding=' '):
629 Extend packet to given size by padding with spaces or custom padding
630 NOTE: Currently works only when Raw layer is present.
632 :param packet: packet
633 :param size: target size
634 :param padding: padding used to extend the payload
637 packet_len = len(packet) + 4
638 extend = size - packet_len
640 num = (extend / len(padding)) + 1
641 packet[Raw].load += (padding * num)[:extend]
def reset_packet_infos(cls):
    """Forget all packet infos and all per-destination packet counts.

    Both containers are replaced by fresh, independent empty dicts.
    """
    cls._packet_infos, cls._packet_count_for_dst_if_idx = {}, {}
650 def create_packet_info(cls, src_if, dst_if):
652 Create packet info object containing the source and destination indexes
653 and add it to the testcase's packet info list
655 :param VppInterface src_if: source interface
656 :param VppInterface dst_if: destination interface
658 :returns: _PacketInfo object
662 info.index = len(cls._packet_infos)
663 info.src = src_if.sw_if_index
664 info.dst = dst_if.sw_if_index
665 if isinstance(dst_if, VppSubInterface):
666 dst_idx = dst_if.parent.sw_if_index
668 dst_idx = dst_if.sw_if_index
669 if dst_idx in cls._packet_count_for_dst_if_idx:
670 cls._packet_count_for_dst_if_idx[dst_idx] += 1
672 cls._packet_count_for_dst_if_idx[dst_idx] = 1
673 cls._packet_infos[info.index] = info
677 def info_to_payload(info):
679 Convert _PacketInfo object to packet payload
681 :param info: _PacketInfo object
683 :returns: string containing serialized data from packet info
685 return "%d %d %d %d %d" % (info.index, info.src, info.dst,
689 def payload_to_info(payload):
691 Convert packet payload to _PacketInfo object
693 :param payload: packet payload
695 :returns: _PacketInfo object containing de-serialized data from payload
698 numbers = payload.split()
700 info.index = int(numbers[0])
701 info.src = int(numbers[1])
702 info.dst = int(numbers[2])
703 info.ip = int(numbers[3])
704 info.proto = int(numbers[4])
707 def get_next_packet_info(self, info):
709 Iterate over the packet info list stored in the testcase
710 Start iteration with first element if info is None
711 Continue based on index in info if info is specified
713 :param info: info or None
714 :returns: next info in list or None if no more infos
719 next_index = info.index + 1
720 if next_index == len(self._packet_infos):
723 return self._packet_infos[next_index]
725 def get_next_packet_info_for_interface(self, src_index, info):
727 Search the packet info list for the next packet info with same source
730 :param src_index: source interface index to search for
731 :param info: packet info - where to start the search
732 :returns: packet info or None
736 info = self.get_next_packet_info(info)
739 if info.src == src_index:
742 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
744 Search the packet info list for the next packet info with same source
745 and destination interface indexes
747 :param src_index: source interface index to search for
748 :param dst_index: destination interface index to search for
749 :param info: packet info - where to start the search
750 :returns: packet info or None
754 info = self.get_next_packet_info_for_interface(src_index, info)
757 if info.dst == dst_index:
760 def assert_equal(self, real_value, expected_value, name_or_class=None):
761 if name_or_class is None:
762 self.assertEqual(real_value, expected_value)
765 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
766 msg = msg % (getdoc(name_or_class).strip(),
767 real_value, str(name_or_class(real_value)),
768 expected_value, str(name_or_class(expected_value)))
770 msg = "Invalid %s: %s does not match expected value %s" % (
771 name_or_class, real_value, expected_value)
773 self.assertEqual(real_value, expected_value, msg)
775 def assert_in_range(self,
783 msg = "Invalid %s: %s out of range <%s,%s>" % (
784 name, real_value, expected_min, expected_max)
785 self.assertTrue(expected_min <= real_value <= expected_max, msg)
787 def assert_packet_checksums_valid(self, packet,
788 ignore_zero_udp_checksums=True):
789 received = packet.__class__(str(packet))
791 ppp("Verifying packet checksums for packet:", received))
792 udp_layers = ['UDP', 'UDPerror']
793 checksum_fields = ['cksum', 'chksum']
796 temp = received.__class__(str(received))
798 layer = temp.getlayer(counter)
800 for cf in checksum_fields:
801 if hasattr(layer, cf):
802 if ignore_zero_udp_checksums and \
803 0 == getattr(layer, cf) and \
804 layer.name in udp_layers:
807 checksums.append((counter, cf))
810 counter = counter + 1
811 if 0 == len(checksums):
813 temp = temp.__class__(str(temp))
814 for layer, cf in checksums:
815 calc_sum = getattr(temp[layer], cf)
817 getattr(received[layer], cf), calc_sum,
818 "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
820 "Checksum field `%s` on `%s` layer has correct value `%s`" %
821 (cf, temp[layer].name, calc_sum))
823 def assert_checksum_valid(self, received_packet, layer,
825 ignore_zero_checksum=False):
826 """ Check checksum of received packet on given layer """
827 received_packet_checksum = getattr(received_packet[layer], field_name)
828 if ignore_zero_checksum and 0 == received_packet_checksum:
830 recalculated = received_packet.__class__(str(received_packet))
831 delattr(recalculated[layer], field_name)
832 recalculated = recalculated.__class__(str(recalculated))
833 self.assert_equal(received_packet_checksum,
834 getattr(recalculated[layer], field_name),
835 "packet checksum on layer: %s" % layer)
def assert_ip_checksum_valid(self, received_packet,
                             ignore_zero_checksum=False):
    """Check the checksum of the 'IP' layer of the received packet.

    :param received_packet: packet whose 'IP' layer is checked
    :param ignore_zero_checksum: forwarded unchanged to
        assert_checksum_valid
    """
    options = {'ignore_zero_checksum': ignore_zero_checksum}
    self.assert_checksum_valid(received_packet, 'IP', **options)
def assert_tcp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=False):
    """Check the checksum of the 'TCP' layer of the received packet.

    :param received_packet: packet whose 'TCP' layer is checked
    :param ignore_zero_checksum: forwarded unchanged to
        assert_checksum_valid
    """
    options = {'ignore_zero_checksum': ignore_zero_checksum}
    self.assert_checksum_valid(received_packet, 'TCP', **options)
def assert_udp_checksum_valid(self, received_packet,
                              ignore_zero_checksum=True):
    """Check the checksum of the 'UDP' layer of the received packet.

    Unlike the IP and TCP variants, ``ignore_zero_checksum`` defaults to
    True here.

    :param received_packet: packet whose 'UDP' layer is checked
    :param ignore_zero_checksum: forwarded unchanged to
        assert_checksum_valid
    """
    options = {'ignore_zero_checksum': ignore_zero_checksum}
    self.assert_checksum_valid(received_packet, 'UDP', **options)
def assert_embedded_icmp_checksum_valid(self, received_packet):
    """Check checksums of the error layers embedded in the packet.

    Each of the IPerror, TCPerror, UDPerror and ICMPerror layers is checked
    only when the packet has it; for UDPerror a zero checksum is ignored.

    :param received_packet: packet to inspect
    """
    # (layer class to look for, layer name to check, extra check options)
    embedded_layers = (
        (IPerror, 'IPerror', {}),
        (TCPerror, 'TCPerror', {}),
        (UDPerror, 'UDPerror', {'ignore_zero_checksum': True}),
        (ICMPerror, 'ICMPerror', {}),
    )
    for layer_class, layer_name, options in embedded_layers:
        if received_packet.haslayer(layer_class):
            self.assert_checksum_valid(received_packet, layer_name,
                                       **options)
def assert_icmp_checksum_valid(self, received_packet):
    """Check the 'ICMP' layer checksum, then any embedded error layers.

    :param received_packet: packet to check
    """
    packet = received_packet
    # outer ICMP header first ...
    self.assert_checksum_valid(packet, 'ICMP')
    # ... then whatever error layers the packet carries
    self.assert_embedded_icmp_checksum_valid(packet)
def assert_icmpv6_checksum_valid(self, pkt):
    """Check the 'cksum' field of the ICMPv6 layers present in the packet.

    ICMPv6DestUnreach, ICMPv6EchoRequest and ICMPv6EchoReply are handled,
    each only when the packet has that layer. For a destination unreachable
    message the embedded error layers are checked as well.

    :param pkt: packet to check
    """
    # (layer class to look for, layer name to check, check embedded layers)
    icmpv6_layers = (
        (ICMPv6DestUnreach, 'ICMPv6DestUnreach', True),
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest', False),
        (ICMPv6EchoReply, 'ICMPv6EchoReply', False),
    )
    for layer_class, layer_name, has_embedded in icmpv6_layers:
        if not pkt.haslayer(layer_class):
            continue
        self.assert_checksum_valid(pkt, layer_name, 'cksum')
        if has_embedded:
            self.assert_embedded_icmp_checksum_valid(pkt)
877 def sleep(cls, timeout, remark=None):
878 if hasattr(cls, 'logger'):
879 cls.logger.debug("Starting sleep for %ss (%s)" % (timeout, remark))
883 if after - before > 2 * timeout:
884 cls.logger.error("unexpected time.sleep() result - "
885 "slept for %ss instead of ~%ss!" % (
886 after - before, timeout))
887 if hasattr(cls, 'logger'):
889 "Finished sleep (%s) - slept %ss (wanted %ss)" % (
890 remark, after - before, timeout))
892 def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
893 self.vapi.cli("clear trace")
894 intf.add_stream(pkts)
895 self.pg_enable_capture(self.pg_interfaces)
899 for i in self.pg_interfaces:
900 i.get_capture(0, timeout=timeout)
901 i.assert_nothing_captured(remark=remark)
904 def send_and_expect(self, input, pkts, output):
905 self.vapi.cli("clear trace")
906 input.add_stream(pkts)
907 self.pg_enable_capture(self.pg_interfaces)
909 rx = output.get_capture(len(pkts))
913 def get_testcase_doc_name(test):
914 return getdoc(test.__class__).splitlines()[0]
917 def get_test_description(descriptions, test):
918 # TODO: if none print warning not raise exception
919 short_description = test.shortDescription()
920 if descriptions and short_description:
921 return short_description
926 class TestCasePrinter(object):
930 self.__dict__ = self._shared_state
931 if not hasattr(self, "_test_case_set"):
932 self._test_case_set = set()
def print_test_case_heading_if_first_time(self, case):
    """Print a heading for the test's class unless one was printed before.

    The heading is the test case doc name in green between two double-line
    delimiters. The class is then remembered in ``_test_case_set`` so that
    further tests of the same class print nothing.

    :param case: test case instance about to be run
    """
    case_class = case.__class__
    if case_class in self._test_case_set:
        # heading for this class already printed
        return
    print(double_line_delim)
    print(colorize(get_testcase_doc_name(case), GREEN))
    print(double_line_delim)
    self._test_case_set.add(case_class)
942 class VppTestResult(unittest.TestResult):
944 @property result_string
945 String variable to store the test case result string.
947 List variable containing 2-tuples of TestCase instances and strings
948 holding formatted tracebacks. Each tuple represents a test which
949 raised an unexpected exception.
951 List variable containing 2-tuples of TestCase instances and strings
952 holding formatted tracebacks. Each tuple represents a test where
953 a failure was explicitly signalled using the TestCase.assert*()
957 def __init__(self, stream, descriptions, verbosity):
959 :param stream File descriptor to store where to report test results.
960 Set to the standard error stream by default.
961 :param descriptions Boolean variable to store information if to use
962 test case descriptions.
963 :param verbosity Integer variable to store required verbosity level.
965 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
967 self.descriptions = descriptions
968 self.verbosity = verbosity
969 self.result_string = None
970 self.printer = TestCasePrinter()
972 def addSuccess(self, test):
974 Record a test succeeded result
979 if hasattr(test, 'logger'):
980 test.logger.debug("--- addSuccess() %s.%s(%s) called"
981 % (test.__class__.__name__,
982 test._testMethodName,
983 test._testMethodDoc))
984 unittest.TestResult.addSuccess(self, test)
985 self.result_string = colorize("OK", GREEN)
987 self.send_result_through_pipe(test, PASS)
989 def addSkip(self, test, reason):
991 Record a test skipped.
997 if hasattr(test, 'logger'):
998 test.logger.debug("--- addSkip() %s.%s(%s) called, reason is %s"
999 % (test.__class__.__name__,
1000 test._testMethodName,
1001 test._testMethodDoc,
1003 unittest.TestResult.addSkip(self, test, reason)
1004 self.result_string = colorize("SKIP", YELLOW)
1006 self.send_result_through_pipe(test, SKIP)
1008 def symlink_failed(self, test):
1010 if hasattr(test, 'logger'):
1011 logger = test.logger
1012 if hasattr(test, 'tempdir'):
1014 failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
1015 link_path = os.path.join(failed_dir, '%s-FAILED' %
1016 os.path.basename(test.tempdir))
1018 logger.debug("creating a link to the failed test")
1019 logger.debug("os.symlink(%s, %s)" %
1020 (test.tempdir, link_path))
1021 if os.path.exists(link_path):
1023 logger.debug('symlink already exists')
1025 os.symlink(test.tempdir, link_path)
1027 except Exception as e:
1031 def send_result_through_pipe(self, test, result):
1032 if hasattr(self, 'test_framework_result_pipe'):
1033 pipe = self.test_framework_result_pipe
1035 pipe.send((test.id(), result))
1037 def addFailure(self, test, err):
1039 Record a test failed result
1042 :param err: error message
1045 if hasattr(test, 'logger'):
1046 test.logger.debug("--- addFailure() %s.%s(%s) called, err is %s"
1047 % (test.__class__.__name__,
1048 test._testMethodName,
1049 test._testMethodDoc, err))
1050 test.logger.debug("formatted exception is:\n%s" %
1051 "".join(format_exception(*err)))
1052 unittest.TestResult.addFailure(self, test, err)
1053 if hasattr(test, 'tempdir'):
1054 self.result_string = colorize("FAIL", RED) + \
1055 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
1056 self.symlink_failed(test)
1058 self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
1060 self.send_result_through_pipe(test, FAIL)
1062 def addError(self, test, err):
1064 Record a test error result
1067 :param err: error message
1070 if hasattr(test, 'logger'):
1071 test.logger.debug("--- addError() %s.%s(%s) called, err is %s"
1072 % (test.__class__.__name__,
1073 test._testMethodName,
1074 test._testMethodDoc, err))
1075 test.logger.debug("formatted exception is:\n%s" %
1076 "".join(format_exception(*err)))
1077 unittest.TestResult.addError(self, test, err)
1078 if hasattr(test, 'tempdir'):
1079 self.result_string = colorize("ERROR", RED) + \
1080 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
1081 self.symlink_failed(test)
1083 self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
1085 self.send_result_through_pipe(test, ERROR)
1087 def getDescription(self, test):
1089 Get test description
1092 :returns: test description
1095 return get_test_description(self.descriptions, test)
1097 def startTest(self, test):
1104 self.printer.print_test_case_heading_if_first_time(test)
1105 unittest.TestResult.startTest(self, test)
1106 if self.verbosity > 0:
1107 self.stream.writeln(
1108 "Starting " + self.getDescription(test) + " ...")
1109 self.stream.writeln(single_line_delim)
1111 def stopTest(self, test):
1113 Called when the given test has been run
1118 unittest.TestResult.stopTest(self, test)
1119 if self.verbosity > 0:
1120 self.stream.writeln(single_line_delim)
1121 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1122 self.result_string))
1123 self.stream.writeln(single_line_delim)
1125 self.stream.writeln("%-73s%s" % (self.getDescription(test),
1126 self.result_string))
1128 self.send_result_through_pipe(test, TEST_RUN)
1130 def printErrors(self):
1132 Print errors from running the test case
1134 self.stream.writeln()
1135 self.printErrorList('ERROR', self.errors)
1136 self.printErrorList('FAIL', self.failures)
1138 def printErrorList(self, flavour, errors):
1140 Print error list to the output stream together with error type
1141 and test case description.
1143 :param flavour: error type
1144 :param errors: iterable errors
1147 for test, err in errors:
1148 self.stream.writeln(double_line_delim)
1149 self.stream.writeln("%s: %s" %
1150 (flavour, self.getDescription(test)))
1151 self.stream.writeln(single_line_delim)
1152 self.stream.writeln("%s" % err)
1155 class VppTestRunner(unittest.TextTestRunner):
1157 A basic test runner implementation which prints results to standard error.
def resultclass(self):
    """Return the class that maintains the test results (VppTestResult)."""
    result_type = VppTestResult
    return result_type
1164 def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
1165 result_pipe=None, failfast=False, buffer=False,
1167 # ignore stream setting here, use hard-coded stdout to be in sync
1168 # with prints from VppTestCase methods ...
1169 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
1170 verbosity, failfast, buffer,
1172 reporter = KeepAliveReporter()
1173 reporter.pipe = keep_alive_pipe
1175 VppTestResult.test_framework_result_pipe = result_pipe
1177 def run(self, test):
1184 faulthandler.enable() # emit stack trace to stderr if killed by signal
1186 result = super(VppTestRunner, self).run(test)
1190 class Worker(Thread):
1191 def __init__(self, args, logger, env={}):
1192 self.logger = logger
1195 self.env = copy.deepcopy(env)
1196 super(Worker, self).__init__()
1199 executable = self.args[0]
1200 self.logger.debug("Running executable w/args `%s'" % self.args)
1201 env = os.environ.copy()
1202 env.update(self.env)
1203 env["CK_LOG_FILE_NAME"] = "-"
1204 self.process = subprocess.Popen(
1205 self.args, shell=False, env=env, preexec_fn=os.setpgrp,
1206 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1207 out, err = self.process.communicate()
1208 self.logger.debug("Finished running `%s'" % executable)
1209 self.logger.info("Return code is `%s'" % self.process.returncode)
1210 self.logger.info(single_line_delim)
1211 self.logger.info("Executable `%s' wrote to stdout:" % executable)
1212 self.logger.info(single_line_delim)
1213 self.logger.info(out)
1214 self.logger.info(single_line_delim)
1215 self.logger.info("Executable `%s' wrote to stderr:" % executable)
1216 self.logger.info(single_line_delim)
1217 self.logger.info(err)
1218 self.logger.info(single_line_delim)
1219 self.result = self.process.returncode