3 from __future__ import print_function
12 from collections import deque
13 from threading import Thread, Event
14 from inspect import getdoc
15 from traceback import format_exception
16 from logging import FileHandler, DEBUG, Formatter
17 from scapy.packet import Raw
18 from hook import StepHook, PollHook
19 from vpp_pg_interface import VppPGInterface
20 from vpp_sub_interface import VppSubInterface
21 from vpp_lo_interface import VppLoInterface
22 from vpp_papi_provider import VppPapiProvider
24 from vpp_object import VppObjectRegistry
25 if os.name == 'posix' and sys.version_info[0] < 3:
26 # using subprocess32 is recommended by python official documentation
27 # @ https://docs.python.org/2/library/subprocess.html
28 import subprocess32 as subprocess
33 Test framework module.
35 The module provides a set of tools for constructing and running tests and
36 representing the results.
40 class _PacketInfo(object):
41 """Private class to create packet info object.
43 Help process information about the next packet.
44 Set variables to default values.
46 #: Store the index of the packet.
48 #: Store the index of the source packet generator interface of the packet.
50 #: Store the index of the destination packet generator interface
53 #: Store the copy of the former packet.
def __eq__(self, other):
    """Compare two packet infos field by field.

    :param other: object to compare with
    :returns: True when index, src, dst and data are all equal, False when
        any of them differs, NotImplemented when `other` does not expose
        those attributes (lets Python fall back to its default comparison
        instead of raising AttributeError)
    """
    try:
        theirs = (other.index, other.src, other.dst, other.data)
    except AttributeError:
        return NotImplemented
    return (self.index, self.src, self.dst, self.data) == theirs


def __ne__(self, other):
    """Inverse of __eq__.

    Python 2 does not derive ``!=`` from ``==``; without this method two
    equal packet infos would still compare as "not equal".

    :param other: object to compare with
    :returns: negated result of __eq__, or NotImplemented when __eq__
        cannot compare the two objects
    """
    outcome = self.__eq__(other)
    if outcome is NotImplemented:
        return outcome
    return not outcome
def pump_output(testclass):
    """ pump output from vpp stdout/stderr to proper queues

    Loops until ``testclass.pump_thread_stop_flag`` is set, copying whatever
    VPP writes to its stdout/stderr pipes into ``vpp_stdout_deque`` and
    ``vpp_stderr_deque`` in chunks of up to 1024 bytes.

    :param testclass: object providing ``vpp`` (with ``stdout``/``stderr``
        pipes), ``pump_thread_stop_flag``, ``pump_thread_wakeup_pipe`` and
        the two output deques
    """
    sinks = {testclass.vpp.stdout.fileno(): testclass.vpp_stdout_deque,
             testclass.vpp.stderr.fileno(): testclass.vpp_stderr_deque}
    wakeup_fd = testclass.pump_thread_wakeup_pipe[0]
    while not testclass.pump_thread_stop_flag.is_set():
        # NOTE(review): the tail of this select() call was unreadable in the
        # reviewed listing; a blocking wait is assumed because the wakeup
        # pipe exists to interrupt it - confirm against the original
        readable = select.select(list(sinks) + [wakeup_fd], [], [])[0]
        for fd in readable:
            if fd not in sinks:
                # ignoring the dummy pipe here intentionally - the flag will
                # take care of properly terminating the loop
                continue
            chunk = os.read(fd, 1024)
            if chunk:
                sinks[fd].append(chunk)
            else:
                # EOF: a closed pipe stays "readable" forever, so stop
                # watching it instead of spinning on empty reads
                del sinks[fd]
81 class VppTestCase(unittest.TestCase):
82 """This subclass is a base class for VPP test cases that are implemented as
83 classes. It provides methods to create and run test case.
def packet_infos(self):
    """Packet infos created so far.

    :returns: the ``_packet_infos`` container - a dict mapping packet index
        to its packet info object (not a list)
    """
    return self._packet_infos
