8 from collections import deque
9 from threading import Thread
10 from inspect import getdoc
11 from traceback import format_exception
12 from hook import StepHook, PollHook
13 from vpp_pg_interface import VppPGInterface
14 from vpp_sub_interface import VppSubInterface
15 from vpp_lo_interface import VppLoInterface
16 from vpp_papi_provider import VppPapiProvider
17 from scapy.packet import Raw
18 from logging import FileHandler, DEBUG
20 from vpp_object import VppObjectRegistry
23 Test framework module.
25 The module provides a set of tools for constructing and running tests and
26 representing the results.
30 class _PacketInfo(object):
31 """Private class to create packet info object.
33 Help process information about the next packet.
34 Set variables to default values.
36 #: Store the index of the packet.
38 #: Store the index of the source packet generator interface of the packet.
40 #: Store the index of the destination packet generator interface
43 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare two packet info objects field by field.

    :param other: object to compare with this one
    :returns: result of comparing index, src, dst and data of both
        objects; NotImplemented when one of these attributes is missing
    """
    try:
        return (self.index == other.index and
                self.src == other.src and
                self.dst == other.dst and
                self.data == other.data)
    except AttributeError:
        # the other object is not packet-info-like - let Python fall back
        # to its default handling instead of failing with AttributeError
        return NotImplemented
