8 from collections import deque
9 from threading import Thread
10 from inspect import getdoc
11 from hook import StepHook, PollHook
12 from vpp_pg_interface import VppPGInterface
13 from vpp_sub_interface import VppSubInterface
14 from vpp_lo_interface import VppLoInterface
15 from vpp_papi_provider import VppPapiProvider
16 from scapy.packet import Raw
17 from logging import FileHandler, DEBUG
19 from vpp_object import VppObjectRegistry
22 Test framework module.
24 The module provides a set of tools for constructing and running tests and
25 representing the results.
29 class _PacketInfo(object):
30 """Private class to create packet info object.
32 Help process information about the next packet.
33 Set variables to default values.
35 #: Store the index of the packet.
37 #: Store the index of the source packet generator interface of the packet.
39 #: Store the index of the destination packet generator interface
42 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare two packet info objects field by field.

    The ``index``, ``src``, ``dst`` and ``data`` attributes of both objects
    are always compared, in that order, before the verdict is derived.

    :param other: object to compare against
    :returns: the first comparison result that is falsy, otherwise the
        result of the ``data`` comparison
    """
    verdicts = [getattr(self, field) == getattr(other, field)
                for field in ("index", "src", "dst", "data")]
    # Mirror an ``a and b and c and d`` chain: the last result is handed
    # back as-is, without testing its truthiness.
    for verdict in verdicts[:-1]:
        if not verdict:
            return verdict
    return verdicts[-1]