def get_packet_count_for_if_idx(cls, dst_if_index):
    """Get the number of packet info for specified destination if index

    :param dst_if_index: index of the destination interface
    :returns: number of packet infos recorded for that interface
    """
    # NOTE(review): the fallback branch was unreadable in the reviewed
    # listing; 0 is assumed for an interface with no recorded packets
    return cls._packet_count_for_dst_if_idx.get(dst_if_index, 0)
101 """Return the instance of this testcase"""
102 return cls.test_instance
105 def set_debug_flags(cls, d):
106 cls.debug_core = False
107 cls.debug_gdb = False
108 cls.debug_gdbserver = False
113 if resource.getrlimit(resource.RLIMIT_CORE)[0] <= 0:
114 # give a heads up if this is actually useless
115 print(colorize("WARNING: core size limit is set 0, core files "
116 "will NOT be created", RED))
117 cls.debug_core = True
120 elif dl == "gdbserver":
121 cls.debug_gdbserver = True
123 raise Exception("Unrecognized DEBUG option: '%s'" % d)
126 def setUpConstants(cls):
127 """ Set-up the test case class based on environment variables """
129 s = os.getenv("STEP")
130 cls.step = True if s.lower() in ("y", "yes", "1") else False
134 d = os.getenv("DEBUG")
137 cls.set_debug_flags(d)
138 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
139 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
141 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
142 debug_cli = "cli-listen localhost:5002"
143 cls.vpp_cmdline = [cls.vpp_bin,
144 "unix", "{", "nodaemon", debug_cli, "}",
145 "api-segment", "{", "prefix", cls.shm_prefix, "}"]
146 if cls.plugin_path is not None:
147 cls.vpp_cmdline.extend(["plugin_path", cls.plugin_path])
148 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
151 def wait_for_enter(cls):
152 if cls.debug_gdbserver:
153 print(double_line_delim)
154 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
156 print(double_line_delim)
157 print("Spawned VPP with PID: %d" % cls.vpp.pid)
159 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
161 print(single_line_delim)
162 print("You can debug the VPP using e.g.:")
163 if cls.debug_gdbserver:
164 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
165 print("Now is the time to attach a gdb by running the above "
166 "command, set up breakpoints etc. and then resume VPP from "
167 "within gdb by issuing the 'continue' command")
169 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
170 print("Now is the time to attach a gdb by running the above "
171 "command and set up breakpoints etc.")
172 print(single_line_delim)
173 raw_input("Press ENTER to continue running the testcase...")
177 cmdline = cls.vpp_cmdline
179 if cls.debug_gdbserver:
180 gdbserver = '/usr/bin/gdbserver'
181 if not os.path.isfile(gdbserver) or \
182 not os.access(gdbserver, os.X_OK):
183 raise Exception("gdbserver binary '%s' does not exist or is "
184 "not executable" % gdbserver)
186 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
187 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
190 cls.vpp = subprocess.Popen(cmdline,
191 stdout=subprocess.PIPE,
192 stderr=subprocess.PIPE,
194 except Exception as e:
195 cls.logger.critical("Couldn't start vpp: %s" % e)
203 Perform class setup before running the testcase
204 Remove shared memory files, start vpp and connect the vpp-api
206 gc.collect() # run garbage collection first
207 cls.logger = getLogger(cls.__name__)
208 cls.tempdir = tempfile.mkdtemp(
209 prefix='vpp-unittest-' + cls.__name__ + '-')
210 file_handler = FileHandler("%s/log.txt" % cls.tempdir)
211 file_handler.setFormatter(
212 Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
214 file_handler.setLevel(DEBUG)
215 cls.logger.addHandler(file_handler)
216 cls.shm_prefix = cls.tempdir.split("/")[-1]
217 os.chdir(cls.tempdir)
218 cls.logger.info("Temporary dir is %s, shm prefix is %s",
219 cls.tempdir, cls.shm_prefix)
221 cls.reset_packet_infos()
223 cls._zombie_captures = []
226 cls.registry = VppObjectRegistry()
227 print(double_line_delim)
228 print(colorize(getdoc(cls).splitlines()[0], YELLOW))
229 print(double_line_delim)
230 # need to catch exceptions here because if we raise, then the cleanup
231 # doesn't get called and we might end with a zombie vpp
234 cls.vpp_stdout_deque = deque()
235 cls.vpp_stderr_deque = deque()
236 cls.pump_thread_stop_flag = Event()
237 cls.pump_thread_wakeup_pipe = os.pipe()
238 cls.pump_thread = Thread(target=pump_output, args=(cls,))
239 cls.pump_thread.start()
240 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
245 cls.vapi.register_hook(hook)
246 cls.sleep(0.1, "after vpp startup, before initial poll")
251 if cls.debug_gdbserver:
252 print(colorize("You're running VPP inside gdbserver but "
253 "VPP-API connection failed, did you forget "
254 "to 'continue' VPP from within gdb?", RED))
257 t, v, tb = sys.exc_info()
267 Disconnect vpp-api, kill vpp and cleanup shared memory files
269 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
271 if cls.vpp.returncode is None:
272 print(double_line_delim)
273 print("VPP or GDB server is still running")
274 print(single_line_delim)
275 raw_input("When done debugging, press ENTER to kill the "
276 "process and finish running the testcase...")
278 os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
279 cls.pump_thread_stop_flag.set()
280 if hasattr(cls, 'pump_thread'):
281 cls.logger.debug("Waiting for pump thread to stop")
282 cls.pump_thread.join()
283 if hasattr(cls, 'vpp_stderr_reader_thread'):
284 cls.logger.debug("Waiting for stdderr pump to stop")
285 cls.vpp_stderr_reader_thread.join()
287 if hasattr(cls, 'vpp'):
288 if hasattr(cls, 'vapi'):
289 cls.vapi.disconnect()
292 if cls.vpp.returncode is None:
293 cls.logger.debug("Sending TERM to vpp")
295 cls.logger.debug("Waiting for vpp to die")
296 cls.vpp.communicate()
299 if hasattr(cls, 'vpp_stdout_deque'):
300 cls.logger.info(single_line_delim)
301 cls.logger.info('VPP output to stdout while running %s:',
303 cls.logger.info(single_line_delim)
304 f = open(cls.tempdir + '/vpp_stdout.txt', 'w')
305 vpp_output = "".join(cls.vpp_stdout_deque)
307 cls.logger.info('\n%s', vpp_output)
308 cls.logger.info(single_line_delim)
310 if hasattr(cls, 'vpp_stderr_deque'):
311 cls.logger.info(single_line_delim)
312 cls.logger.info('VPP output to stderr while running %s:',
314 cls.logger.info(single_line_delim)
315 f = open(cls.tempdir + '/vpp_stderr.txt', 'w')
316 vpp_output = "".join(cls.vpp_stderr_deque)
318 cls.logger.info('\n%s', vpp_output)
319 cls.logger.info(single_line_delim)
322 def tearDownClass(cls):
323 """ Perform final cleanup after running all tests in this test-case """
327 """ Show various debug prints after each test """
328 self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
329 (self.__class__.__name__, self._testMethodName,
330 self._testMethodDoc))
331 if not self.vpp_dead:
332 self.logger.debug(self.vapi.cli("show trace"))
333 self.logger.info(self.vapi.ppcli("show int"))
334 self.logger.info(self.vapi.ppcli("show hardware"))
335 self.logger.info(self.vapi.ppcli("show error"))
336 self.logger.info(self.vapi.ppcli("show run"))
337 self.registry.remove_vpp_config(self.logger)
340 """ Clear trace before running each test"""
341 self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
342 (self.__class__.__name__, self._testMethodName,
343 self._testMethodDoc))
345 raise Exception("VPP is dead when setting up the test")
346 self.sleep(.1, "during setUp")
347 self.vpp_stdout_deque.append(
348 "--- test setUp() for %s.%s(%s) starts here ---\n" %
349 (self.__class__.__name__, self._testMethodName,
350 self._testMethodDoc))
351 self.vpp_stderr_deque.append(
352 "--- test setUp() for %s.%s(%s) starts here ---\n" %
353 (self.__class__.__name__, self._testMethodName,
354 self._testMethodDoc))
355 self.vapi.cli("clear trace")
356 # store the test instance inside the test class - so that objects
357 # holding the class can access instance methods (like assertEqual)
358 type(self).test_instance = self
361 def pg_enable_capture(cls, interfaces):
363 Enable capture on packet-generator interfaces
365 :param interfaces: iterable interface indexes
def register_capture(cls, cap_name):
    """ Register a capture in the testclass

    :param cap_name: name of the capture being registered
    """
    # add to the list of captures with current timestamp
    cls._captures.append((time.time(), cap_name))
    # filter out from zombies
    # NOTE(review): the filter condition was unreadable in the reviewed
    # listing; dropping entries with the same name is assumed
    cls._zombie_captures = [(stamp, name)
                            for (stamp, name) in cls._zombie_captures
                            if name != cap_name]
383 """ Remove any zombie captures and enable the packet generator """
384 # how long before capture is allowed to be deleted - otherwise vpp
385 # crashes - 100ms seems enough (this shouldn't be needed at all)
388 for stamp, cap_name in cls._zombie_captures:
389 wait = stamp + capture_ttl - now
391 cls.sleep(wait, "before deleting capture %s" % cap_name)
393 cls.logger.debug("Removing zombie capture %s" % cap_name)
394 cls.vapi.cli('packet-generator delete %s' % cap_name)
396 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
397 cls.vapi.cli('packet-generator enable')
398 cls._zombie_captures = cls._captures
402 def create_pg_interfaces(cls, interfaces):
404 Create packet-generator interfaces.
406 :param interfaces: iterable indexes of the interfaces.
407 :returns: List of created interfaces.
412 intf = VppPGInterface(cls, i)
413 setattr(cls, intf.name, intf)
415 cls.pg_interfaces = result
419 def create_loopback_interfaces(cls, interfaces):
421 Create loopback interfaces.
423 :param interfaces: iterable indexes of the interfaces.
424 :returns: List of created interfaces.
428 intf = VppLoInterface(cls, i)
429 setattr(cls, intf.name, intf)
431 cls.lo_interfaces = result
435 def extend_packet(packet, size):
437 Extend packet to given size by padding with spaces
438 NOTE: Currently works only when Raw layer is present.
440 :param packet: packet
441 :param size: target size
444 packet_len = len(packet) + 4
445 extend = size - packet_len
447 packet[Raw].load += ' ' * extend
def reset_packet_infos(cls):
    """ Reset the list of packet info objects and packet counts to zero

    Both containers are replaced by fresh empty dicts.
    """
    cls._packet_infos = {}
    cls._packet_count_for_dst_if_idx = {}
def create_packet_info(cls, src_if, dst_if):
    """
    Create packet info object containing the source and destination indexes
    and add it to the testcase's packet info list

    :param VppInterface src_if: source interface
    :param VppInterface dst_if: destination interface

    :returns: _PacketInfo object

    """
    # NOTE(review): the construction and return statements were unreadable
    # in the reviewed listing; they are restored from the docstring
    info = _PacketInfo()
    info.index = len(cls._packet_infos)
    info.src = src_if.sw_if_index
    info.dst = dst_if.sw_if_index
    # packets sent to a sub-interface are counted against its parent
    if isinstance(dst_if, VppSubInterface):
        dst_idx = dst_if.parent.sw_if_index
    else:
        dst_idx = dst_if.sw_if_index
    counts = cls._packet_count_for_dst_if_idx
    counts[dst_idx] = counts.get(dst_idx, 0) + 1
    cls._packet_infos[info.index] = info
    return info
def info_to_payload(info):
    """
    Convert _PacketInfo object to packet payload

    :param info: _PacketInfo object

    :returns: string containing serialized data from packet info
        ("<index> <src> <dst>", space separated decimal numbers)

    """
    return "%d %d %d" % (info.index, info.src, info.dst)
def payload_to_info(payload):
    """
    Convert packet payload to _PacketInfo object

    :param payload: packet payload

    :returns: _PacketInfo object containing de-serialized data from payload

    """
    numbers = payload.split()
    # NOTE(review): the construction and return statements were unreadable
    # in the reviewed listing; they are restored from the docstring
    info = _PacketInfo()
    # the first three whitespace-separated fields are index, src and dst
    info.index = int(numbers[0])
    info.src = int(numbers[1])
    info.dst = int(numbers[2])
    return info
def get_next_packet_info(self, info):
    """
    Iterate over the packet info list stored in the testcase
    Start iteration with first element if info is None
    Continue based on index in info if info is specified

    :param info: info or None
    :returns: next info in list or None if no more infos
    """
    # NOTE(review): the None/exhaustion branches were unreadable in the
    # reviewed listing; they are restored from the docstring
    next_index = 0 if info is None else info.index + 1
    # _packet_infos is keyed by index, so a missing key means "no more"
    return self._packet_infos.get(next_index)
def get_next_packet_info_for_interface(self, src_index, info):
    """
    Search the packet info list for the next packet info with same source
    interface index

    :param src_index: source interface index to search for
    :param info: packet info - where to start the search
    :returns: packet info or None

    """
    # NOTE(review): the loop header and return statements were unreadable in
    # the reviewed listing; they are restored from the docstring
    candidate = self.get_next_packet_info(info)
    while candidate is not None and candidate.src != src_index:
        candidate = self.get_next_packet_info(candidate)
    return candidate
def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
    """
    Search the packet info list for the next packet info with same source
    and destination interface indexes

    :param src_index: source interface index to search for
    :param dst_index: destination interface index to search for
    :param info: packet info - where to start the search
    :returns: packet info or None

    """
    # NOTE(review): the loop header and return statements were unreadable in
    # the reviewed listing; they are restored from the docstring
    candidate = self.get_next_packet_info_for_interface(src_index, info)
    while candidate is not None and candidate.dst != dst_index:
        candidate = self.get_next_packet_info_for_interface(src_index,
                                                            candidate)
    return candidate
def assert_equal(self, real_value, expected_value, name_or_class=None):
    """Assert that two values are equal, with a descriptive failure message.

    :param real_value: value obtained by the test
    :param expected_value: value the test expects
    :param name_or_class: either a plain name used in the failure message,
        or a callable (e.g. an enum-like class) whose docstring names the
        field and whose instances render the numeric values; when None the
        default unittest message is used
    """
    if name_or_class is None:
        # no custom message here - referencing `msg` before it is built
        # would raise instead of performing the comparison
        self.assertEqual(real_value, expected_value)
        return
    try:
        msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
        msg = msg % (getdoc(name_or_class).strip(),
                     real_value, str(name_or_class(real_value)),
                     expected_value, str(name_or_class(expected_value)))
    except Exception:
        # name_or_class is not a documented callable (typically a plain
        # string) - fall back to the generic message
        msg = "Invalid %s: %s does not match expected value %s" % (
            name_or_class, real_value, expected_value)

    self.assertEqual(real_value, expected_value, msg)
578 def assert_in_range(self,
586 msg = "Invalid %s: %s out of range <%s,%s>" % (
587 name, real_value, expected_min, expected_max)
588 self.assertTrue(expected_min <= real_value <= expected_max, msg)
def sleep(cls, timeout, remark=None):
    """Sleep for the given time, noting the reason in the debug log.

    :param timeout: how long to sleep, in seconds
    :param remark: optional text explaining why the sleep is needed
    """
    # the logger may not exist yet when called early in class set-up
    if hasattr(cls, 'logger'):
        cls.logger.debug("Sleeping for %ss (%s)", timeout, remark)
    # NOTE(review): the sleep call itself was unreadable in the reviewed
    # listing; time.sleep is assumed
    time.sleep(timeout)
597 class VppTestResult(unittest.TestResult):
599 @property result_string
600 String variable to store the test case result string.
602 List variable containing 2-tuples of TestCase instances and strings
603 holding formatted tracebacks. Each tuple represents a test which
604 raised an unexpected exception.
606 List variable containing 2-tuples of TestCase instances and strings
607 holding formatted tracebacks. Each tuple represents a test where
608 a failure was explicitly signalled using the TestCase.assert*()
612 def __init__(self, stream, descriptions, verbosity):
614 :param stream File descriptor to store where to report test results.
615 Set to the standard error stream by default.
616 :param descriptions Boolean variable to store information if to use
617 test case descriptions.
618 :param verbosity Integer variable to store required verbosity level.
620 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
622 self.descriptions = descriptions
623 self.verbosity = verbosity
624 self.result_string = None
def addSuccess(self, test):
    """
    Record a test succeeded result

    :param test: test case that passed

    """
    if hasattr(test, 'logger'):
        test.logger.debug("--- addSuccess() %s.%s(%s) called",
                          test.__class__.__name__,
                          test._testMethodName,
                          test._testMethodDoc)
    unittest.TestResult.addSuccess(self, test)
    self.result_string = colorize("OK", GREEN)
def addSkip(self, test, reason):
    """
    Record a test skipped.

    :param test: test case that was skipped
    :param reason: text explaining why the test was skipped

    """
    if hasattr(test, 'logger'):
        # NOTE(review): the last two format arguments were unreadable in the
        # reviewed listing; they are restored from the format string
        test.logger.debug("--- addSkip() %s.%s(%s) called, reason is %s",
                          test.__class__.__name__,
                          test._testMethodName,
                          test._testMethodDoc,
                          reason)
    unittest.TestResult.addSkip(self, test, reason)
    self.result_string = colorize("SKIP", YELLOW)
def addFailure(self, test, err):
    """
    Record a test failed result

    :param test: test case that failed
    :param err: exception info tuple (type, value, traceback) - it is
        unpacked into traceback.format_exception()

    """
    if hasattr(test, 'logger'):
        # err is a tuple, so it is wrapped explicitly to be formatted as a
        # single value
        test.logger.debug("--- addFailure() %s.%s(%s) called, err is %s"
                          % (test.__class__.__name__,
                             test._testMethodName,
                             test._testMethodDoc, err))
        test.logger.debug("formatted exception is:\n%s",
                          "".join(format_exception(*err)))
    unittest.TestResult.addFailure(self, test, err)
    if hasattr(test, 'tempdir'):
        # point the user at the directory holding this test case's logs
        self.result_string = colorize("FAIL", RED) + \
            ' [ temp dir used by test case: ' + test.tempdir + ' ]'
    else:
        self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
def addError(self, test, err):
    """
    Record a test error result

    :param test: test case that raised an unexpected exception
    :param err: exception info tuple (type, value, traceback) - it is
        unpacked into traceback.format_exception()

    """
    if hasattr(test, 'logger'):
        # err is a tuple, so it is wrapped explicitly to be formatted as a
        # single value
        test.logger.debug("--- addError() %s.%s(%s) called, err is %s"
                          % (test.__class__.__name__,
                             test._testMethodName,
                             test._testMethodDoc, err))
        test.logger.debug("formatted exception is:\n%s",
                          "".join(format_exception(*err)))
    unittest.TestResult.addError(self, test, err)
    if hasattr(test, 'tempdir'):
        # point the user at the directory holding this test case's logs
        self.result_string = colorize("ERROR", RED) + \
            ' [ temp dir used by test case: ' + test.tempdir + ' ]'
    else:
        self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
def getDescription(self, test):
    """
    Get test description

    :param test: test case to describe
    :returns: test description

    """
    # TODO: if none print warning not raise exception
    short_description = test.shortDescription()
    if self.descriptions and short_description:
        return short_description
    # NOTE(review): the fallback was unreadable in the reviewed listing;
    # str(test) is assumed, matching the stock unittest text result
    return str(test)
717 def startTest(self, test):
724 unittest.TestResult.startTest(self, test)
725 if self.verbosity > 0:
727 "Starting " + self.getDescription(test) + " ...")
728 self.stream.writeln(single_line_delim)
730 def stopTest(self, test):
737 unittest.TestResult.stopTest(self, test)
738 if self.verbosity > 0:
739 self.stream.writeln(single_line_delim)
740 self.stream.writeln("%-73s%s" % (self.getDescription(test),
742 self.stream.writeln(single_line_delim)
744 self.stream.writeln("%-73s%s" % (self.getDescription(test),
def printErrors(self):
    """
    Print errors from running the test case

    Writes an empty line to the stream, then lists all errors followed by
    all failures.
    """
    self.stream.writeln()
    self.printErrorList('ERROR', self.errors)
    self.printErrorList('FAIL', self.failures)
def printErrorList(self, flavour, errors):
    """
    Print error list to the output stream together with error type
    and test case description.

    :param flavour: error type
    :param errors: iterable errors - (test, formatted error text) pairs

    """
    write_line = self.stream.writeln
    for test, err in errors:
        write_line(double_line_delim)
        write_line("%s: %s" % (flavour, self.getDescription(test)))
        write_line(single_line_delim)
        write_line("%s" % err)
772 class VppTestRunner(unittest.TextTestRunner):
774 A basic test runner implementation which prints results to standard error.
777 def resultclass(self):
778 """Class maintaining the results of the tests"""
781 def __init__(self, stream=sys.stderr, descriptions=True, verbosity=1,
782 failfast=False, buffer=False, resultclass=None):
783 # ignore stream setting here, use hard-coded stdout to be in sync
784 # with prints from VppTestCase methods ...
785 super(VppTestRunner, self).__init__(sys.stdout, descriptions,
786 verbosity, failfast, buffer,
791 def parse_test_option(self):
793 f = os.getenv(self.test_option)
796 filter_file_name = None
797 filter_class_name = None
798 filter_func_name = None
803 raise Exception("Unrecognized %s option: %s" %
804 (self.test_option, f))
806 if parts[2] not in ('*', ''):
807 filter_func_name = parts[2]
808 if parts[1] not in ('*', ''):
809 filter_class_name = parts[1]
810 if parts[0] not in ('*', ''):
811 if parts[0].startswith('test_'):
812 filter_file_name = parts[0]
814 filter_file_name = 'test_%s' % parts[0]
816 if f.startswith('test_'):
819 filter_file_name = 'test_%s' % f
820 return filter_file_name, filter_class_name, filter_func_name
822 def filter_tests(self, tests, filter_file, filter_class, filter_func):
823 result = unittest.suite.TestSuite()
825 if isinstance(t, unittest.suite.TestSuite):
826 # this is a bunch of tests, recursively filter...
827 x = self.filter_tests(t, filter_file, filter_class,
829 if x.countTestCases() > 0:
831 elif isinstance(t, unittest.TestCase):
832 # this is a single test
833 parts = t.id().split('.')
834 # t.id() for common cases like this:
835 # test_classifier.TestClassifier.test_acl_ip
836 # apply filtering only if it is so
838 if filter_file and filter_file != parts[0]:
840 if filter_class and filter_class != parts[1]:
842 if filter_func and filter_func != parts[2]:
846 # unexpected object, don't touch it
857 gc.disable() # disable garbage collection, we'll do that manually
858 print("Running tests using custom test runner") # debug message
859 filter_file, filter_class, filter_func = self.parse_test_option()
860 print("Active filters: file=%s, class=%s, function=%s" % (
861 filter_file, filter_class, filter_func))
862 filtered = self.filter_tests(test, filter_file, filter_class,
864 print("%s out of %s tests match specified filters" % (
865 filtered.countTestCases(), test.countTestCases()))
866 return super(VppTestRunner, self).run(filtered)