9 from collections import deque
10 from threading import Thread
11 from inspect import getdoc
12 from hook import StepHook, PollHook
13 from vpp_pg_interface import VppPGInterface
14 from vpp_lo_interface import VppLoInterface
15 from vpp_papi_provider import VppPapiProvider
16 from scapy.packet import Raw
20 Test framework module.
22 The module provides a set of tools for constructing and running tests and
23 representing the results.
27 class _PacketInfo(object):
28 """Private class to create packet info object.
30 Help process information about the next packet.
31 Set variables to default values.
33 #: Store the index of the packet.
35 #: Store the index of the source packet generator interface of the packet.
37 #: Store the index of the destination packet generator interface
40 #: Store the copy of the former packet.
43 def __eq__(self, other):
44 index = self.index == other.index
45 src = self.src == other.src
46 dst = self.dst == other.dst
47 data = self.data == other.data
48 return index and src and dst and data
51 def pump_output(out, deque):
52 for line in iter(out.readline, b''):
56 class VppTestCase(unittest.TestCase):
57 """This subclass is a base class for VPP test cases that are implemented as
58 classes. It provides methods to create and run test case.
62 def packet_infos(self):
63 """List of packet infos"""
64 return self._packet_infos
67 def packet_infos(self, value):
68 self._packet_infos = value
72 """Return the instance of this testcase"""
73 return cls.test_instance
76 def set_debug_flags(cls, d):
77 cls.debug_core = False
79 cls.debug_gdbserver = False
84 if resource.getrlimit(resource.RLIMIT_CORE)[0] <= 0:
85 # give a heads up if this is actually useless
86 cls.logger.critical("WARNING: core size limit is set 0, core "
87 "files will NOT be created")
91 elif dl == "gdbserver":
92 cls.debug_gdbserver = True
94 raise Exception("Unrecognized DEBUG option: '%s'" % d)
97 def setUpConstants(cls):
98 """ Set-up the test case class based on environment variables """
100 s = os.getenv("STEP")
101 cls.step = True if s.lower() in ("y", "yes", "1") else False
105 d = os.getenv("DEBUG")
108 cls.set_debug_flags(d)
109 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
110 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
112 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
113 debug_cli = "cli-listen localhost:5002"
114 cls.vpp_cmdline = [cls.vpp_bin, "unix", "{", "nodaemon", debug_cli, "}",
115 "api-segment", "{", "prefix", cls.shm_prefix, "}"]
116 if cls.plugin_path is not None:
117 cls.vpp_cmdline.extend(["plugin_path", cls.plugin_path])
118 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
121 def wait_for_enter(cls):
122 if cls.debug_gdbserver:
123 print(double_line_delim)
124 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
126 print(double_line_delim)
127 print("Spawned VPP with PID: %d" % cls.vpp.pid)
129 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
131 print(single_line_delim)
132 print("You can debug the VPP using e.g.:")
133 if cls.debug_gdbserver:
134 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
135 print("Now is the time to attach a gdb by running the above "
136 "command, set up breakpoints etc. and then resume VPP from "
137 "within gdb by issuing the 'continue' command")
139 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
140 print("Now is the time to attach a gdb by running the above "
141 "command and set up breakpoints etc.")
142 print(single_line_delim)
143 raw_input("Press ENTER to continue running the testcase...")
147 cmdline = cls.vpp_cmdline
149 if cls.debug_gdbserver:
150 gdbserver = '/usr/bin/gdbserver'
151 if not os.path.isfile(gdbserver) or \
152 not os.access(gdbserver, os.X_OK):
153 raise Exception("gdbserver binary '%s' does not exist or is "
154 "not executable" % gdbserver)
156 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
157 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
160 cls.vpp = subprocess.Popen(cmdline,
161 stdout=subprocess.PIPE,
162 stderr=subprocess.PIPE,
164 except Exception as e:
165 cls.logger.critical("Couldn't start vpp: %s" % e)
173 Perform class setup before running the testcase
174 Remove shared memory files, start vpp and connect the vpp-api
176 cls.logger = getLogger(cls.__name__)
177 cls.tempdir = tempfile.mkdtemp(
178 prefix='vpp-unittest-' + cls.__name__ + '-')
179 cls.shm_prefix = cls.tempdir.split("/")[-1]
180 os.chdir(cls.tempdir)
181 cls.logger.info("Temporary dir is %s, shm prefix is %s",
182 cls.tempdir, cls.shm_prefix)
185 cls.packet_infos = {}
188 print(double_line_delim)
189 print(colorize(getdoc(cls).splitlines()[0], YELLOW))
190 print(double_line_delim)
191 # need to catch exceptions here because if we raise, then the cleanup
192 # doesn't get called and we might end with a zombie vpp
195 cls.vpp_stdout_deque = deque()
196 cls.vpp_stdout_reader_thread = Thread(target=pump_output, args=(
197 cls.vpp.stdout, cls.vpp_stdout_deque))
198 cls.vpp_stdout_reader_thread.start()
199 cls.vpp_stderr_deque = deque()
200 cls.vpp_stderr_reader_thread = Thread(target=pump_output, args=(
201 cls.vpp.stderr, cls.vpp_stderr_deque))
202 cls.vpp_stderr_reader_thread.start()
203 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
208 cls.vapi.register_hook(hook)
214 if cls.debug_gdbserver:
215 print(colorize("You're running VPP inside gdbserver but "
216 "VPP-API connection failed, did you forget "
217 "to 'continue' VPP from within gdb?", RED))
220 t, v, tb = sys.exc_info()
230 Disconnect vpp-api, kill vpp and cleanup shared memory files
232 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
234 if cls.vpp.returncode is None:
235 print(double_line_delim)
236 print("VPP or GDB server is still running")
237 print(single_line_delim)
238 raw_input("When done debugging, press ENTER to kill the process"
239 " and finish running the testcase...")
241 if hasattr(cls, 'vpp'):
242 if hasattr(cls, 'vapi'):
243 cls.vapi.disconnect()
245 if cls.vpp.returncode is None:
249 if hasattr(cls, 'vpp_stdout_deque'):
250 cls.logger.info(single_line_delim)
251 cls.logger.info('VPP output to stdout while running %s:',
253 cls.logger.info(single_line_delim)
254 f = open(cls.tempdir + '/vpp_stdout.txt', 'w')
255 vpp_output = "".join(cls.vpp_stdout_deque)
257 cls.logger.info('\n%s', vpp_output)
258 cls.logger.info(single_line_delim)
260 if hasattr(cls, 'vpp_stderr_deque'):
261 cls.logger.info(single_line_delim)
262 cls.logger.info('VPP output to stderr while running %s:',
264 cls.logger.info(single_line_delim)
265 f = open(cls.tempdir + '/vpp_stderr.txt', 'w')
266 vpp_output = "".join(cls.vpp_stderr_deque)
268 cls.logger.info('\n%s', vpp_output)
269 cls.logger.info(single_line_delim)
272 def tearDownClass(cls):
273 """ Perform final cleanup after running all tests in this test-case """
277 """ Show various debug prints after each test """
278 if not self.vpp_dead:
279 self.logger.debug(self.vapi.cli("show trace"))
280 self.logger.info(self.vapi.ppcli("show int"))
281 self.logger.info(self.vapi.ppcli("show hardware"))
282 self.logger.info(self.vapi.ppcli("show error"))
283 self.logger.info(self.vapi.ppcli("show run"))
286 """ Clear trace before running each test"""
288 raise Exception("VPP is dead when setting up the test")
290 self.vpp_stdout_deque.append(
291 "--- test setUp() for %s.%s(%s) starts here ---\n" %
292 (self.__class__.__name__, self._testMethodName,
293 self._testMethodDoc))
294 self.vpp_stderr_deque.append(
295 "--- test setUp() for %s.%s(%s) starts here ---\n" %
296 (self.__class__.__name__, self._testMethodName,
297 self._testMethodDoc))
298 self.vapi.cli("clear trace")
299 # store the test instance inside the test class - so that objects
300 # holding the class can access instance methods (like assertEqual)
301 type(self).test_instance = self
304 def pg_enable_capture(cls, interfaces):
306 Enable capture on packet-generator interfaces
308 :param interfaces: iterable interface indexes
315 def pg_start(cls, sleep_time=1):
317 Enable the packet-generator and send all prepared packet streams
318 Remove the packet streams afterwards
320 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
321 cls.vapi.cli('packet-generator enable')
322 sleep(sleep_time) # give VPP some time to process the packets
323 for stream in cls.pg_streams:
324 cls.vapi.cli('packet-generator delete %s' % stream)
328 def create_pg_interfaces(cls, interfaces):
330 Create packet-generator interfaces
332 :param interfaces: iterable indexes of the interfaces
337 intf = VppPGInterface(cls, i)
338 setattr(cls, intf.name, intf)
340 cls.pg_interfaces = result
344 def create_loopback_interfaces(cls, interfaces):
346 Create loopback interfaces
348 :param interfaces: iterable indexes of the interfaces
353 intf = VppLoInterface(cls, i)
354 setattr(cls, intf.name, intf)
356 cls.lo_interfaces = result
360 def extend_packet(packet, size):
362 Extend packet to given size by padding with spaces
363 NOTE: Currently works only when Raw layer is present.
365 :param packet: packet
366 :param size: target size
369 packet_len = len(packet) + 4
370 extend = size - packet_len
372 packet[Raw].load += ' ' * extend
374 def add_packet_info_to_list(self, info):
376 Add packet info to the testcase's packet info list
378 :param info: packet info
381 info.index = len(self.packet_infos)
382 self.packet_infos[info.index] = info
384 def create_packet_info(self, src_pg_index, dst_pg_index):
386 Create packet info object containing the source and destination indexes
387 and add it to the testcase's packet info list
389 :param src_pg_index: source packet-generator index
390 :param dst_pg_index: destination packet-generator index
392 :returns: _PacketInfo object
396 self.add_packet_info_to_list(info)
397 info.src = src_pg_index
398 info.dst = dst_pg_index
402 def info_to_payload(info):
404 Convert _PacketInfo object to packet payload
406 :param info: _PacketInfo object
408 :returns: string containing serialized data from packet info
410 return "%d %d %d" % (info.index, info.src, info.dst)
413 def payload_to_info(payload):
415 Convert packet payload to _PacketInfo object
417 :param payload: packet payload
419 :returns: _PacketInfo object containing de-serialized data from payload
422 numbers = payload.split()
424 info.index = int(numbers[0])
425 info.src = int(numbers[1])
426 info.dst = int(numbers[2])
429 def get_next_packet_info(self, info):
431 Iterate over the packet info list stored in the testcase
432 Start iteration with first element if info is None
433 Continue based on index in info if info is specified
435 :param info: info or None
436 :returns: next info in list or None if no more infos
441 next_index = info.index + 1
442 if next_index == len(self.packet_infos):
445 return self.packet_infos[next_index]
447 def get_next_packet_info_for_interface(self, src_index, info):
449 Search the packet info list for the next packet info with same source
452 :param src_index: source interface index to search for
453 :param info: packet info - where to start the search
454 :returns: packet info or None
458 info = self.get_next_packet_info(info)
461 if info.src == src_index:
464 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
466 Search the packet info list for the next packet info with same source
467 and destination interface indexes
469 :param src_index: source interface index to search for
470 :param dst_index: destination interface index to search for
471 :param info: packet info - where to start the search
472 :returns: packet info or None
476 info = self.get_next_packet_info_for_interface(src_index, info)
479 if info.dst == dst_index:
482 def assert_equal(self, real_value, expected_value, name_or_class=None):
483 if name_or_class is None:
484 self.assertEqual(real_value, expected_value, msg)
487 msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
488 msg = msg % (getdoc(name_or_class).strip(),
489 real_value, str(name_or_class(real_value)),
490 expected_value, str(name_or_class(expected_value)))
492 msg = "Invalid %s: %s does not match expected value %s" % (
493 name_or_class, real_value, expected_value)
495 self.assertEqual(real_value, expected_value, msg)
506 msg = "Invalid %s: %s out of range <%s,%s>" % (
507 name, real_value, expected_min, expected_max)
508 self.assertTrue(expected_min <= real_value <= expected_max, msg)
511 class VppTestResult(unittest.TestResult):
513 @property result_string
514 String variable to store the test case result string.
516 List variable containing 2-tuples of TestCase instances and strings
517 holding formatted tracebacks. Each tuple represents a test which
518 raised an unexpected exception.
520 List variable containing 2-tuples of TestCase instances and strings
521 holding formatted tracebacks. Each tuple represents a test where
522 a failure was explicitly signalled using the TestCase.assert*()
526 def __init__(self, stream, descriptions, verbosity):
528 :param stream File descriptor to store where to report test results. Set
529 to the standard error stream by default.
530 :param descriptions Boolean variable to store information if to use test
532 :param verbosity Integer variable to store required verbosity level.
534 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
536 self.descriptions = descriptions
537 self.verbosity = verbosity
538 self.result_string = None
540 def addSuccess(self, test):
542 Record a test succeeded result
547 unittest.TestResult.addSuccess(self, test)
548 self.result_string = colorize("OK", GREEN)
550 def addSkip(self, test, reason):
552 Record a test skipped.
558 unittest.TestResult.addSkip(self, test, reason)
559 self.result_string = colorize("SKIP", YELLOW)
561 def addFailure(self, test, err):
563 Record a test failed result
566 :param err: error message
569 unittest.TestResult.addFailure(self, test, err)
570 if hasattr(test, 'tempdir'):
571 self.result_string = colorize("FAIL", RED) + \
572 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
574 self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
576 def addError(self, test, err):
578 Record a test error result
581 :param err: error message
584 unittest.TestResult.addError(self, test, err)
585 if hasattr(test, 'tempdir'):
586 self.result_string = colorize("ERROR", RED) + \
587 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
589 self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
591 def getDescription(self, test):
596 :returns: test description
599 # TODO: if none print warning not raise exception
600 short_description = test.shortDescription()
601 if self.descriptions and short_description:
602 return short_description
606 def startTest(self, test):
613 unittest.TestResult.startTest(self, test)
614 if self.verbosity > 0:
616 "Starting " + self.getDescription(test) + " ...")
617 self.stream.writeln(single_line_delim)
619 def stopTest(self, test):
626 unittest.TestResult.stopTest(self, test)
627 if self.verbosity > 0:
628 self.stream.writeln(single_line_delim)
629 self.stream.writeln("%-60s%s" %
630 (self.getDescription(test), self.result_string))
631 self.stream.writeln(single_line_delim)
633 self.stream.writeln("%-60s%s" %
634 (self.getDescription(test), self.result_string))
636 def printErrors(self):
638 Print errors from running the test case
640 self.stream.writeln()
641 self.printErrorList('ERROR', self.errors)
642 self.printErrorList('FAIL', self.failures)
644 def printErrorList(self, flavour, errors):
646 Print error list to the output stream together with error type
647 and test case description.
649 :param flavour: error type
650 :param errors: iterable errors
653 for test, err in errors:
654 self.stream.writeln(double_line_delim)
655 self.stream.writeln("%s: %s" %
656 (flavour, self.getDescription(test)))
657 self.stream.writeln(single_line_delim)
658 self.stream.writeln("%s" % err)
661 class VppTestRunner(unittest.TextTestRunner):
663 A basic test runner implementation which prints results on standard error.
666 def resultclass(self):
667 """Class maintaining the results of the tests"""
677 print("Running tests using custom test runner") # debug message
678 return super(VppTestRunner, self).run(test)