9 from Queue import Queue
10 from threading import Thread
11 from inspect import getdoc
12 from hook import StepHook, PollHook
13 from vpp_pg_interface import VppPGInterface
14 from vpp_lo_interface import VppLoInterface
15 from vpp_papi_provider import VppPapiProvider
16 from scapy.packet import Raw
20 Test framework module.
22 The module provides a set of tools for constructing and running tests and
23 representing the results.
27 class _PacketInfo(object):
28 """Private class to create packet info object.
30 Help process information about the next packet.
31 Set variables to default values.
33 #: Store the index of the packet.
35 #: Store the index of the source packet generator interface of the packet.
37 #: Store the index of the destination packet generator interface
40 #: Store the copy of the former packet.
44 def pump_output(out, queue):
45 for line in iter(out.readline, b''):
49 class VppTestCase(unittest.TestCase):
50 """This subclass is a base class for VPP test cases that are implemented as
51 classes. It provides methods to create and run test case.
def packet_infos(self):
    """
    Accessor for the packet info collection kept on this object.

    :returns: whatever is currently stored in ``_packet_infos``
    """
    stored = self._packet_infos
    return stored
def packet_infos(self, value):
    """
    Replace the packet info collection kept on this object.

    :param value: new collection to store in ``_packet_infos``
    """
    setattr(self, '_packet_infos', value)