54 def pump_output(out, deque):
55 for line in iter(out.readline, b''):
59 class VppTestCase(unittest.TestCase):
60 """This subclass is a base class for VPP test cases that are implemented as
61 classes. It provides methods to create and run test cases.
def packet_infos(self):
    """Packet infos created for this test case.

    :returns: the ``_packet_infos`` mapping (packet index -> packet info)
    """
    return self._packet_infos
70 def get_packet_count_for_if_idx(cls, dst_if_index):
71 """Get the number of packet info for specified destination if index"""
72 if dst_if_index in cls._packet_count_for_dst_if_idx:
73 return cls._packet_count_for_dst_if_idx[dst_if_index]
79 """Return the instance of this testcase"""
80 return cls.test_instance
83 def set_debug_flags(cls, d):
84 cls.debug_core = False
86 cls.debug_gdbserver = False
91 if resource.getrlimit(resource.RLIMIT_CORE)[0] <= 0:
92 # give a heads up if this is actually useless
93 print(colorize("WARNING: core size limit is set 0, core files "
94 "will NOT be created", RED))
98 elif dl == "gdbserver":
99 cls.debug_gdbserver = True
101 raise Exception("Unrecognized DEBUG option: '%s'" % d)
104 def setUpConstants(cls):
105 """ Set-up the test case class based on environment variables """
107 s = os.getenv("STEP")
108 cls.step = True if s.lower() in ("y", "yes", "1") else False
112 d = os.getenv("DEBUG")
115 cls.set_debug_flags(d)
116 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
117 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
119 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
120 debug_cli = "cli-listen localhost:5002"
121 cls.vpp_cmdline = [cls.vpp_bin,
122 "unix", "{", "nodaemon", debug_cli, "}",
123 "api-segment", "{", "prefix", cls.shm_prefix, "}"]
124 if cls.plugin_path is not None:
125 cls.vpp_cmdline.extend(["plugin_path", cls.plugin_path])
126 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
129 def wait_for_enter(cls):
130 if cls.debug_gdbserver:
131 print(double_line_delim)
132 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
134 print(double_line_delim)
135 print("Spawned VPP with PID: %d" % cls.vpp.pid)
137 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
139 print(single_line_delim)
140 print("You can debug the VPP using e.g.:")
141 if cls.debug_gdbserver:
142 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
143 print("Now is the time to attach a gdb by running the above "
144 "command, set up breakpoints etc. and then resume VPP from "
145 "within gdb by issuing the 'continue' command")
147 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
148 print("Now is the time to attach a gdb by running the above "
149 "command and set up breakpoints etc.")
150 print(single_line_delim)
151 raw_input("Press ENTER to continue running the testcase...")
155 cmdline = cls.vpp_cmdline
157 if cls.debug_gdbserver:
158 gdbserver = '/usr/bin/gdbserver'
159 if not os.path.isfile(gdbserver) or \
160 not os.access(gdbserver, os.X_OK):
161 raise Exception("gdbserver binary '%s' does not exist or is "
162 "not executable" % gdbserver)
164 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
165 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
168 cls.vpp = subprocess.Popen(cmdline,
169 stdout=subprocess.PIPE,
170 stderr=subprocess.PIPE,
172 except Exception as e:
173 cls.logger.critical("Couldn't start vpp: %s" % e)
181 Perform class setup before running the testcase
182 Remove shared memory files, start vpp and connect the vpp-api
184 cls.logger = getLogger(cls.__name__)
185 cls.tempdir = tempfile.mkdtemp(
186 prefix='vpp-unittest-' + cls.__name__ + '-')
187 file_handler = FileHandler("%s/log.txt" % cls.tempdir)
188 file_handler.setLevel(DEBUG)
189 cls.logger.addHandler(file_handler)
190 cls.shm_prefix = cls.tempdir.split("/")[-1]
191 os.chdir(cls.tempdir)
192 cls.logger.info("Temporary dir is %s, shm prefix is %s",
193 cls.tempdir, cls.shm_prefix)
195 cls.reset_packet_infos()
197 cls._zombie_captures = []
200 cls.registry = VppObjectRegistry()
201 print(double_line_delim)
202 print(colorize(getdoc(cls).splitlines()[0], YELLOW))
203 print(double_line_delim)
204 # need to catch exceptions here because if we raise, then the cleanup
205 # doesn't get called and we might end with a zombie vpp
208 cls.vpp_stdout_deque = deque()
209 cls.vpp_stdout_reader_thread = Thread(target=pump_output, args=(
210 cls.vpp.stdout, cls.vpp_stdout_deque))
211 cls.vpp_stdout_reader_thread.start()
212 cls.vpp_stderr_deque = deque()
213 cls.vpp_stderr_reader_thread = Thread(target=pump_output, args=(
214 cls.vpp.stderr, cls.vpp_stderr_deque))
215 cls.vpp_stderr_reader_thread.start()
216 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
221 cls.vapi.register_hook(hook)
227 if cls.debug_gdbserver:
228 print(colorize("You're running VPP inside gdbserver but "
229 "VPP-API connection failed, did you forget "
230 "to 'continue' VPP from within gdb?", RED))
233 t, v, tb = sys.exc_info()
243 Disconnect vpp-api, kill vpp and cleanup shared memory files
245 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
247 if cls.vpp.returncode is None:
248 print(double_line_delim)
249 print("VPP or GDB server is still running")
250 print(single_line_delim)
251 raw_input("When done debugging, press ENTER to kill the "
252 "process and finish running the testcase...")
254 if hasattr(cls, 'vpp'):
255 if hasattr(cls, 'vapi'):
256 cls.vapi.disconnect()
258 if cls.vpp.returncode is None:
262 if hasattr(cls, 'vpp_stdout_deque'):
263 cls.logger.info(single_line_delim)
264 cls.logger.info('VPP output to stdout while running %s:',
266 cls.logger.info(single_line_delim)
267 f = open(cls.tempdir + '/vpp_stdout.txt', 'w')
268 vpp_output = "".join(cls.vpp_stdout_deque)
270 cls.logger.info('\n%s', vpp_output)
271 cls.logger.info(single_line_delim)
273 if hasattr(cls, 'vpp_stderr_deque'):
274 cls.logger.info(single_line_delim)
275 cls.logger.info('VPP output to stderr while running %s:',
277 cls.logger.info(single_line_delim)
278 f = open(cls.tempdir + '/vpp_stderr.txt', 'w')
279 vpp_output = "".join(cls.vpp_stderr_deque)
281 cls.logger.info('\n%s', vpp_output)
282 cls.logger.info(single_line_delim)
285 def tearDownClass(cls):
286 """ Perform final cleanup after running all tests in this test-case """
290 """ Show various debug prints after each test """
291 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
292 (self.__class__.__name__, self._testMethodName,
293 self._testMethodDoc))
294 if not self.vpp_dead:
295 self.logger.debug(self.vapi.cli("show trace"))
296 self.logger.info(self.vapi.ppcli("show int"))
297 self.logger.info(self.vapi.ppcli("show hardware"))
298 self.logger.info(self.vapi.ppcli("show error"))
299 self.logger.info(self.vapi.ppcli("show run"))
300 self.registry.remove_vpp_config(self.logger)
303 """ Clear trace before running each test"""
304 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
305 (self.__class__.__name__, self._testMethodName,
306 self._testMethodDoc))
308 raise Exception("VPP is dead when setting up the test")
310 self.vpp_stdout_deque.append(
311 "--- test setUp() for %s.%s(%s) starts here ---\n" %
312 (self.__class__.__name__, self._testMethodName,
313 self._testMethodDoc))
314 self.vpp_stderr_deque.append(
315 "--- test setUp() for %s.%s(%s) starts here ---\n" %
316 (self.__class__.__name__, self._testMethodName,
317 self._testMethodDoc))
318 self.vapi.cli("clear trace")
319 # store the test instance inside the test class - so that objects
320 # holding the class can access instance methods (like assertEqual)
321 type(self).test_instance = self
324 def pg_enable_capture(cls, interfaces):
326 Enable capture on packet-generator interfaces
328 :param interfaces: iterable interface indexes
335 def register_capture(cls, cap_name):
336 """ Register a capture in the testclass """
337 # add to the list of captures with current timestamp
338 cls._captures.append((time.time(), cap_name))
339 # filter out from zombies
340 cls._zombie_captures = [(stamp, name)
341 for (stamp, name) in cls._zombie_captures
346 """ Remove any zombie captures and enable the packet generator """
347 # how long before capture is allowed to be deleted - otherwise vpp
348 # crashes - 100ms seems enough (this shouldn't be needed at all)
351 for stamp, cap_name in cls._zombie_captures:
352 wait = stamp + capture_ttl - now
354 cls.logger.debug("Waiting for %ss before deleting capture %s",
358 cls.logger.debug("Removing zombie capture %s" % cap_name)
359 cls.vapi.cli('packet-generator delete %s' % cap_name)
361 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
362 cls.vapi.cli('packet-generator enable')
363 cls._zombie_captures = cls._captures
367 def create_pg_interfaces(cls, interfaces):
369 Create packet-generator interfaces.
371 :param interfaces: iterable indexes of the interfaces.
372 :returns: List of created interfaces.
377 intf = VppPGInterface(cls, i)
378 setattr(cls, intf.name, intf)
380 cls.pg_interfaces = result
384 def create_loopback_interfaces(cls, interfaces):
386 Create loopback interfaces.
388 :param interfaces: iterable indexes of the interfaces.
389 :returns: List of created interfaces.
393 intf = VppLoInterface(cls, i)
394 setattr(cls, intf.name, intf)
396 cls.lo_interfaces = result
400 def extend_packet(packet, size):
402 Extend packet to given size by padding with spaces
403 NOTE: Currently works only when Raw layer is present.
405 :param packet: packet
406 :param size: target size
409 packet_len = len(packet) + 4
410 extend = size - packet_len
412 packet[Raw].load += ' ' * extend
def reset_packet_infos(cls):
    """Reset the stored packet info objects and packet counts.

    Both ``cls._packet_infos`` and ``cls._packet_count_for_dst_if_idx``
    are replaced by new, empty dictionaries.
    """
    cls._packet_infos = {}
    cls._packet_count_for_dst_if_idx = {}
421 def create_packet_info(cls, src_if, dst_if):
423 Create packet info object containing the source and destination indexes
424 and add it to the testcase's packet info list
426 :param VppInterface src_if: source interface
427 :param VppInterface dst_if: destination interface
429 :returns: _PacketInfo object
433 info.index = len(cls._packet_infos)
434 info.src = src_if.sw_if_index
435 info.dst = dst_if.sw_if_index
436 if isinstance(dst_if, VppSubInterface):
437 dst_idx = dst_if.parent.sw_if_index
439 dst_idx = dst_if.sw_if_index
440 if dst_idx in cls._packet_count_for_dst_if_idx:
441 cls._packet_count_for_dst_if_idx[dst_idx] += 1
443 cls._packet_count_for_dst_if_idx[dst_idx] = 1
444 cls._packet_infos[info.index] = info
def info_to_payload(info):
    """
    Convert _PacketInfo object to packet payload

    :param info: _PacketInfo object

    :returns: string containing serialized data from packet info, i.e. the
        index, src and dst values as decimal numbers separated by spaces
    """
    return "%d %d %d" % (info.index, info.src, info.dst)
459 def payload_to_info(payload):
461 Convert packet payload to _PacketInfo object
463 :param payload: packet payload
465 :returns: _PacketInfo object containing de-serialized data from payload
468 numbers = payload.split()
470 info.index = int(numbers[0])
471 info.src = int(numbers[1])
472 info.dst = int(numbers[2])
475 def get_next_packet_info(self, info):
477 Iterate over the packet info list stored in the testcase
478 Start iteration with first element if info is None
479 Continue based on index in info if info is specified
481 :param info: info or None
482 :returns: next info in list or None if no more infos
487 next_index = info.index + 1
488 if next_index == len(self._packet_infos):
491 return self._packet_infos[next_index]
493 def get_next_packet_info_for_interface(self, src_index, info):
495 Search the packet info list for the next packet info with same source
498 :param src_index: source interface index to search for
499 :param info: packet info - where to start the search
500 :returns: packet info or None
504 info = self.get_next_packet_info(info)
507 if info.src == src_index:
510 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
512 Search the packet info list for the next packet info with same source
513 and destination interface indexes
515 :param src_index: source interface index to search for
516 :param dst_index: destination interface index to search for
517 :param info: packet info - where to start the search
518 :returns: packet info or None
522 info = self.get_next_packet_info_for_interface(src_index, info)
525 if info.dst == dst_index:
def assert_equal(self, real_value, expected_value, name_or_class=None):
    """Assert that two values are equal, with a descriptive failure message.

    :param real_value: value obtained by the test
    :param expected_value: value the test expects
    :param name_or_class: optional description of the compared item;
        either a callable whose docstring names the item and which converts
        the numeric values to a printable form, or any other object, which
        is then formatted into the message as is
    """
    if name_or_class is None:
        # nothing to describe the value with - use the default message
        # (passing 'msg' here would reference it before assignment)
        self.assertEqual(real_value, expected_value)
        return
    try:
        msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
        msg = msg % (getdoc(name_or_class).strip(),
                     real_value, str(name_or_class(real_value)),
                     expected_value, str(name_or_class(expected_value)))
    except Exception:
        # name_or_class is not usable as a converter (e.g. a plain name)
        # or the values are not numeric - fall back to the generic message
        msg = "Invalid %s: %s does not match expected value %s" % (
            name_or_class, real_value, expected_value)

    self.assertEqual(real_value, expected_value, msg)
543 def assert_in_range(self,
551 msg = "Invalid %s: %s out of range <%s,%s>" % (
552 name, real_value, expected_min, expected_max)
553 self.assertTrue(expected_min <= real_value <= expected_max, msg)
555 def sleep(self, timeout):
556 self.logger.debug("Sleeping for %ss" % timeout)
560 class VppTestResult(unittest.TestResult):
562 @property result_string
563 String variable to store the test case result string.
565 List variable containing 2-tuples of TestCase instances and strings
566 holding formatted tracebacks. Each tuple represents a test which
567 raised an unexpected exception.
569 List variable containing 2-tuples of TestCase instances and strings
570 holding formatted tracebacks. Each tuple represents a test where
571 a failure was explicitly signalled using the TestCase.assert*()
575 def __init__(self, stream, descriptions, verbosity):
577 :param stream File descriptor to store where to report test results.
578 Set to the standard error stream by default.
579 :param descriptions Boolean variable to store information if to use
580 test case descriptions.
581 :param verbosity Integer variable to store required verbosity level.
583 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
585 self.descriptions = descriptions
586 self.verbosity = verbosity
587 self.result_string = None
def addSuccess(self, test):
    """
    Record a test succeeded result

    :param test: test case which succeeded
    """
    # the logger attribute may be missing on the test object, so the
    # debug trace is optional; the result is recorded either way
    if hasattr(test, 'logger'):
        test.logger.debug("--- addSuccess() %s.%s(%s) called",
                          test.__class__.__name__,
                          test._testMethodName,
                          test._testMethodDoc)
    unittest.TestResult.addSuccess(self, test)
    self.result_string = colorize("OK", GREEN)
604 def addSkip(self, test, reason):
606 Record a test skipped.
612 if hasattr(test, 'logger'):
613 test.logger.debug("--- addSkip() %s.%s(%s) called, reason is %s"
614 % (test.__class__.__name__,
615 test._testMethodName,
618 unittest.TestResult.addSkip(self, test, reason)
619 self.result_string = colorize("SKIP", YELLOW)
621 def addFailure(self, test, err):
623 Record a test failed result
626 :param err: error message
629 if hasattr(test, 'logger'):
630 test.logger.debug("--- addFailure() %s.%s(%s) called, err is %s"
631 % (test.__class__.__name__,
632 test._testMethodName,
633 test._testMethodDoc, err))
634 test.logger.debug("formatted exception is:\n%s" %
635 "".join(format_exception(*err)))
636 unittest.TestResult.addFailure(self, test, err)
637 if hasattr(test, 'tempdir'):
638 self.result_string = colorize("FAIL", RED) + \
639 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
641 self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
643 def addError(self, test, err):
645 Record a test error result
648 :param err: error message
651 if hasattr(test, 'logger'):
652 test.logger.debug("--- addError() %s.%s(%s) called, err is %s"
653 % (test.__class__.__name__,
654 test._testMethodName,
655 test._testMethodDoc, err))
656 test.logger.debug("formatted exception is:\n%s" %
657 "".join(format_exception(*err)))
658 unittest.TestResult.addError(self, test, err)
659 if hasattr(test, 'tempdir'):
660 self.result_string = colorize("ERROR", RED) + \
661 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
663 self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
665 def getDescription(self, test):
670 :returns: test description
673 # TODO: if none print warning not raise exception
674 short_description = test.shortDescription()
675 if self.descriptions and short_description:
676 return short_description
680 def startTest(self, test):
687 unittest.TestResult.startTest(self, test)
688 if self.verbosity > 0:
690 "Starting " + self.getDescription(test) + " ...")
691 self.stream.writeln(single_line_delim)
693 def stopTest(self, test):
700 unittest.TestResult.stopTest(self, test)
701 if self.verbosity > 0:
702 self.stream.writeln(single_line_delim)
703 self.stream.writeln("%-73s%s" % (self.getDescription(test),
705 self.stream.writeln(single_line_delim)
707 self.stream.writeln("%-73s%s" % (self.getDescription(test),
def printErrors(self):
    """
    Print errors from running the test case

    Writes an empty line to the stream and then prints the recorded
    errors followed by the recorded failures.
    """
    self.stream.writeln()
    self.printErrorList('ERROR', self.errors)
    self.printErrorList('FAIL', self.failures)
def printErrorList(self, flavour, errors):
    """
    Print error list to the output stream together with error type
    and test case description.

    :param flavour: error type
    :param errors: iterable of (test, err) pairs
    """
    for test, err in errors:
        self.stream.writeln(double_line_delim)
        self.stream.writeln("%s: %s" %
                            (flavour, self.getDescription(test)))
        self.stream.writeln(single_line_delim)
        self.stream.writeln("%s" % err)
735 class VppTestRunner(unittest.TextTestRunner):
737 A basic test runner implementation which prints results to standard output.
740 def resultclass(self):
741 """Class maintaining the results of the tests"""
744 def __init__(self, stream=sys.stderr, descriptions=True, verbosity=1,
745 failfast=False, buffer=False, resultclass=None):
746 # ignore stream setting here, use hard-coded stdout to be in sync
747 # with prints from VppTestCase methods ...
748 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
749 verbosity, failfast, buffer,
754 def parse_test_option(self):
756 f = os.getenv(self.test_option)
759 filter_file_name = None
760 filter_class_name = None
761 filter_func_name = None
766 raise Exception("Unrecognized %s option: %s" %
767 (self.test_option, f))
769 if parts[2] not in ('*', ''):
770 filter_func_name = parts[2]
771 if parts[1] not in ('*', ''):
772 filter_class_name = parts[1]
773 if parts[0] not in ('*', ''):
774 if parts[0].startswith('test_'):
775 filter_file_name = parts[0]
777 filter_file_name = 'test_%s' % parts[0]
779 if f.startswith('test_'):
782 filter_file_name = 'test_%s' % f
783 return filter_file_name, filter_class_name, filter_func_name
785 def filter_tests(self, tests, filter_file, filter_class, filter_func):
786 result = unittest.suite.TestSuite()
788 if isinstance(t, unittest.suite.TestSuite):
789 # this is a bunch of tests, recursively filter...
790 x = self.filter_tests(t, filter_file, filter_class,
792 if x.countTestCases() > 0:
794 elif isinstance(t, unittest.TestCase):
795 # this is a single test
796 parts = t.id().split('.')
797 # t.id() for common cases like this:
798 # test_classifier.TestClassifier.test_acl_ip
799 # apply filtering only if it is so
801 if filter_file and filter_file != parts[0]:
803 if filter_class and filter_class != parts[1]:
805 if filter_func and filter_func != parts[2]:
809 # unexpected object, don't touch it
820 print("Running tests using custom test runner") # debug message
821 filter_file, filter_class, filter_func = self.parse_test_option()
822 print("Active filters: file=%s, class=%s, function=%s" % (
823 filter_file, filter_class, filter_func))
824 filtered = self.filter_tests(test, filter_file, filter_class,
826 print("%s out of %s tests match specified filters" % (
827 filtered.countTestCases(), test.countTestCases()))
828 return super(VppTestRunner, self).run(filtered)