8 from collections import deque
9 from threading import Thread
10 from inspect import getdoc
11 from hook import StepHook, PollHook
12 from vpp_pg_interface import VppPGInterface
13 from vpp_sub_interface import VppSubInterface
14 from vpp_lo_interface import VppLoInterface
15 from vpp_papi_provider import VppPapiProvider
16 from scapy.packet import Raw
17 from logging import FileHandler, DEBUG
21 Test framework module.
23 The module provides a set of tools for constructing and running tests and
24 representing the results.
28 class _PacketInfo(object):
29 """Private class to create packet info object.
31 Help process information about the next packet.
32 Set variables to default values.
34 #: Store the index of the packet.
36 #: Store the index of the source packet generator interface of the packet.
38 #: Store the index of the destination packet generator interface
41 #: Store the copy of the former packet.
def __eq__(self, other):
    """
    Compare this packet info with another one field by field.

    :param other: object to compare with
    :returns: True when index, src, dst and data are all equal, False
        when any of them differs, NotImplemented when other does not
        provide these four attributes
    """
    try:
        index, src, dst, data = (other.index, other.src,
                                 other.dst, other.data)
    except AttributeError:
        # other is not a packet info (e.g. None) - hand the decision back
        # to Python instead of raising from inside the == operator
        return NotImplemented
    return (self.index == index and self.src == src and
            self.dst == dst and self.data == data)
52 def pump_output(out, deque):
53 for line in iter(out.readline, b''):
57 class VppTestCase(unittest.TestCase):
58 """This subclass is a base class for VPP test cases that are implemented as
59 classes. It provides methods to create and run test case.
def packet_infos(self):
    """Packet info objects currently stored for this test case"""
    infos = self._packet_infos
    return infos