65 """Return the instance of this testcase"""
66 return cls.test_instance
69 def set_debug_flags(cls, d):
70 cls.debug_core = False
72 cls.debug_gdbserver = False
77 if resource.getrlimit(resource.RLIMIT_CORE)[0] <= 0:
78 # give a heads up if this is actually useless
79 cls.logger.critical("WARNING: core size limit is set 0, core "
80 "files will NOT be created")
84 elif dl == "gdbserver":
85 cls.debug_gdbserver = True
87 raise Exception("Unrecognized DEBUG option: '%s'" % d)
90 def setUpConstants(cls):
91 """ Set-up the test case class based on environment variables """
94 cls.step = True if s.lower() in ("y", "yes", "1") else False
98 d = os.getenv("DEBUG")
101 cls.set_debug_flags(d)
102 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
103 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
105 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
106 debug_cli = "cli-listen localhost:5002"
107 cls.vpp_cmdline = [cls.vpp_bin, "unix", "{", "nodaemon", debug_cli, "}",
108 "api-segment", "{", "prefix", cls.shm_prefix, "}"]
109 if cls.plugin_path is not None:
110 cls.vpp_cmdline.extend(["plugin_path", cls.plugin_path])
111 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
114 def wait_for_enter(cls):
115 if cls.debug_gdbserver:
116 print(double_line_delim)
117 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
119 print(double_line_delim)
120 print("Spawned VPP with PID: %d" % cls.vpp.pid)
122 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
124 print(single_line_delim)
125 print("You can debug the VPP using e.g.:")
126 if cls.debug_gdbserver:
127 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
128 print("Now is the time to attach a gdb by running the above "
129 "command, set up breakpoints etc. and then resume VPP from "
130 "within gdb by issuing the 'continue' command")
132 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
133 print("Now is the time to attach a gdb by running the above "
134 "command and set up breakpoints etc.")
135 print(single_line_delim)
136 raw_input("Press ENTER to continue running the testcase...")
140 cmdline = cls.vpp_cmdline
142 if cls.debug_gdbserver:
143 gdbserver = '/usr/bin/gdbserver'
144 if not os.path.isfile(gdbserver) or \
145 not os.access(gdbserver, os.X_OK):
146 raise Exception("gdbserver binary '%s' does not exist or is "
147 "not executable" % gdbserver)
149 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
150 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
153 cls.vpp = subprocess.Popen(cmdline,
154 stdout=subprocess.PIPE,
155 stderr=subprocess.PIPE,
157 except Exception as e:
158 cls.logger.critical("Couldn't start vpp: %s" % e)
166 Perform class setup before running the testcase
167 Remove shared memory files, start vpp and connect the vpp-api
169 cls.logger = getLogger(cls.__name__)
170 cls.tempdir = tempfile.mkdtemp(
171 prefix='vpp-unittest-' + cls.__name__ + '-')
172 cls.shm_prefix = cls.tempdir.split("/")[-1]
173 os.chdir(cls.tempdir)
174 cls.logger.info("Temporary dir is %s, shm prefix is %s",
175 cls.tempdir, cls.shm_prefix)
178 cls.packet_infos = {}
181 print(double_line_delim)
182 print(colorize(getdoc(cls).splitlines()[0], YELLOW))
183 print(double_line_delim)
184 # need to catch exceptions here because if we raise, then the cleanup
185 # doesn't get called and we might end with a zombie vpp
188 cls.vpp_stdout_queue = Queue()
189 cls.vpp_stdout_reader_thread = Thread(target=pump_output, args=(
190 cls.vpp.stdout, cls.vpp_stdout_queue))
191 cls.vpp_stdout_reader_thread.start()
192 cls.vpp_stderr_queue = Queue()
193 cls.vpp_stderr_reader_thread = Thread(target=pump_output, args=(
194 cls.vpp.stderr, cls.vpp_stderr_queue))
195 cls.vpp_stderr_reader_thread.start()
196 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
201 cls.vapi.register_hook(hook)
207 if cls.debug_gdbserver:
208 print(colorize("You're running VPP inside gdbserver but "
209 "VPP-API connection failed, did you forget "
210 "to 'continue' VPP from within gdb?", RED))
213 t, v, tb = sys.exc_info()
223 Disconnect vpp-api, kill vpp and cleanup shared memory files
225 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
227 if cls.vpp.returncode is None:
228 print(double_line_delim)
229 print("VPP or GDB server is still running")
230 print(single_line_delim)
231 raw_input("When done debugging, press ENTER to kill the process"
232 " and finish running the testcase...")
234 if hasattr(cls, 'vpp'):
235 if hasattr(cls, 'vapi'):
236 cls.vapi.disconnect()
238 if cls.vpp.returncode is None:
242 if hasattr(cls, 'vpp_stdout_queue'):
243 cls.logger.info(single_line_delim)
244 cls.logger.info('VPP output to stdout while running %s:',
246 cls.logger.info(single_line_delim)
247 f = open(cls.tempdir + '/vpp_stdout.txt', 'w')
248 while not cls.vpp_stdout_queue.empty():
249 line = cls.vpp_stdout_queue.get_nowait()
251 cls.logger.info('VPP stdout: %s' % line.rstrip('\n'))
253 if hasattr(cls, 'vpp_stderr_queue'):
254 cls.logger.info(single_line_delim)
255 cls.logger.info('VPP output to stderr while running %s:',
257 cls.logger.info(single_line_delim)
258 f = open(cls.tempdir + '/vpp_stderr.txt', 'w')
259 while not cls.vpp_stderr_queue.empty():
260 line = cls.vpp_stderr_queue.get_nowait()
262 cls.logger.info('VPP stderr: %s' % line.rstrip('\n'))
263 cls.logger.info(single_line_delim)
266 def tearDownClass(cls):
267 """ Perform final cleanup after running all tests in this test-case """
271 """ Show various debug prints after each test """
272 if not self.vpp_dead:
273 self.logger.debug(self.vapi.cli("show trace"))
274 self.logger.info(self.vapi.ppcli("show int"))
275 self.logger.info(self.vapi.ppcli("show hardware"))
276 self.logger.info(self.vapi.ppcli("show error"))
277 self.logger.info(self.vapi.ppcli("show run"))
280 """ Clear trace before running each test"""
282 raise Exception("VPP is dead when setting up the test")
283 self.vapi.cli("clear trace")
284 # store the test instance inside the test class - so that objects
285 # holding the class can access instance methods (like assertEqual)
286 type(self).test_instance = self
289 def pg_enable_capture(cls, interfaces):
291 Enable capture on packet-generator interfaces
293 :param interfaces: iterable interface indexes
302 Enable the packet-generator and send all prepared packet streams
303 Remove the packet streams afterwards
305 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
306 cls.vapi.cli('packet-generator enable')
307 sleep(1) # give VPP some time to process the packets
308 for stream in cls.pg_streams:
309 cls.vapi.cli('packet-generator delete %s' % stream)
313 def create_pg_interfaces(cls, interfaces):
315 Create packet-generator interfaces
317 :param interfaces: iterable indexes of the interfaces
322 intf = VppPGInterface(cls, i)
323 setattr(cls, intf.name, intf)
325 cls.pg_interfaces = result
329 def create_loopback_interfaces(cls, interfaces):
331 Create loopback interfaces
333 :param interfaces: iterable indexes of the interfaces
338 intf = VppLoInterface(cls, i)
339 setattr(cls, intf.name, intf)
341 cls.lo_interfaces = result
345 def extend_packet(packet, size):
347 Extend packet to given size by padding with spaces
348 NOTE: Currently works only when Raw layer is present.
350 :param packet: packet
351 :param size: target size
354 packet_len = len(packet) + 4
355 extend = size - packet_len
357 packet[Raw].load += ' ' * extend
def add_packet_info_to_list(self, info):
    """
    Register a packet info object in the testcase's packet info collection

    The object receives the next free position (the current number of
    stored infos) as its ``index`` and is stored under that key.

    :param info: packet info to register

    """
    slot = len(self.packet_infos)
    info.index = slot
    # key by the index attribute as stored on the info object
    self.packet_infos[info.index] = info
369 def create_packet_info(self, src_pg_index, dst_pg_index):
371 Create packet info object containing the source and destination indexes
372 and add it to the testcase's packet info list
374 :param src_pg_index: source packet-generator index
375 :param dst_pg_index: destination packet-generator index
377 :returns: _PacketInfo object
381 self.add_packet_info_to_list(info)
382 info.src = src_pg_index
383 info.dst = dst_pg_index
def info_to_payload(info):
    """
    Serialize a _PacketInfo object into a packet payload string

    The payload consists of the index, source and destination values
    formatted as integers and separated by single spaces.

    :param info: _PacketInfo object

    :returns: string containing serialized data from packet info

    """
    fields = (info.index, info.src, info.dst)
    template = "%d %d %d"
    return template % fields
398 def payload_to_info(payload):
400 Convert packet payload to _PacketInfo object
402 :param payload: packet payload
404 :returns: _PacketInfo object containing de-serialized data from payload
407 numbers = payload.split()
409 info.index = int(numbers[0])
410 info.src = int(numbers[1])
411 info.dst = int(numbers[2])
414 def get_next_packet_info(self, info):
416 Iterate over the packet info list stored in the testcase
417 Start iteration with first element if info is None
418 Continue based on index in info if info is specified
420 :param info: info or None
421 :returns: next info in list or None if no more infos
426 next_index = info.index + 1
427 if next_index == len(self.packet_infos):
430 return self.packet_infos[next_index]
432 def get_next_packet_info_for_interface(self, src_index, info):
434 Search the packet info list for the next packet info with same source
437 :param src_index: source interface index to search for
438 :param info: packet info - where to start the search
439 :returns: packet info or None
443 info = self.get_next_packet_info(info)
446 if info.src == src_index:
449 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
451 Search the packet info list for the next packet info with same source
452 and destination interface indexes
454 :param src_index: source interface index to search for
455 :param dst_index: destination interface index to search for
456 :param info: packet info - where to start the search
457 :returns: packet info or None
461 info = self.get_next_packet_info_for_interface(src_index, info)
464 if info.dst == dst_index:
def assert_equal(self, real_value, expected_value, name_or_class=None):
    """
    Assert that two values are equal, with a descriptive failure message

    :param real_value: value obtained during the test
    :param expected_value: value the test expects
    :param name_or_class: optional; either a plain label used in the
        failure message, or a callable (e.g. an enum-like class) whose
        docstring names the field and which converts a raw value into a
        printable representation. When None, the default assertEqual
        message is used.

    """
    if name_or_class is None:
        # No label to build a message from. Passing an unassigned `msg`
        # here raised NameError, so rely on assertEqual's own message.
        self.assertEqual(real_value, expected_value)
        return
    # NOTE(review): the early return above and the try/except framing
    # below are restored from the visible structure - confirm against
    # the complete file.
    try:
        # Preferred form: values shown numerically together with their
        # symbolic representation obtained by calling name_or_class.
        msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
        msg = msg % (getdoc(name_or_class).strip(),
                     real_value, str(name_or_class(real_value)),
                     expected_value, str(name_or_class(expected_value)))
    except Exception:
        # name_or_class is not usable as a converter (e.g. it is a plain
        # string label) - fall back to a generic message.
        msg = "Invalid %s: %s does not match expected value %s" % (
            name_or_class, real_value, expected_value)

    self.assertEqual(real_value, expected_value, msg)
491 msg = "Invalid %s: %s out of range <%s,%s>" % (
492 name, real_value, expected_min, expected_max)
493 self.assertTrue(expected_min <= real_value <= expected_max, msg)
496 class VppTestResult(unittest.TestResult):
498 @property result_string
499 String variable to store the test case result string.
501 List variable containing 2-tuples of TestCase instances and strings
502 holding formatted tracebacks. Each tuple represents a test which
503 raised an unexpected exception.
505 List variable containing 2-tuples of TestCase instances and strings
506 holding formatted tracebacks. Each tuple represents a test where
507 a failure was explicitly signalled using the TestCase.assert*()
511 def __init__(self, stream, descriptions, verbosity):
513 :param stream File descriptor to store where to report test results. Set
514 to the standard error stream by default.
515 :param descriptions Boolean variable to store information if to use test
517 :param verbosity Integer variable to store required verbosity level.
519 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
521 self.descriptions = descriptions
522 self.verbosity = verbosity
523 self.result_string = None
def addSuccess(self, test):
    """
    Note that a test has passed

    Delegates to unittest.TestResult.addSuccess and then stores the
    "OK" label, run through colorize with GREEN, as the result string.

    :param test: test case that succeeded

    """
    unittest.TestResult.addSuccess(self, test)
    outcome = colorize("OK", GREEN)
    self.result_string = outcome
def addSkip(self, test, reason):
    """
    Note that a test has been skipped

    Delegates to unittest.TestResult.addSkip and then stores the
    "SKIP" label, run through colorize with YELLOW, as the result string.

    :param test: test case that was skipped
    :param reason: reason forwarded to unittest.TestResult.addSkip

    """
    unittest.TestResult.addSkip(self, test, reason)
    outcome = colorize("SKIP", YELLOW)
    self.result_string = outcome
546 def addFailure(self, test, err):
548 Record a test failed result
551 :param err: error message
554 unittest.TestResult.addFailure(self, test, err)
555 if hasattr(test, 'tempdir'):
556 self.result_string = colorize("FAIL", RED) + \
557 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
559 self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
561 def addError(self, test, err):
563 Record a test error result
566 :param err: error message
569 unittest.TestResult.addError(self, test, err)
570 if hasattr(test, 'tempdir'):
571 self.result_string = colorize("ERROR", RED) + \
572 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
574 self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
576 def getDescription(self, test):
581 :returns: test description
584 # TODO: if none print warning not raise exception
585 short_description = test.shortDescription()
586 if self.descriptions and short_description:
587 return short_description
591 def startTest(self, test):
598 unittest.TestResult.startTest(self, test)
599 if self.verbosity > 0:
601 "Starting " + self.getDescription(test) + " ...")
602 self.stream.writeln(single_line_delim)
604 def stopTest(self, test):
611 unittest.TestResult.stopTest(self, test)
612 if self.verbosity > 0:
613 self.stream.writeln(single_line_delim)
614 self.stream.writeln("%-60s%s" %
615 (self.getDescription(test), self.result_string))
616 self.stream.writeln(single_line_delim)
618 self.stream.writeln("%-60s%s" %
619 (self.getDescription(test), self.result_string))
def printErrors(self):
    """
    Write the collected errors and failures to the output stream

    Emits an empty line first, then the 'ERROR' list followed by the
    'FAIL' list via printErrorList.
    """
    self.stream.writeln()
    # each list is looked up only when its turn comes
    for flavour, attribute in (('ERROR', 'errors'), ('FAIL', 'failures')):
        self.printErrorList(flavour, getattr(self, attribute))
def printErrorList(self, flavour, errors):
    """
    Write every entry of an error list to the output stream

    Each entry is printed as a double-line delimiter, a
    "<flavour>: <test description>" header, a single-line delimiter and
    finally the error text.

    :param flavour: error type
    :param errors: iterable of (test, error text) pairs

    """
    for case, trace in errors:
        emit = self.stream.writeln
        emit(double_line_delim)
        header = "%s: %s" % (flavour, self.getDescription(case))
        emit(header)
        emit(single_line_delim)
        emit("%s" % trace)
646 class VppTestRunner(unittest.TextTestRunner):
648 A basic test runner implementation which prints results on standard error.
651 def resultclass(self):
652 """Class maintaining the results of the tests"""
662 print("Running tests using custom test runner") # debug message
663 return super(VppTestRunner, self).run(test)