53 def pump_output(out, deque):
54 for line in iter(out.readline, b''):
58 class VppTestCase(unittest.TestCase):
59 """This subclass is a base class for VPP test cases that are implemented as
60 classes. It provides methods to create and run test cases.
def packet_infos(self):
    """Return the packet infos recorded for this test case.

    :returns: the object currently stored in ``_packet_infos``
    """
    recorded = self._packet_infos
    return recorded
69 def get_packet_count_for_if_idx(cls, dst_if_index):
70 """Get the number of packet info for specified destination if index"""
71 if dst_if_index in cls._packet_count_for_dst_if_idx:
72 return cls._packet_count_for_dst_if_idx[dst_if_index]
78 """Return the instance of this testcase"""
79 return cls.test_instance
82 def set_debug_flags(cls, d):
83 cls.debug_core = False
85 cls.debug_gdbserver = False
90 if resource.getrlimit(resource.RLIMIT_CORE)[0] <= 0:
91 # give a heads up if this is actually useless
92 print(colorize("WARNING: core size limit is set 0, core files "
93 "will NOT be created", RED))
97 elif dl == "gdbserver":
98 cls.debug_gdbserver = True
100 raise Exception("Unrecognized DEBUG option: '%s'" % d)
103 def setUpConstants(cls):
104 """ Set-up the test case class based on environment variables """
106 s = os.getenv("STEP")
107 cls.step = True if s.lower() in ("y", "yes", "1") else False
111 d = os.getenv("DEBUG")
114 cls.set_debug_flags(d)
115 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
116 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
118 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
119 debug_cli = "cli-listen localhost:5002"
120 cls.vpp_cmdline = [cls.vpp_bin,
121 "unix", "{", "nodaemon", debug_cli, "}",
122 "api-segment", "{", "prefix", cls.shm_prefix, "}"]
123 if cls.plugin_path is not None:
124 cls.vpp_cmdline.extend(["plugin_path", cls.plugin_path])
125 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
128 def wait_for_enter(cls):
129 if cls.debug_gdbserver:
130 print(double_line_delim)
131 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
133 print(double_line_delim)
134 print("Spawned VPP with PID: %d" % cls.vpp.pid)
136 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
138 print(single_line_delim)
139 print("You can debug the VPP using e.g.:")
140 if cls.debug_gdbserver:
141 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
142 print("Now is the time to attach a gdb by running the above "
143 "command, set up breakpoints etc. and then resume VPP from "
144 "within gdb by issuing the 'continue' command")
146 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
147 print("Now is the time to attach a gdb by running the above "
148 "command and set up breakpoints etc.")
149 print(single_line_delim)
150 raw_input("Press ENTER to continue running the testcase...")
154 cmdline = cls.vpp_cmdline
156 if cls.debug_gdbserver:
157 gdbserver = '/usr/bin/gdbserver'
158 if not os.path.isfile(gdbserver) or \
159 not os.access(gdbserver, os.X_OK):
160 raise Exception("gdbserver binary '%s' does not exist or is "
161 "not executable" % gdbserver)
163 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
164 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
167 cls.vpp = subprocess.Popen(cmdline,
168 stdout=subprocess.PIPE,
169 stderr=subprocess.PIPE,
171 except Exception as e:
172 cls.logger.critical("Couldn't start vpp: %s" % e)
180 Perform class setup before running the testcase
181 Remove shared memory files, start vpp and connect the vpp-api
183 cls.logger = getLogger(cls.__name__)
184 cls.tempdir = tempfile.mkdtemp(
185 prefix='vpp-unittest-' + cls.__name__ + '-')
186 file_handler = FileHandler("%s/log.txt" % cls.tempdir)
187 file_handler.setLevel(DEBUG)
188 cls.logger.addHandler(file_handler)
189 cls.shm_prefix = cls.tempdir.split("/")[-1]
190 os.chdir(cls.tempdir)
191 cls.logger.info("Temporary dir is %s, shm prefix is %s",
192 cls.tempdir, cls.shm_prefix)
194 cls.reset_packet_infos()
196 cls._zombie_captures = []
199 cls.registry = VppObjectRegistry()
200 print(double_line_delim)
201 print(colorize(getdoc(cls).splitlines()[0], YELLOW))
202 print(double_line_delim)
203 # need to catch exceptions here because if we raise, then the cleanup
204 # doesn't get called and we might end with a zombie vpp
207 cls.vpp_stdout_deque = deque()
208 cls.vpp_stdout_reader_thread = Thread(target=pump_output, args=(
209 cls.vpp.stdout, cls.vpp_stdout_deque))
210 cls.vpp_stdout_reader_thread.start()
211 cls.vpp_stderr_deque = deque()
212 cls.vpp_stderr_reader_thread = Thread(target=pump_output, args=(
213 cls.vpp.stderr, cls.vpp_stderr_deque))
214 cls.vpp_stderr_reader_thread.start()
215 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
220 cls.vapi.register_hook(hook)
226 if cls.debug_gdbserver:
227 print(colorize("You're running VPP inside gdbserver but "
228 "VPP-API connection failed, did you forget "
229 "to 'continue' VPP from within gdb?", RED))
232 t, v, tb = sys.exc_info()
242 Disconnect vpp-api, kill vpp and cleanup shared memory files
244 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
246 if cls.vpp.returncode is None:
247 print(double_line_delim)
248 print("VPP or GDB server is still running")
249 print(single_line_delim)
250 raw_input("When done debugging, press ENTER to kill the "
251 "process and finish running the testcase...")
253 if hasattr(cls, 'vpp'):
254 if hasattr(cls, 'vapi'):
255 cls.vapi.disconnect()
257 if cls.vpp.returncode is None:
261 if hasattr(cls, 'vpp_stdout_deque'):
262 cls.logger.info(single_line_delim)
263 cls.logger.info('VPP output to stdout while running %s:',
265 cls.logger.info(single_line_delim)
266 f = open(cls.tempdir + '/vpp_stdout.txt', 'w')
267 vpp_output = "".join(cls.vpp_stdout_deque)
269 cls.logger.info('\n%s', vpp_output)
270 cls.logger.info(single_line_delim)
272 if hasattr(cls, 'vpp_stderr_deque'):
273 cls.logger.info(single_line_delim)
274 cls.logger.info('VPP output to stderr while running %s:',
276 cls.logger.info(single_line_delim)
277 f = open(cls.tempdir + '/vpp_stderr.txt', 'w')
278 vpp_output = "".join(cls.vpp_stderr_deque)
280 cls.logger.info('\n%s', vpp_output)
281 cls.logger.info(single_line_delim)
284 def tearDownClass(cls):
285 """ Perform final cleanup after running all tests in this test-case """
289 """ Show various debug prints after each test """
290 if not self.vpp_dead:
291 self.logger.debug(self.vapi.cli("show trace"))
292 self.logger.info(self.vapi.ppcli("show int"))
293 self.logger.info(self.vapi.ppcli("show hardware"))
294 self.logger.info(self.vapi.ppcli("show error"))
295 self.logger.info(self.vapi.ppcli("show run"))
296 self.registry.remove_vpp_config(self.logger)
299 """ Clear trace before running each test"""
301 raise Exception("VPP is dead when setting up the test")
303 self.vpp_stdout_deque.append(
304 "--- test setUp() for %s.%s(%s) starts here ---\n" %
305 (self.__class__.__name__, self._testMethodName,
306 self._testMethodDoc))
307 self.vpp_stderr_deque.append(
308 "--- test setUp() for %s.%s(%s) starts here ---\n" %
309 (self.__class__.__name__, self._testMethodName,
310 self._testMethodDoc))
311 self.vapi.cli("clear trace")
312 # store the test instance inside the test class - so that objects
313 # holding the class can access instance methods (like assertEqual)
314 type(self).test_instance = self
317 def pg_enable_capture(cls, interfaces):
319 Enable capture on packet-generator interfaces
321 :param interfaces: iterable interface indexes
328 def register_capture(cls, cap_name):
329 """ Register a capture in the testclass """
330 # add to the list of captures with current timestamp
331 cls._captures.append((time.time(), cap_name))
332 # filter out from zombies
333 cls._zombie_captures = [(stamp, name)
334 for (stamp, name) in cls._zombie_captures
339 """ Remove any zombie captures and enable the packet generator """
340 # how long before capture is allowed to be deleted - otherwise vpp
341 # crashes - 100ms seems enough (this shouldn't be needed at all)
344 for stamp, cap_name in cls._zombie_captures:
345 wait = stamp + capture_ttl - now
347 cls.logger.debug("Waiting for %ss before deleting capture %s",
351 cls.logger.debug("Removing zombie capture %s" % cap_name)
352 cls.vapi.cli('packet-generator delete %s' % cap_name)
354 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
355 cls.vapi.cli('packet-generator enable')
356 cls._zombie_captures = cls._captures
360 def create_pg_interfaces(cls, interfaces):
362 Create packet-generator interfaces.
364 :param interfaces: iterable indexes of the interfaces.
365 :returns: List of created interfaces.
370 intf = VppPGInterface(cls, i)
371 setattr(cls, intf.name, intf)
373 cls.pg_interfaces = result
377 def create_loopback_interfaces(cls, interfaces):
379 Create loopback interfaces.
381 :param interfaces: iterable indexes of the interfaces.
382 :returns: List of created interfaces.
386 intf = VppLoInterface(cls, i)
387 setattr(cls, intf.name, intf)
389 cls.lo_interfaces = result
393 def extend_packet(packet, size):
395 Extend packet to given size by padding with spaces
396 NOTE: Currently works only when Raw layer is present.
398 :param packet: packet
399 :param size: target size
402 packet_len = len(packet) + 4
403 extend = size - packet_len
405 packet[Raw].load += ' ' * extend
def reset_packet_infos(cls):
    """Discard all packet infos and per-destination packet counts.

    Rebinds ``_packet_infos`` and ``_packet_count_for_dst_if_idx`` on *cls*
    to two fresh, empty dictionaries.
    """
    cls._packet_infos, cls._packet_count_for_dst_if_idx = dict(), dict()
414 def create_packet_info(cls, src_if, dst_if):
416 Create packet info object containing the source and destination indexes
417 and add it to the testcase's packet info list
419 :param VppInterface src_if: source interface
420 :param VppInterface dst_if: destination interface
422 :returns: _PacketInfo object
426 info.index = len(cls._packet_infos)
427 info.src = src_if.sw_if_index
428 info.dst = dst_if.sw_if_index
429 if isinstance(dst_if, VppSubInterface):
430 dst_idx = dst_if.parent.sw_if_index
432 dst_idx = dst_if.sw_if_index
433 if dst_idx in cls._packet_count_for_dst_if_idx:
434 cls._packet_count_for_dst_if_idx[dst_idx] += 1
436 cls._packet_count_for_dst_if_idx[dst_idx] = 1
437 cls._packet_infos[info.index] = info
def info_to_payload(info):
    """Serialize a _PacketInfo object into a packet payload string.

    :param info: _PacketInfo object

    :returns: the ``index``, ``src`` and ``dst`` fields rendered as decimal
        integers and separated by single spaces
    """
    fields = (info.index, info.src, info.dst)
    return " ".join("%d" % value for value in fields)
452 def payload_to_info(payload):
454 Convert packet payload to _PacketInfo object
456 :param payload: packet payload
458 :returns: _PacketInfo object containing de-serialized data from payload
461 numbers = payload.split()
463 info.index = int(numbers[0])
464 info.src = int(numbers[1])
465 info.dst = int(numbers[2])
468 def get_next_packet_info(self, info):
470 Iterate over the packet info list stored in the testcase
471 Start iteration with first element if info is None
472 Continue based on index in info if info is specified
474 :param info: info or None
475 :returns: next info in list or None if no more infos
480 next_index = info.index + 1
481 if next_index == len(self._packet_infos):
484 return self._packet_infos[next_index]
486 def get_next_packet_info_for_interface(self, src_index, info):
488 Search the packet info list for the next packet info with same source
491 :param src_index: source interface index to search for
492 :param info: packet info - where to start the search
493 :returns: packet info or None
497 info = self.get_next_packet_info(info)
500 if info.src == src_index:
503 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
505 Search the packet info list for the next packet info with same source
506 and destination interface indexes
508 :param src_index: source interface index to search for
509 :param dst_index: destination interface index to search for
510 :param info: packet info - where to start the search
511 :returns: packet info or None
515 info = self.get_next_packet_info_for_interface(src_index, info)
518 if info.dst == dst_index:
def assert_equal(self, real_value, expected_value, name_or_class=None):
    """Assert that two values are equal, with a descriptive failure message.

    :param real_value: value obtained by the test
    :param expected_value: value the test expects
    :param name_or_class: optional description of the compared value;
        either a plain name, or a callable whose docstring names the value
        and whose result for a value can be rendered with ``str()``
    """
    if name_or_class is None:
        # Nothing to describe the value with - rely on the default message
        # (previously an unassigned ``msg`` was passed here).
        self.assertEqual(real_value, expected_value)
        return
    try:
        msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
        msg = msg % (getdoc(name_or_class).strip(),
                     real_value, str(name_or_class(real_value)),
                     expected_value, str(name_or_class(expected_value)))
    except Exception:
        # name_or_class could not be used as a describing callable (or the
        # values are not numeric) - fall back to treating it as a name.
        msg = "Invalid %s: %s does not match expected value %s" % (
            name_or_class, real_value, expected_value)

    self.assertEqual(real_value, expected_value, msg)
536 def assert_in_range(self,
544 msg = "Invalid %s: %s out of range <%s,%s>" % (
545 name, real_value, expected_min, expected_max)
546 self.assertTrue(expected_min <= real_value <= expected_max, msg)
549 class VppTestResult(unittest.TestResult):
551 @property result_string
552 String variable to store the test case result string.
554 List variable containing 2-tuples of TestCase instances and strings
555 holding formatted tracebacks. Each tuple represents a test which
556 raised an unexpected exception.
558 List variable containing 2-tuples of TestCase instances and strings
559 holding formatted tracebacks. Each tuple represents a test where
560 a failure was explicitly signalled using the TestCase.assert*()
564 def __init__(self, stream, descriptions, verbosity):
566 :param stream File descriptor to store where to report test results.
567 Set to the standard error stream by default.
568 :param descriptions Boolean variable to store information if to use
569 test case descriptions.
570 :param verbosity Integer variable to store required verbosity level.
572 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
574 self.descriptions = descriptions
575 self.verbosity = verbosity
576 self.result_string = None
def addSuccess(self, test):
    """Record that a test passed.

    Delegates to ``unittest.TestResult.addSuccess`` and then stores the
    colorized "OK" marker in ``result_string``.

    :param test: the test that succeeded
    """
    unittest.TestResult.addSuccess(self, test)
    verdict = colorize("OK", GREEN)
    self.result_string = verdict
def addSkip(self, test, reason):
    """Record that a test was skipped.

    Delegates to ``unittest.TestResult.addSkip`` and then stores the
    colorized "SKIP" marker in ``result_string``.

    :param test: the test that was skipped
    :param reason: why the test was skipped
    """
    unittest.TestResult.addSkip(self, test, reason)
    verdict = colorize("SKIP", YELLOW)
    self.result_string = verdict
599 def addFailure(self, test, err):
601 Record a test failed result
604 :param err: error message
607 unittest.TestResult.addFailure(self, test, err)
608 if hasattr(test, 'tempdir'):
609 self.result_string = colorize("FAIL", RED) + \
610 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
612 self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
614 def addError(self, test, err):
616 Record a test error result
619 :param err: error message
622 unittest.TestResult.addError(self, test, err)
623 if hasattr(test, 'tempdir'):
624 self.result_string = colorize("ERROR", RED) + \
625 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
627 self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
629 def getDescription(self, test):
634 :returns: test description
637 # TODO: if none print warning not raise exception
638 short_description = test.shortDescription()
639 if self.descriptions and short_description:
640 return short_description
644 def startTest(self, test):
651 unittest.TestResult.startTest(self, test)
652 if self.verbosity > 0:
654 "Starting " + self.getDescription(test) + " ...")
655 self.stream.writeln(single_line_delim)
657 def stopTest(self, test):
664 unittest.TestResult.stopTest(self, test)
665 if self.verbosity > 0:
666 self.stream.writeln(single_line_delim)
667 self.stream.writeln("%-73s%s" % (self.getDescription(test),
669 self.stream.writeln(single_line_delim)
671 self.stream.writeln("%-73s%s" % (self.getDescription(test),
def printErrors(self):
    """Report everything that went wrong while running the test case.

    Writes an empty line to the stream, then prints the recorded errors
    followed by the recorded failures.
    """
    self.stream.writeln()
    # Attributes are looked up one at a time, right before each list is
    # printed.
    for flavour, attribute in (('ERROR', 'errors'), ('FAIL', 'failures')):
        self.printErrorList(flavour, getattr(self, attribute))
def printErrorList(self, flavour, errors):
    """Write one report section per problem to the output stream.

    Each section consists of a double-line delimiter, a heading with the
    error type and the test description, a single-line delimiter and the
    error text.

    :param flavour: error type
    :param errors: iterable of (test, error text) pairs
    """
    for failed_test, details in errors:
        self.stream.writeln(double_line_delim)
        heading = "%s: %s" % (flavour, self.getDescription(failed_test))
        self.stream.writeln(heading)
        self.stream.writeln(single_line_delim)
        body = "%s" % details
        self.stream.writeln(body)
699 class VppTestRunner(unittest.TextTestRunner):
701 A basic test runner implementation which prints results to standard error.
704 def resultclass(self):
705 """Class maintaining the results of the tests"""
708 def __init__(self, stream=sys.stderr, descriptions=True, verbosity=1,
709 failfast=False, buffer=False, resultclass=None):
710 # ignore stream setting here, use hard-coded stdout to be in sync
711 # with prints from VppTestCase methods ...
712 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
713 verbosity, failfast, buffer,
718 def parse_test_option(self):
720 f = os.getenv(self.test_option)
723 filter_file_name = None
724 filter_class_name = None
725 filter_func_name = None
730 raise Exception("Unrecognized %s option: %s" %
731 (self.test_option, f))
733 if parts[2] not in ('*', ''):
734 filter_func_name = parts[2]
735 if parts[1] not in ('*', ''):
736 filter_class_name = parts[1]
737 if parts[0] not in ('*', ''):
738 if parts[0].startswith('test_'):
739 filter_file_name = parts[0]
741 filter_file_name = 'test_%s' % parts[0]
743 if f.startswith('test_'):
746 filter_file_name = 'test_%s' % f
747 return filter_file_name, filter_class_name, filter_func_name
749 def filter_tests(self, tests, filter_file, filter_class, filter_func):
750 result = unittest.suite.TestSuite()
752 if isinstance(t, unittest.suite.TestSuite):
753 # this is a bunch of tests, recursively filter...
754 x = self.filter_tests(t, filter_file, filter_class,
756 if x.countTestCases() > 0:
758 elif isinstance(t, unittest.TestCase):
759 # this is a single test
760 parts = t.id().split('.')
761 # t.id() for common cases like this:
762 # test_classifier.TestClassifier.test_acl_ip
763 # apply filtering only if it is so
765 if filter_file and filter_file != parts[0]:
767 if filter_class and filter_class != parts[1]:
769 if filter_func and filter_func != parts[2]:
773 # unexpected object, don't touch it
784 print("Running tests using custom test runner") # debug message
785 filter_file, filter_class, filter_func = self.parse_test_option()
786 print("Active filters: file=%s, class=%s, function=%s" % (
787 filter_file, filter_class, filter_func))
788 filtered = self.filter_tests(test, filter_file, filter_class,
790 print("%s out of %s tests match specified filters" % (
791 filtered.countTestCases(), test.countTestCases()))
792 return super(VppTestRunner, self).run(filtered)