68 def get_packet_count_for_if_idx(cls, dst_if_index):
69 """Get the number of packet info for specified destination if index"""
70 if dst_if_index in cls._packet_count_for_dst_if_idx:
71 return cls._packet_count_for_dst_if_idx[dst_if_index]
77 """Return the instance of this testcase"""
78 return cls.test_instance
81 def set_debug_flags(cls, d):
82 cls.debug_core = False
84 cls.debug_gdbserver = False
89 if resource.getrlimit(resource.RLIMIT_CORE)[0] <= 0:
90 # give a heads up if this is actually useless
91 cls.logger.critical("WARNING: core size limit is set 0, core "
92 "files will NOT be created")
96 elif dl == "gdbserver":
97 cls.debug_gdbserver = True
99 raise Exception("Unrecognized DEBUG option: '%s'" % d)
102 def setUpConstants(cls):
103 """ Set-up the test case class based on environment variables """
105 s = os.getenv("STEP")
106 cls.step = True if s.lower() in ("y", "yes", "1") else False
110 d = os.getenv("DEBUG")
113 cls.set_debug_flags(d)
114 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
115 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
117 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
118 debug_cli = "cli-listen localhost:5002"
119 cls.vpp_cmdline = [cls.vpp_bin, "unix", "{", "nodaemon", debug_cli, "}",
120 "api-segment", "{", "prefix", cls.shm_prefix, "}"]
121 if cls.plugin_path is not None:
122 cls.vpp_cmdline.extend(["plugin_path", cls.plugin_path])
123 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
126 def wait_for_enter(cls):
127 if cls.debug_gdbserver:
128 print(double_line_delim)
129 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
131 print(double_line_delim)
132 print("Spawned VPP with PID: %d" % cls.vpp.pid)
134 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
136 print(single_line_delim)
137 print("You can debug the VPP using e.g.:")
138 if cls.debug_gdbserver:
139 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
140 print("Now is the time to attach a gdb by running the above "
141 "command, set up breakpoints etc. and then resume VPP from "
142 "within gdb by issuing the 'continue' command")
144 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
145 print("Now is the time to attach a gdb by running the above "
146 "command and set up breakpoints etc.")
147 print(single_line_delim)
148 raw_input("Press ENTER to continue running the testcase...")
152 cmdline = cls.vpp_cmdline
154 if cls.debug_gdbserver:
155 gdbserver = '/usr/bin/gdbserver'
156 if not os.path.isfile(gdbserver) or \
157 not os.access(gdbserver, os.X_OK):
158 raise Exception("gdbserver binary '%s' does not exist or is "
159 "not executable" % gdbserver)
161 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
162 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
165 cls.vpp = subprocess.Popen(cmdline,
166 stdout=subprocess.PIPE,
167 stderr=subprocess.PIPE,
169 except Exception as e:
170 cls.logger.critical("Couldn't start vpp: %s" % e)
178 Perform class setup before running the testcase
179 Remove shared memory files, start vpp and connect the vpp-api
181 cls.logger = getLogger(cls.__name__)
182 cls.tempdir = tempfile.mkdtemp(
183 prefix='vpp-unittest-' + cls.__name__ + '-')
184 file_handler = FileHandler("%s/log.txt" % cls.tempdir)
185 file_handler.setLevel(DEBUG)
186 cls.logger.addHandler(file_handler)
187 cls.shm_prefix = cls.tempdir.split("/")[-1]
188 os.chdir(cls.tempdir)
189 cls.logger.info("Temporary dir is %s, shm prefix is %s",
190 cls.tempdir, cls.shm_prefix)
192 cls.reset_packet_infos()
194 cls._zombie_captures = []
197 print(double_line_delim)
198 print(colorize(getdoc(cls).splitlines()[0], YELLOW))
199 print(double_line_delim)
200 # need to catch exceptions here because if we raise, then the cleanup
201 # doesn't get called and we might end with a zombie vpp
204 cls.vpp_stdout_deque = deque()
205 cls.vpp_stdout_reader_thread = Thread(target=pump_output, args=(
206 cls.vpp.stdout, cls.vpp_stdout_deque))
207 cls.vpp_stdout_reader_thread.start()
208 cls.vpp_stderr_deque = deque()
209 cls.vpp_stderr_reader_thread = Thread(target=pump_output, args=(
210 cls.vpp.stderr, cls.vpp_stderr_deque))
211 cls.vpp_stderr_reader_thread.start()
212 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
217 cls.vapi.register_hook(hook)
223 if cls.debug_gdbserver:
224 print(colorize("You're running VPP inside gdbserver but "
225 "VPP-API connection failed, did you forget "
226 "to 'continue' VPP from within gdb?", RED))
229 t, v, tb = sys.exc_info()
239 Disconnect vpp-api, kill vpp and cleanup shared memory files
241 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
243 if cls.vpp.returncode is None:
244 print(double_line_delim)
245 print("VPP or GDB server is still running")
246 print(single_line_delim)
247 raw_input("When done debugging, press ENTER to kill the process"
248 " and finish running the testcase...")
250 if hasattr(cls, 'vpp'):
251 if hasattr(cls, 'vapi'):
252 cls.vapi.disconnect()
254 if cls.vpp.returncode is None:
258 if hasattr(cls, 'vpp_stdout_deque'):
259 cls.logger.info(single_line_delim)
260 cls.logger.info('VPP output to stdout while running %s:',
262 cls.logger.info(single_line_delim)
263 f = open(cls.tempdir + '/vpp_stdout.txt', 'w')
264 vpp_output = "".join(cls.vpp_stdout_deque)
266 cls.logger.info('\n%s', vpp_output)
267 cls.logger.info(single_line_delim)
269 if hasattr(cls, 'vpp_stderr_deque'):
270 cls.logger.info(single_line_delim)
271 cls.logger.info('VPP output to stderr while running %s:',
273 cls.logger.info(single_line_delim)
274 f = open(cls.tempdir + '/vpp_stderr.txt', 'w')
275 vpp_output = "".join(cls.vpp_stderr_deque)
277 cls.logger.info('\n%s', vpp_output)
278 cls.logger.info(single_line_delim)
281 def tearDownClass(cls):
282 """ Perform final cleanup after running all tests in this test-case """
286 """ Show various debug prints after each test """
287 if not self.vpp_dead:
288 self.logger.debug(self.vapi.cli("show trace"))
289 self.logger.info(self.vapi.ppcli("show int"))
290 self.logger.info(self.vapi.ppcli("show hardware"))
291 self.logger.info(self.vapi.ppcli("show error"))
292 self.logger.info(self.vapi.ppcli("show run"))
295 """ Clear trace before running each test"""
297 raise Exception("VPP is dead when setting up the test")
299 self.vpp_stdout_deque.append(
300 "--- test setUp() for %s.%s(%s) starts here ---\n" %
301 (self.__class__.__name__, self._testMethodName,
302 self._testMethodDoc))
303 self.vpp_stderr_deque.append(
304 "--- test setUp() for %s.%s(%s) starts here ---\n" %
305 (self.__class__.__name__, self._testMethodName,
306 self._testMethodDoc))
307 self.vapi.cli("clear trace")
308 # store the test instance inside the test class - so that objects
309 # holding the class can access instance methods (like assertEqual)
310 type(self).test_instance = self
313 def pg_enable_capture(cls, interfaces):
315 Enable capture on packet-generator interfaces
317 :param interfaces: iterable interface indexes
324 def register_capture(cls, cap_name):
325 """ Register a capture in the testclass """
326 # add to the list of captures with current timestamp
327 cls._captures.append((time.time(), cap_name))
328 # filter out from zombies
329 cls._zombie_captures = [(stamp, name)
330 for (stamp, name) in cls._zombie_captures
335 """ Remove any zombie captures and enable the packet generator """
336 # how long before capture is allowed to be deleted - otherwise vpp
337 # crashes - 100ms seems enough (this shouldn't be needed at all)
340 for stamp, cap_name in cls._zombie_captures:
341 wait = stamp + capture_ttl - now
343 cls.logger.debug("Waiting for %ss before deleting capture %s",
347 cls.logger.debug("Removing zombie capture %s" % cap_name)
348 cls.vapi.cli('packet-generator delete %s' % cap_name)
350 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
351 cls.vapi.cli('packet-generator enable')
352 cls._zombie_captures = cls._captures
356 def create_pg_interfaces(cls, interfaces):
358 Create packet-generator interfaces.
360 :param interfaces: iterable indexes of the interfaces.
361 :returns: List of created interfaces.
366 intf = VppPGInterface(cls, i)
367 setattr(cls, intf.name, intf)
369 cls.pg_interfaces = result
373 def create_loopback_interfaces(cls, interfaces):
375 Create loopback interfaces.
377 :param interfaces: iterable indexes of the interfaces.
378 :returns: List of created interfaces.
382 intf = VppLoInterface(cls, i)
383 setattr(cls, intf.name, intf)
385 cls.lo_interfaces = result
389 def extend_packet(packet, size):
391 Extend packet to given size by padding with spaces
392 NOTE: Currently works only when Raw layer is present.
394 :param packet: packet
395 :param size: target size
398 packet_len = len(packet) + 4
399 extend = size - packet_len
401 packet[Raw].load += ' ' * extend
def reset_packet_infos(cls):
    """ Drop all stored packet infos and per-destination packet counts """
    # each attribute gets its own fresh, empty dict
    for attr in ("_packet_infos", "_packet_count_for_dst_if_idx"):
        setattr(cls, attr, {})
410 def create_packet_info(cls, src_if, dst_if):
412 Create packet info object containing the source and destination indexes
413 and add it to the testcase's packet info list
415 :param VppInterface src_if: source interface
416 :param VppInterface dst_if: destination interface
418 :returns: _PacketInfo object
422 info.index = len(cls._packet_infos)
423 info.src = src_if.sw_if_index
424 info.dst = dst_if.sw_if_index
425 if isinstance(dst_if, VppSubInterface):
426 dst_idx = dst_if.parent.sw_if_index
428 dst_idx = dst_if.sw_if_index
429 if dst_idx in cls._packet_count_for_dst_if_idx:
430 cls._packet_count_for_dst_if_idx[dst_idx] += 1
432 cls._packet_count_for_dst_if_idx[dst_idx] = 1
433 cls._packet_infos[info.index] = info
def info_to_payload(info):
    """
    Serialize a _PacketInfo object into a packet payload

    :param info: _PacketInfo object

    :returns: string holding index, src and dst of the packet info as
        decimal numbers separated by single spaces
    """
    fields = (info.index, info.src, info.dst)
    return " ".join("%d" % field for field in fields)
448 def payload_to_info(payload):
450 Convert packet payload to _PacketInfo object
452 :param payload: packet payload
454 :returns: _PacketInfo object containing de-serialized data from payload
457 numbers = payload.split()
459 info.index = int(numbers[0])
460 info.src = int(numbers[1])
461 info.dst = int(numbers[2])
464 def get_next_packet_info(self, info):
466 Iterate over the packet info list stored in the testcase
467 Start iteration with first element if info is None
468 Continue based on index in info if info is specified
470 :param info: info or None
471 :returns: next info in list or None if no more infos
476 next_index = info.index + 1
477 if next_index == len(self._packet_infos):
480 return self._packet_infos[next_index]
482 def get_next_packet_info_for_interface(self, src_index, info):
484 Search the packet info list for the next packet info with same source
487 :param src_index: source interface index to search for
488 :param info: packet info - where to start the search
489 :returns: packet info or None
493 info = self.get_next_packet_info(info)
496 if info.src == src_index:
499 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
501 Search the packet info list for the next packet info with same source
502 and destination interface indexes
504 :param src_index: source interface index to search for
505 :param dst_index: destination interface index to search for
506 :param info: packet info - where to start the search
507 :returns: packet info or None
511 info = self.get_next_packet_info_for_interface(src_index, info)
514 if info.dst == dst_index:
def assert_equal(self, real_value, expected_value, name_or_class=None):
    """
    Assert that two values are equal, reporting a descriptive message

    :param real_value: value observed by the test
    :param expected_value: value the test expects
    :param name_or_class: None to use the default assertEqual message,
        a callable (e.g. an enum-like class) used to render both values
        symbolically, or any other object used as the name of the value
    """
    if name_or_class is None:
        # nothing to describe the value with; no custom message has been
        # built at this point, so use the default assertEqual message
        self.assertEqual(real_value, expected_value)
        return
    try:
        msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
        msg = msg % (getdoc(name_or_class).strip(),
                     real_value, str(name_or_class(real_value)),
                     expected_value, str(name_or_class(expected_value)))
    except Exception:
        # name_or_class is a plain name (not callable) or the values are
        # not integers - fall back to the generic message
        msg = "Invalid %s: %s does not match expected value %s" % (
            name_or_class, real_value, expected_value)

    self.assertEqual(real_value, expected_value, msg)
541 msg = "Invalid %s: %s out of range <%s,%s>" % (
542 name, real_value, expected_min, expected_max)
543 self.assertTrue(expected_min <= real_value <= expected_max, msg)
546 class VppTestResult(unittest.TestResult):
548 @property result_string
549 String variable to store the test case result string.
551 List variable containing 2-tuples of TestCase instances and strings
552 holding formatted tracebacks. Each tuple represents a test which
553 raised an unexpected exception.
555 List variable containing 2-tuples of TestCase instances and strings
556 holding formatted tracebacks. Each tuple represents a test where
557 a failure was explicitly signalled using the TestCase.assert*()
561 def __init__(self, stream, descriptions, verbosity):
563 :param stream File descriptor to store where to report test results. Set
564 to the standard error stream by default.
565 :param descriptions Boolean variable to store information if to use test
567 :param verbosity Integer variable to store required verbosity level.
569 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
571 self.descriptions = descriptions
572 self.verbosity = verbosity
573 self.result_string = None
def addSuccess(self, test):
    """
    Record that a test has succeeded

    :param test: test case that succeeded

    """
    unittest.TestResult.addSuccess(self, test)
    outcome = colorize("OK", GREEN)
    self.result_string = outcome
def addSkip(self, test, reason):
    """
    Record that a test has been skipped

    :param test: test case that was skipped
    :param reason: reason of the skip, passed on to unittest.TestResult

    """
    unittest.TestResult.addSkip(self, test, reason)
    outcome = colorize("SKIP", YELLOW)
    self.result_string = outcome
def addFailure(self, test, err):
    """
    Record that a test has failed

    :param test: test case that failed
    :param err: error information, passed on to unittest.TestResult

    """
    unittest.TestResult.addFailure(self, test, err)
    # point the user to the temporary directory of the test case, when the
    # test object has one
    if not hasattr(test, 'tempdir'):
        self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
        return
    self.result_string = colorize("FAIL", RED) + \
        ' [ temp dir used by test case: ' + test.tempdir + ' ]'
def addError(self, test, err):
    """
    Record that a test has ended with an error

    :param test: test case that raised the error
    :param err: error information, passed on to unittest.TestResult

    """
    unittest.TestResult.addError(self, test, err)
    # point the user to the temporary directory of the test case, when the
    # test object has one
    if not hasattr(test, 'tempdir'):
        self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
        return
    self.result_string = colorize("ERROR", RED) + \
        ' [ temp dir used by test case: ' + test.tempdir + ' ]'
626 def getDescription(self, test):
631 :returns: test description
634 # TODO: if none print warning not raise exception
635 short_description = test.shortDescription()
636 if self.descriptions and short_description:
637 return short_description
641 def startTest(self, test):
648 unittest.TestResult.startTest(self, test)
649 if self.verbosity > 0:
651 "Starting " + self.getDescription(test) + " ...")
652 self.stream.writeln(single_line_delim)
def stopTest(self, test):
    """
    Finish the bookkeeping of a test and print its result line

    :param test: test case that has just finished

    """
    unittest.TestResult.stopTest(self, test)
    # the result line is framed by delimiters only when verbosity is above 0
    framed = self.verbosity > 0
    if framed:
        self.stream.writeln(single_line_delim)
    self.stream.writeln("%-60s%s" %
                        (self.getDescription(test), self.result_string))
    if framed:
        self.stream.writeln(single_line_delim)
def printErrors(self):
    """
    Print the collected errors followed by the collected failures
    """
    self.stream.writeln()
    # attribute is looked up right before each call, errors first
    for flavour, attr in (('ERROR', 'errors'), ('FAIL', 'failures')):
        self.printErrorList(flavour, getattr(self, attr))
def printErrorList(self, flavour, errors):
    """
    Write every entry of an error list to the output stream, each one
    preceded by the error type and the description of its test case.

    :param flavour: error type
    :param errors: iterable of (test, error) pairs

    """
    for failed_test, details in errors:
        self.stream.writeln(double_line_delim)
        heading = "%s: %s" % (flavour, self.getDescription(failed_test))
        self.stream.writeln(heading)
        self.stream.writeln(single_line_delim)
        self.stream.writeln("%s" % details)
696 class VppTestRunner(unittest.TextTestRunner):
698 A basic test runner implementation which prints results on standard error.
701 def resultclass(self):
702 """Class maintaining the results of the tests"""
712 print("Running tests using custom test runner") # debug message
713 return super(VppTestRunner, self).run(test)