10 from time import sleep
11 from Queue import Queue
12 from threading import Thread
13 from inspect import getdoc
14 from hook import StepHook, PollHook
15 from vpp_pg_interface import VppPGInterface
16 from vpp_papi_provider import VppPapiProvider
17 from scapy.packet import Raw
21 Test framework module.
23 The module provides a set of tools for constructing and running tests and
24 representing the results.
28 class _PacketInfo(object):
29 """Private class to create packet info object.
31 Help process information about the next packet.
32 Set variables to default values.
34 Integer variable to store the index of the packet.
36 Integer variable to store the index of the source packet generator
37 interface of the packet.
39 Integer variable to store the index of the destination packet generator
40 interface of the packet.
42 Object variable to store the copy of the former packet.
52 def pump_output(out, queue):
53 for line in iter(out.readline, b''):
57 class VppTestCase(unittest.TestCase):
59 Subclass of the python unittest.TestCase class.
61 This subclass is a base class for test cases that are implemented as classes
62 It provides methods to create and run test case.
def packet_infos(self):
    """Return the packet infos stored on this test case.

    :returns: the object currently held in ``self._packet_infos``
    """
    return self._packet_infos
def packet_infos(self, value):
    """Replace the packet infos stored on this test case.

    :param value: object to keep in ``self._packet_infos``
    """
    self._packet_infos = value
77 """Return the instance of this testcase"""
78 return cls.test_instance
81 def set_debug_flags(cls, d):
82 cls.debug_core = False
84 cls.debug_gdbserver = False
89 if resource.getrlimit(resource.RLIMIT_CORE)[0] <= 0:
90 # give a heads up if this is actually useless
91 cls.logger.critical("WARNING: core size limit is set 0, core "
92 "files will NOT be created")
96 elif dl == "gdbserver":
97 cls.debug_gdbserver = True
99 raise Exception("Unrecognized DEBUG option: '%s'" % d)
102 def setUpConstants(cls):
103 """ Set-up the test case class based on environment variables """
105 s = os.getenv("STEP")
106 cls.step = True if s.lower() in ("y", "yes", "1") else False
110 d = os.getenv("DEBUG")
113 cls.set_debug_flags(d)
114 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
115 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
117 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
118 debug_cli = "cli-listen localhost:5002"
119 cls.vpp_cmdline = [cls.vpp_bin, "unix", "{", "nodaemon", debug_cli, "}",
120 "api-segment", "{", "prefix", cls.shm_prefix, "}"]
121 if cls.plugin_path is not None:
122 cls.vpp_cmdline.extend(["plugin_path", cls.plugin_path])
123 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
126 def wait_for_enter(cls):
127 if cls.debug_gdbserver:
128 print(double_line_delim)
129 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
131 print(double_line_delim)
132 print("Spawned VPP with PID: %d" % cls.vpp.pid)
134 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
136 print(single_line_delim)
137 print("You can debug the VPP using e.g.:")
138 if cls.debug_gdbserver:
139 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
140 print("Now is the time to attach a gdb by running the above "
141 "command, set up breakpoints etc. and then resume VPP from "
142 "within gdb by issuing the 'continue' command")
144 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
145 print("Now is the time to attach a gdb by running the above "
146 "command and set up breakpoints etc.")
147 print(single_line_delim)
148 raw_input("Press ENTER to continue running the testcase...")
152 cmdline = cls.vpp_cmdline
154 if cls.debug_gdbserver:
155 gdbserver = '/usr/bin/gdbserver'
156 if not os.path.isfile(gdbserver) or \
157 not os.access(gdbserver, os.X_OK):
158 raise Exception("gdbserver binary '%s' does not exist or is "
159 "not executable" % gdbserver)
161 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
162 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
165 cls.vpp = subprocess.Popen(cmdline,
166 stdout=subprocess.PIPE,
167 stderr=subprocess.PIPE,
169 except Exception as e:
170 cls.logger.critical("Couldn't start vpp: %s" % e)
178 Perform class setup before running the testcase
179 Remove shared memory files, start vpp and connect the vpp-api
181 cls.logger = getLogger(cls.__name__)
182 cls.tempdir = tempfile.mkdtemp(
183 prefix='vpp-unittest-' + cls.__name__ + '-')
184 cls.shm_prefix = cls.tempdir.split("/")[-1]
185 os.chdir(cls.tempdir)
186 cls.logger.info("Temporary dir is %s, shm prefix is %s",
187 cls.tempdir, cls.shm_prefix)
190 cls.packet_infos = {}
192 print(double_line_delim)
193 print(colorize(getdoc(cls), YELLOW))
194 print(double_line_delim)
195 # need to catch exceptions here because if we raise, then the cleanup
196 # doesn't get called and we might end with a zombie vpp
200 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix)
202 cls.vapi.register_hook(StepHook(cls))
204 cls.vapi.register_hook(PollHook(cls))
209 if cls.debug_gdbserver:
210 print(colorize("You're running VPP inside gdbserver but "
211 "VPP-API connection failed, did you forget "
212 "to 'continue' VPP from within gdb?", RED))
214 cls.vpp_stdout_queue = Queue()
215 cls.vpp_stdout_reader_thread = Thread(
216 target=pump_output, args=(cls.vpp.stdout, cls.vpp_stdout_queue))
217 cls.vpp_stdout_reader_thread.start()
218 cls.vpp_stderr_queue = Queue()
219 cls.vpp_stderr_reader_thread = Thread(
220 target=pump_output, args=(cls.vpp.stderr, cls.vpp_stderr_queue))
221 cls.vpp_stderr_reader_thread.start()
223 if hasattr(cls, 'vpp'):
231 Disconnect vpp-api, kill vpp and cleanup shared memory files
233 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
235 if cls.vpp.returncode is None:
236 print(double_line_delim)
237 print("VPP or GDB server is still running")
238 print(single_line_delim)
239 raw_input("When done debugging, press ENTER to kill the process"
240 " and finish running the testcase...")
242 if hasattr(cls, 'vpp'):
243 cls.vapi.disconnect()
245 if cls.vpp.returncode is None:
249 if hasattr(cls, 'vpp_stdout_queue'):
250 cls.logger.info(single_line_delim)
251 cls.logger.info('VPP output to stdout while running %s:',
253 cls.logger.info(single_line_delim)
254 f = open(cls.tempdir + '/vpp_stdout.txt', 'w')
255 while not cls.vpp_stdout_queue.empty():
256 line = cls.vpp_stdout_queue.get_nowait()
258 cls.logger.info('VPP stdout: %s' % line.rstrip('\n'))
260 if hasattr(cls, 'vpp_stderr_queue'):
261 cls.logger.info(single_line_delim)
262 cls.logger.info('VPP output to stderr while running %s:',
264 cls.logger.info(single_line_delim)
265 f = open(cls.tempdir + '/vpp_stderr.txt', 'w')
266 while not cls.vpp_stderr_queue.empty():
267 line = cls.vpp_stderr_queue.get_nowait()
269 cls.logger.info('VPP stderr: %s' % line.rstrip('\n'))
270 cls.logger.info(single_line_delim)
273 def tearDownClass(cls):
274 """ Perform final cleanup after running all tests in this test-case """
278 """ Show various debug prints after each test """
279 if not self.vpp_dead:
280 self.logger.info(self.vapi.cli("show int"))
281 self.logger.info(self.vapi.cli("show trace"))
282 self.logger.info(self.vapi.cli("show hardware"))
283 self.logger.info(self.vapi.cli("show error"))
284 self.logger.info(self.vapi.cli("show run"))
287 """ Clear trace before running each test"""
288 self.vapi.cli("clear trace")
289 # store the test instance inside the test class - so that objects
290 # holding the class can access instance methods (like assertEqual)
291 type(self).test_instance = self
294 def pg_enable_capture(cls, interfaces):
296 Enable capture on packet-generator interfaces
298 :param interfaces: iterable interface indexes
307 Enable the packet-generator and send all prepared packet streams
308 Remove the packet streams afterwards
310 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
311 cls.vapi.cli('packet-generator enable')
312 sleep(1) # give VPP some time to process the packets
313 for stream in cls.pg_streams:
314 cls.vapi.cli('packet-generator delete %s' % stream)
318 def create_pg_interfaces(cls, interfaces):
320 Create packet-generator interfaces
322 :param interfaces: iterable indexes of the interfaces
327 intf = VppPGInterface(cls, i)
328 setattr(cls, intf.name, intf)
330 cls.pg_interfaces = result
334 def extend_packet(packet, size):
336 Extend packet to given size by padding with spaces
337 NOTE: Currently works only when Raw layer is present.
339 :param packet: packet
340 :param size: target size
343 packet_len = len(packet) + 4
344 extend = size - packet_len
346 packet[Raw].load += ' ' * extend
def add_packet_info_to_list(self, info):
    """
    Add packet info to the testcase's packet info list

    The info receives the next free index (the number of infos stored so
    far) and is then stored in ``self.packet_infos`` under that index.

    :param info: packet info; its ``index`` attribute is overwritten

    """
    info.index = len(self.packet_infos)
    self.packet_infos[info.index] = info
358 def create_packet_info(self, src_pg_index, dst_pg_index):
360 Create packet info object containing the source and destination indexes
361 and add it to the testcase's packet info list
363 :param src_pg_index: source packet-generator index
364 :param dst_pg_index: destination packet-generator index
366 :returns: _PacketInfo object
370 self.add_packet_info_to_list(info)
371 info.src = src_pg_index
372 info.dst = dst_pg_index
def info_to_payload(info):
    """
    Convert _PacketInfo object to packet payload

    :param info: _PacketInfo object

    :returns: string containing the ``index``, ``src`` and ``dst`` of the
        info formatted as decimal integers separated by single spaces
    """
    return "%d %d %d" % (info.index, info.src, info.dst)
387 def payload_to_info(payload):
389 Convert packet payload to _PacketInfo object
391 :param payload: packet payload
393 :returns: _PacketInfo object containing de-serialized data from payload
396 numbers = payload.split()
398 info.index = int(numbers[0])
399 info.src = int(numbers[1])
400 info.dst = int(numbers[2])
403 def get_next_packet_info(self, info):
405 Iterate over the packet info list stored in the testcase
406 Start iteration with first element if info is None
407 Continue based on index in info if info is specified
409 :param info: info or None
410 :returns: next info in list or None if no more infos
415 next_index = info.index + 1
416 if next_index == len(self.packet_infos):
419 return self.packet_infos[next_index]
421 def get_next_packet_info_for_interface(self, src_index, info):
423 Search the packet info list for the next packet info with same source
426 :param src_index: source interface index to search for
427 :param info: packet info - where to start the search
428 :returns: packet info or None
432 info = self.get_next_packet_info(info)
435 if info.src == src_index:
438 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
440 Search the packet info list for the next packet info with same source
441 and destination interface indexes
443 :param src_index: source interface index to search for
444 :param dst_index: destination interface index to search for
445 :param info: packet info - where to start the search
446 :returns: packet info or None
450 info = self.get_next_packet_info_for_interface(src_index, info)
453 if info.dst == dst_index:
457 class VppTestResult(unittest.TestResult):
459 @property result_string
460 String variable to store the test case result string.
462 List variable containing 2-tuples of TestCase instances and strings
463 holding formatted tracebacks. Each tuple represents a test which
464 raised an unexpected exception.
466 List variable containing 2-tuples of TestCase instances and strings
467 holding formatted tracebacks. Each tuple represents a test where
468 a failure was explicitly signalled using the TestCase.assert*()
472 def __init__(self, stream, descriptions, verbosity):
474 :param stream File descriptor to store where to report test results. Set
475 to the standard error stream by default.
476 :param descriptions Boolean variable to store information if to use test
478 :param verbosity Integer variable to store required verbosity level.
480 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
482 self.descriptions = descriptions
483 self.verbosity = verbosity
484 self.result_string = None
def addSuccess(self, test):
    """
    Record a test succeeded result

    Delegates to ``unittest.TestResult.addSuccess`` and then sets
    ``self.result_string`` to ``colorize("OK", GREEN)``.

    :param test: test being reported

    """
    unittest.TestResult.addSuccess(self, test)
    self.result_string = colorize("OK", GREEN)
def addSkip(self, test, reason):
    """
    Record a test skipped.

    Delegates to ``unittest.TestResult.addSkip`` and then sets
    ``self.result_string`` to ``colorize("SKIP", YELLOW)``.

    :param test: test being reported
    :param reason: reason passed on to ``unittest.TestResult.addSkip``

    """
    unittest.TestResult.addSkip(self, test, reason)
    self.result_string = colorize("SKIP", YELLOW)
def addFailure(self, test, err):
    """
    Record a test failed result

    Delegates to ``unittest.TestResult.addFailure`` and then builds
    ``self.result_string``: it includes ``test.tempdir`` when the test has
    that attribute, and the text ' [no temp dir]' otherwise.

    :param test: test being reported
    :param err: error message

    """
    unittest.TestResult.addFailure(self, test, err)
    if hasattr(test, 'tempdir'):
        self.result_string = colorize("FAIL", RED) + \
            ' [ temp dir used by test case: ' + test.tempdir + ' ]'
    else:
        # NOTE(review): this branch keyword was absent from the received
        # text; without it the assignment above would always be overwritten.
        self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
def addError(self, test, err):
    """
    Record a test error result

    Delegates to ``unittest.TestResult.addError`` and then builds
    ``self.result_string``: it includes ``test.tempdir`` when the test has
    that attribute, and the text ' [no temp dir]' otherwise.

    :param test: test being reported
    :param err: error message

    """
    unittest.TestResult.addError(self, test, err)
    if hasattr(test, 'tempdir'):
        self.result_string = colorize("ERROR", RED) + \
            ' [ temp dir used by test case: ' + test.tempdir + ' ]'
    else:
        # NOTE(review): this branch keyword was absent from the received
        # text; without it the assignment above would always be overwritten.
        self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
537 def getDescription(self, test):
542 :returns: test description
545 # TODO: if none print warning not raise exception
546 short_description = test.shortDescription()
547 if self.descriptions and short_description:
548 return short_description
552 def startTest(self, test):
559 unittest.TestResult.startTest(self, test)
560 if self.verbosity > 0:
562 "Starting " + self.getDescription(test) + " ...")
563 self.stream.writeln(single_line_delim)
def stopTest(self, test):
    """
    Stop a test and write its result line to the stream

    The line holds the test description left-justified in a 60 character
    column, followed by ``self.result_string``. When ``self.verbosity`` is
    greater than zero the line is framed by ``single_line_delim`` lines.

    :param test: test being reported

    """
    unittest.TestResult.stopTest(self, test)
    if self.verbosity > 0:
        self.stream.writeln(single_line_delim)
        self.stream.writeln("%-60s%s" %
                            (self.getDescription(test), self.result_string))
        self.stream.writeln(single_line_delim)
    else:
        # NOTE(review): this branch keyword was absent from the received
        # text; without it the result line would be written twice.
        self.stream.writeln("%-60s%s" %
                            (self.getDescription(test), self.result_string))
def printErrors(self):
    """
    Print errors from running the test case

    Writes an empty line to the stream, then reports ``self.errors`` under
    the 'ERROR' label followed by ``self.failures`` under the 'FAIL' label.
    """
    self.stream.writeln()
    self.printErrorList('ERROR', self.errors)
    self.printErrorList('FAIL', self.failures)
def printErrorList(self, flavour, errors):
    """
    Print error list to the output stream together with error type
    and test case description.

    For every entry the stream receives a ``double_line_delim`` line, a
    "<flavour>: <description>" line, a ``single_line_delim`` line and
    finally the error text. Nothing is written for an empty iterable.

    :param flavour: error type
    :param errors: iterable of (test, err) pairs

    """
    for test, err in errors:
        self.stream.writeln(double_line_delim)
        self.stream.writeln("%s: %s" %
                            (flavour, self.getDescription(test)))
        self.stream.writeln(single_line_delim)
        self.stream.writeln("%s" % err)
607 class VppTestRunner(unittest.TextTestRunner):
609 A basic test runner implementation which prints results on standard error.
612 def resultclass(self):
613 """Class maintaining the results of the tests"""
623 print("Running tests using custom test runner") # debug message
624 return super(VppTestRunner, self).run(test)