9 from Queue import Queue
10 from threading import Thread
11 from inspect import getdoc
12 from hook import StepHook, PollHook
13 from vpp_pg_interface import VppPGInterface
14 from vpp_lo_interface import VppLoInterface
15 from vpp_papi_provider import VppPapiProvider
16 from scapy.packet import Raw
20 Test framework module.
22 The module provides a set of tools for constructing and running tests and
23 representing the results.
27 class _PacketInfo(object):
28 """Private class to create packet info object.
30 Help process information about the next packet.
31 Set variables to default values.
33 Integer variable to store the index of the packet.
35 Integer variable to store the index of the source packet generator
36 interface of the packet.
38 Integer variable to store the index of the destination packet generator
39 interface of the packet.
41 Object variable to store the copy of the former packet.
51 def pump_output(out, queue):
52 for line in iter(out.readline, b''):
56 class VppTestCase(unittest.TestCase):
58 Subclass of the python unittest.TestCase class.
60 This subclass is a base class for test cases that are implemented as classes
61 It provides methods to create and run test case.
66 def packet_infos(self):
67 """List of packet infos"""
68 return self._packet_infos
71 def packet_infos(self, value):
72 self._packet_infos = value
76 """Return the instance of this testcase"""
77 return cls.test_instance
80 def set_debug_flags(cls, d):
81 cls.debug_core = False
83 cls.debug_gdbserver = False
88 if resource.getrlimit(resource.RLIMIT_CORE)[0] <= 0:
89 # give a heads up if this is actually useless
90 cls.logger.critical("WARNING: core size limit is set 0, core "
91 "files will NOT be created")
95 elif dl == "gdbserver":
96 cls.debug_gdbserver = True
98 raise Exception("Unrecognized DEBUG option: '%s'" % d)
101 def setUpConstants(cls):
102 """ Set-up the test case class based on environment variables """
104 s = os.getenv("STEP")
105 cls.step = True if s.lower() in ("y", "yes", "1") else False
109 d = os.getenv("DEBUG")
112 cls.set_debug_flags(d)
113 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
114 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
116 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
117 debug_cli = "cli-listen localhost:5002"
118 cls.vpp_cmdline = [cls.vpp_bin, "unix", "{", "nodaemon", debug_cli, "}",
119 "api-segment", "{", "prefix", cls.shm_prefix, "}"]
120 if cls.plugin_path is not None:
121 cls.vpp_cmdline.extend(["plugin_path", cls.plugin_path])
122 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
125 def wait_for_enter(cls):
126 if cls.debug_gdbserver:
127 print(double_line_delim)
128 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
130 print(double_line_delim)
131 print("Spawned VPP with PID: %d" % cls.vpp.pid)
133 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
135 print(single_line_delim)
136 print("You can debug the VPP using e.g.:")
137 if cls.debug_gdbserver:
138 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
139 print("Now is the time to attach a gdb by running the above "
140 "command, set up breakpoints etc. and then resume VPP from "
141 "within gdb by issuing the 'continue' command")
143 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
144 print("Now is the time to attach a gdb by running the above "
145 "command and set up breakpoints etc.")
146 print(single_line_delim)
147 raw_input("Press ENTER to continue running the testcase...")
151 cmdline = cls.vpp_cmdline
153 if cls.debug_gdbserver:
154 gdbserver = '/usr/bin/gdbserver'
155 if not os.path.isfile(gdbserver) or \
156 not os.access(gdbserver, os.X_OK):
157 raise Exception("gdbserver binary '%s' does not exist or is "
158 "not executable" % gdbserver)
160 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
161 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
164 cls.vpp = subprocess.Popen(cmdline,
165 stdout=subprocess.PIPE,
166 stderr=subprocess.PIPE,
168 except Exception as e:
169 cls.logger.critical("Couldn't start vpp: %s" % e)
177 Perform class setup before running the testcase
178 Remove shared memory files, start vpp and connect the vpp-api
180 cls.logger = getLogger(cls.__name__)
181 cls.tempdir = tempfile.mkdtemp(
182 prefix='vpp-unittest-' + cls.__name__ + '-')
183 cls.shm_prefix = cls.tempdir.split("/")[-1]
184 os.chdir(cls.tempdir)
185 cls.logger.info("Temporary dir is %s, shm prefix is %s",
186 cls.tempdir, cls.shm_prefix)
189 cls.packet_infos = {}
191 print(double_line_delim)
192 print(colorize(getdoc(cls), YELLOW))
193 print(double_line_delim)
194 # need to catch exceptions here because if we raise, then the cleanup
195 # doesn't get called and we might end with a zombie vpp
199 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix)
201 cls.vapi.register_hook(StepHook(cls))
203 cls.vapi.register_hook(PollHook(cls))
208 if cls.debug_gdbserver:
209 print(colorize("You're running VPP inside gdbserver but "
210 "VPP-API connection failed, did you forget "
211 "to 'continue' VPP from within gdb?", RED))
213 cls.vpp_stdout_queue = Queue()
214 cls.vpp_stdout_reader_thread = Thread(
215 target=pump_output, args=(cls.vpp.stdout, cls.vpp_stdout_queue))
216 cls.vpp_stdout_reader_thread.start()
217 cls.vpp_stderr_queue = Queue()
218 cls.vpp_stderr_reader_thread = Thread(
219 target=pump_output, args=(cls.vpp.stderr, cls.vpp_stderr_queue))
220 cls.vpp_stderr_reader_thread.start()
222 if hasattr(cls, 'vpp'):
230 Disconnect vpp-api, kill vpp and cleanup shared memory files
232 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
234 if cls.vpp.returncode is None:
235 print(double_line_delim)
236 print("VPP or GDB server is still running")
237 print(single_line_delim)
238 raw_input("When done debugging, press ENTER to kill the process"
239 " and finish running the testcase...")
241 if hasattr(cls, 'vpp'):
242 cls.vapi.disconnect()
244 if cls.vpp.returncode is None:
248 if hasattr(cls, 'vpp_stdout_queue'):
249 cls.logger.info(single_line_delim)
250 cls.logger.info('VPP output to stdout while running %s:',
252 cls.logger.info(single_line_delim)
253 f = open(cls.tempdir + '/vpp_stdout.txt', 'w')
254 while not cls.vpp_stdout_queue.empty():
255 line = cls.vpp_stdout_queue.get_nowait()
257 cls.logger.info('VPP stdout: %s' % line.rstrip('\n'))
259 if hasattr(cls, 'vpp_stderr_queue'):
260 cls.logger.info(single_line_delim)
261 cls.logger.info('VPP output to stderr while running %s:',
263 cls.logger.info(single_line_delim)
264 f = open(cls.tempdir + '/vpp_stderr.txt', 'w')
265 while not cls.vpp_stderr_queue.empty():
266 line = cls.vpp_stderr_queue.get_nowait()
268 cls.logger.info('VPP stderr: %s' % line.rstrip('\n'))
269 cls.logger.info(single_line_delim)
272 def tearDownClass(cls):
273 """ Perform final cleanup after running all tests in this test-case """
277 """ Show various debug prints after each test """
278 if not self.vpp_dead:
279 self.logger.info(self.vapi.ppcli("show int"))
280 self.logger.debug(self.vapi.cli("show trace"))
281 self.logger.info(self.vapi.ppcli("show hardware"))
282 self.logger.info(self.vapi.ppcli("show error"))
283 self.logger.info(self.vapi.ppcli("show run"))
286 """ Clear trace before running each test"""
287 self.vapi.cli("clear trace")
288 # store the test instance inside the test class - so that objects
289 # holding the class can access instance methods (like assertEqual)
290 type(self).test_instance = self
293 def pg_enable_capture(cls, interfaces):
295 Enable capture on packet-generator interfaces
297 :param interfaces: iterable interface indexes
306 Enable the packet-generator and send all prepared packet streams
307 Remove the packet streams afterwards
309 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
310 cls.vapi.cli('packet-generator enable')
311 sleep(1) # give VPP some time to process the packets
312 for stream in cls.pg_streams:
313 cls.vapi.cli('packet-generator delete %s' % stream)
317 def create_pg_interfaces(cls, interfaces):
319 Create packet-generator interfaces
321 :param interfaces: iterable indexes of the interfaces
326 intf = VppPGInterface(cls, i)
327 setattr(cls, intf.name, intf)
329 cls.pg_interfaces = result
333 def create_loopback_interfaces(cls, interfaces):
335 Create loopback interfaces
337 :param interfaces: iterable indexes of the interfaces
342 intf = VppLoInterface(cls, i)
343 setattr(cls, intf.name, intf)
345 cls.lo_interfaces = result
349 def extend_packet(packet, size):
351 Extend packet to given size by padding with spaces
352 NOTE: Currently works only when Raw layer is present.
354 :param packet: packet
355 :param size: target size
358 packet_len = len(packet) + 4
359 extend = size - packet_len
361 packet[Raw].load += ' ' * extend
363 def add_packet_info_to_list(self, info):
365 Add packet info to the testcase's packet info list
367 :param info: packet info
370 info.index = len(self.packet_infos)
371 self.packet_infos[info.index] = info
373 def create_packet_info(self, src_pg_index, dst_pg_index):
375 Create packet info object containing the source and destination indexes
376 and add it to the testcase's packet info list
378 :param src_pg_index: source packet-generator index
379 :param dst_pg_index: destination packet-generator index
381 :returns: _PacketInfo object
385 self.add_packet_info_to_list(info)
386 info.src = src_pg_index
387 info.dst = dst_pg_index
391 def info_to_payload(info):
393 Convert _PacketInfo object to packet payload
395 :param info: _PacketInfo object
397 :returns: string containing serialized data from packet info
399 return "%d %d %d" % (info.index, info.src, info.dst)
402 def payload_to_info(payload):
404 Convert packet payload to _PacketInfo object
406 :param payload: packet payload
408 :returns: _PacketInfo object containing de-serialized data from payload
411 numbers = payload.split()
413 info.index = int(numbers[0])
414 info.src = int(numbers[1])
415 info.dst = int(numbers[2])
418 def get_next_packet_info(self, info):
420 Iterate over the packet info list stored in the testcase
421 Start iteration with first element if info is None
422 Continue based on index in info if info is specified
424 :param info: info or None
425 :returns: next info in list or None if no more infos
430 next_index = info.index + 1
431 if next_index == len(self.packet_infos):
434 return self.packet_infos[next_index]
436 def get_next_packet_info_for_interface(self, src_index, info):
438 Search the packet info list for the next packet info with same source
441 :param src_index: source interface index to search for
442 :param info: packet info - where to start the search
443 :returns: packet info or None
447 info = self.get_next_packet_info(info)
450 if info.src == src_index:
453 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
455 Search the packet info list for the next packet info with same source
456 and destination interface indexes
458 :param src_index: source interface index to search for
459 :param dst_index: destination interface index to search for
460 :param info: packet info - where to start the search
461 :returns: packet info or None
465 info = self.get_next_packet_info_for_interface(src_index, info)
468 if info.dst == dst_index:
472 class VppTestResult(unittest.TestResult):
474 @property result_string
475 String variable to store the test case result string.
477 List variable containing 2-tuples of TestCase instances and strings
478 holding formatted tracebacks. Each tuple represents a test which
479 raised an unexpected exception.
481 List variable containing 2-tuples of TestCase instances and strings
482 holding formatted tracebacks. Each tuple represents a test where
483 a failure was explicitly signalled using the TestCase.assert*()
487 def __init__(self, stream, descriptions, verbosity):
489 :param stream File descriptor to store where to report test results. Set
490 to the standard error stream by default.
491 :param descriptions Boolean variable to store information if to use test
493 :param verbosity Integer variable to store required verbosity level.
495 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
497 self.descriptions = descriptions
498 self.verbosity = verbosity
499 self.result_string = None
501 def addSuccess(self, test):
503 Record a test succeeded result
508 unittest.TestResult.addSuccess(self, test)
509 self.result_string = colorize("OK", GREEN)
511 def addSkip(self, test, reason):
513 Record a test skipped.
519 unittest.TestResult.addSkip(self, test, reason)
520 self.result_string = colorize("SKIP", YELLOW)
522 def addFailure(self, test, err):
524 Record a test failed result
527 :param err: error message
530 unittest.TestResult.addFailure(self, test, err)
531 if hasattr(test, 'tempdir'):
532 self.result_string = colorize("FAIL", RED) + \
533 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
535 self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
537 def addError(self, test, err):
539 Record a test error result
542 :param err: error message
545 unittest.TestResult.addError(self, test, err)
546 if hasattr(test, 'tempdir'):
547 self.result_string = colorize("ERROR", RED) + \
548 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
550 self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
552 def getDescription(self, test):
557 :returns: test description
560 # TODO: if none print warning not raise exception
561 short_description = test.shortDescription()
562 if self.descriptions and short_description:
563 return short_description
567 def startTest(self, test):
574 unittest.TestResult.startTest(self, test)
575 if self.verbosity > 0:
577 "Starting " + self.getDescription(test) + " ...")
578 self.stream.writeln(single_line_delim)
580 def stopTest(self, test):
587 unittest.TestResult.stopTest(self, test)
588 if self.verbosity > 0:
589 self.stream.writeln(single_line_delim)
590 self.stream.writeln("%-60s%s" %
591 (self.getDescription(test), self.result_string))
592 self.stream.writeln(single_line_delim)
594 self.stream.writeln("%-60s%s" %
595 (self.getDescription(test), self.result_string))
597 def printErrors(self):
599 Print errors from running the test case
601 self.stream.writeln()
602 self.printErrorList('ERROR', self.errors)
603 self.printErrorList('FAIL', self.failures)
605 def printErrorList(self, flavour, errors):
607 Print error list to the output stream together with error type
608 and test case description.
610 :param flavour: error type
611 :param errors: iterable errors
614 for test, err in errors:
615 self.stream.writeln(double_line_delim)
616 self.stream.writeln("%s: %s" %
617 (flavour, self.getDescription(test)))
618 self.stream.writeln(single_line_delim)
619 self.stream.writeln("%s" % err)
622 class VppTestRunner(unittest.TextTestRunner):
624 A basic test runner implementation which prints results on standard error.
627 def resultclass(self):
628 """Class maintaining the results of the tests"""
638 print("Running tests using custom test runner") # debug message
639 return super(VppTestRunner, self).run(test)