9 from Queue import Queue
10 from threading import Thread
11 from inspect import getdoc
12 from hook import StepHook, PollHook
13 from vpp_pg_interface import VppPGInterface
14 from vpp_lo_interface import VppLoInterface
15 from vpp_papi_provider import VppPapiProvider
16 from scapy.packet import Raw
20 Test framework module.
22 The module provides a set of tools for constructing and running tests and
23 representing the results.
27 class _PacketInfo(object):
28 """Private class to create packet info object.
30 Help process information about the next packet.
31 Set variables to default values.
33 #: Store the index of the packet.
35 #: Store the index of the source packet generator interface of the packet.
37 #: Store the index of the destination packet generator interface
40 #: Store the copy of the former packet.
44 def pump_output(out, queue):
45 for line in iter(out.readline, b''):
49 class VppTestCase(unittest.TestCase):
50 """This subclass is a base class for VPP test cases that are implemented as
51 classes. It provides methods to create and run test case.
55 def packet_infos(self):
56 """List of packet infos"""
57 return self._packet_infos
60 def packet_infos(self, value):
61 self._packet_infos = value
65 """Return the instance of this testcase"""
66 return cls.test_instance
69 def set_debug_flags(cls, d):
70 cls.debug_core = False
72 cls.debug_gdbserver = False
77 if resource.getrlimit(resource.RLIMIT_CORE)[0] <= 0:
78 # give a heads up if this is actually useless
79 cls.logger.critical("WARNING: core size limit is set 0, core "
80 "files will NOT be created")
84 elif dl == "gdbserver":
85 cls.debug_gdbserver = True
87 raise Exception("Unrecognized DEBUG option: '%s'" % d)
90 def setUpConstants(cls):
91 """ Set-up the test case class based on environment variables """
94 cls.step = True if s.lower() in ("y", "yes", "1") else False
98 d = os.getenv("DEBUG")
101 cls.set_debug_flags(d)
102 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
103 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
105 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
106 debug_cli = "cli-listen localhost:5002"
107 cls.vpp_cmdline = [cls.vpp_bin, "unix", "{", "nodaemon", debug_cli, "}",
108 "api-segment", "{", "prefix", cls.shm_prefix, "}"]
109 if cls.plugin_path is not None:
110 cls.vpp_cmdline.extend(["plugin_path", cls.plugin_path])
111 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
114 def wait_for_enter(cls):
115 if cls.debug_gdbserver:
116 print(double_line_delim)
117 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
119 print(double_line_delim)
120 print("Spawned VPP with PID: %d" % cls.vpp.pid)
122 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
124 print(single_line_delim)
125 print("You can debug the VPP using e.g.:")
126 if cls.debug_gdbserver:
127 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
128 print("Now is the time to attach a gdb by running the above "
129 "command, set up breakpoints etc. and then resume VPP from "
130 "within gdb by issuing the 'continue' command")
132 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
133 print("Now is the time to attach a gdb by running the above "
134 "command and set up breakpoints etc.")
135 print(single_line_delim)
136 raw_input("Press ENTER to continue running the testcase...")
140 cmdline = cls.vpp_cmdline
142 if cls.debug_gdbserver:
143 gdbserver = '/usr/bin/gdbserver'
144 if not os.path.isfile(gdbserver) or \
145 not os.access(gdbserver, os.X_OK):
146 raise Exception("gdbserver binary '%s' does not exist or is "
147 "not executable" % gdbserver)
149 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
150 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
153 cls.vpp = subprocess.Popen(cmdline,
154 stdout=subprocess.PIPE,
155 stderr=subprocess.PIPE,
157 except Exception as e:
158 cls.logger.critical("Couldn't start vpp: %s" % e)
166 Perform class setup before running the testcase
167 Remove shared memory files, start vpp and connect the vpp-api
169 cls.logger = getLogger(cls.__name__)
170 cls.tempdir = tempfile.mkdtemp(
171 prefix='vpp-unittest-' + cls.__name__ + '-')
172 cls.shm_prefix = cls.tempdir.split("/")[-1]
173 os.chdir(cls.tempdir)
174 cls.logger.info("Temporary dir is %s, shm prefix is %s",
175 cls.tempdir, cls.shm_prefix)
178 cls.packet_infos = {}
180 print(double_line_delim)
181 print(colorize(getdoc(cls).splitlines()[0], YELLOW))
182 print(double_line_delim)
183 # need to catch exceptions here because if we raise, then the cleanup
184 # doesn't get called and we might end with a zombie vpp
188 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix)
190 cls.vapi.register_hook(StepHook(cls))
192 cls.vapi.register_hook(PollHook(cls))
197 if cls.debug_gdbserver:
198 print(colorize("You're running VPP inside gdbserver but "
199 "VPP-API connection failed, did you forget "
200 "to 'continue' VPP from within gdb?", RED))
202 cls.vpp_stdout_queue = Queue()
203 cls.vpp_stdout_reader_thread = Thread(
204 target=pump_output, args=(cls.vpp.stdout, cls.vpp_stdout_queue))
205 cls.vpp_stdout_reader_thread.start()
206 cls.vpp_stderr_queue = Queue()
207 cls.vpp_stderr_reader_thread = Thread(
208 target=pump_output, args=(cls.vpp.stderr, cls.vpp_stderr_queue))
209 cls.vpp_stderr_reader_thread.start()
211 if hasattr(cls, 'vpp'):
219 Disconnect vpp-api, kill vpp and cleanup shared memory files
221 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
223 if cls.vpp.returncode is None:
224 print(double_line_delim)
225 print("VPP or GDB server is still running")
226 print(single_line_delim)
227 raw_input("When done debugging, press ENTER to kill the process"
228 " and finish running the testcase...")
230 if hasattr(cls, 'vpp'):
231 cls.vapi.disconnect()
233 if cls.vpp.returncode is None:
237 if hasattr(cls, 'vpp_stdout_queue'):
238 cls.logger.info(single_line_delim)
239 cls.logger.info('VPP output to stdout while running %s:',
241 cls.logger.info(single_line_delim)
242 f = open(cls.tempdir + '/vpp_stdout.txt', 'w')
243 while not cls.vpp_stdout_queue.empty():
244 line = cls.vpp_stdout_queue.get_nowait()
246 cls.logger.info('VPP stdout: %s' % line.rstrip('\n'))
248 if hasattr(cls, 'vpp_stderr_queue'):
249 cls.logger.info(single_line_delim)
250 cls.logger.info('VPP output to stderr while running %s:',
252 cls.logger.info(single_line_delim)
253 f = open(cls.tempdir + '/vpp_stderr.txt', 'w')
254 while not cls.vpp_stderr_queue.empty():
255 line = cls.vpp_stderr_queue.get_nowait()
257 cls.logger.info('VPP stderr: %s' % line.rstrip('\n'))
258 cls.logger.info(single_line_delim)
261 def tearDownClass(cls):
262 """ Perform final cleanup after running all tests in this test-case """
266 """ Show various debug prints after each test """
267 if not self.vpp_dead:
268 self.logger.info(self.vapi.ppcli("show int"))
269 self.logger.debug(self.vapi.cli("show trace"))
270 self.logger.info(self.vapi.ppcli("show hardware"))
271 self.logger.info(self.vapi.ppcli("show error"))
272 self.logger.info(self.vapi.ppcli("show run"))
275 """ Clear trace before running each test"""
276 self.vapi.cli("clear trace")
277 # store the test instance inside the test class - so that objects
278 # holding the class can access instance methods (like assertEqual)
279 type(self).test_instance = self
282 def pg_enable_capture(cls, interfaces):
284 Enable capture on packet-generator interfaces
286 :param interfaces: iterable interface indexes
295 Enable the packet-generator and send all prepared packet streams
296 Remove the packet streams afterwards
298 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
299 cls.vapi.cli('packet-generator enable')
300 sleep(1) # give VPP some time to process the packets
301 for stream in cls.pg_streams:
302 cls.vapi.cli('packet-generator delete %s' % stream)
306 def create_pg_interfaces(cls, interfaces):
308 Create packet-generator interfaces
310 :param interfaces: iterable indexes of the interfaces
315 intf = VppPGInterface(cls, i)
316 setattr(cls, intf.name, intf)
318 cls.pg_interfaces = result
322 def create_loopback_interfaces(cls, interfaces):
324 Create loopback interfaces
326 :param interfaces: iterable indexes of the interfaces
331 intf = VppLoInterface(cls, i)
332 setattr(cls, intf.name, intf)
334 cls.lo_interfaces = result
338 def extend_packet(packet, size):
340 Extend packet to given size by padding with spaces
341 NOTE: Currently works only when Raw layer is present.
343 :param packet: packet
344 :param size: target size
347 packet_len = len(packet) + 4
348 extend = size - packet_len
350 packet[Raw].load += ' ' * extend
352 def add_packet_info_to_list(self, info):
354 Add packet info to the testcase's packet info list
356 :param info: packet info
359 info.index = len(self.packet_infos)
360 self.packet_infos[info.index] = info
362 def create_packet_info(self, src_pg_index, dst_pg_index):
364 Create packet info object containing the source and destination indexes
365 and add it to the testcase's packet info list
367 :param src_pg_index: source packet-generator index
368 :param dst_pg_index: destination packet-generator index
370 :returns: _PacketInfo object
374 self.add_packet_info_to_list(info)
375 info.src = src_pg_index
376 info.dst = dst_pg_index
380 def info_to_payload(info):
382 Convert _PacketInfo object to packet payload
384 :param info: _PacketInfo object
386 :returns: string containing serialized data from packet info
388 return "%d %d %d" % (info.index, info.src, info.dst)
391 def payload_to_info(payload):
393 Convert packet payload to _PacketInfo object
395 :param payload: packet payload
397 :returns: _PacketInfo object containing de-serialized data from payload
400 numbers = payload.split()
402 info.index = int(numbers[0])
403 info.src = int(numbers[1])
404 info.dst = int(numbers[2])
407 def get_next_packet_info(self, info):
409 Iterate over the packet info list stored in the testcase
410 Start iteration with first element if info is None
411 Continue based on index in info if info is specified
413 :param info: info or None
414 :returns: next info in list or None if no more infos
419 next_index = info.index + 1
420 if next_index == len(self.packet_infos):
423 return self.packet_infos[next_index]
425 def get_next_packet_info_for_interface(self, src_index, info):
427 Search the packet info list for the next packet info with same source
430 :param src_index: source interface index to search for
431 :param info: packet info - where to start the search
432 :returns: packet info or None
436 info = self.get_next_packet_info(info)
439 if info.src == src_index:
442 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
444 Search the packet info list for the next packet info with same source
445 and destination interface indexes
447 :param src_index: source interface index to search for
448 :param dst_index: destination interface index to search for
449 :param info: packet info - where to start the search
450 :returns: packet info or None
454 info = self.get_next_packet_info_for_interface(src_index, info)
457 if info.dst == dst_index:
461 class VppTestResult(unittest.TestResult):
463 @property result_string
464 String variable to store the test case result string.
466 List variable containing 2-tuples of TestCase instances and strings
467 holding formatted tracebacks. Each tuple represents a test which
468 raised an unexpected exception.
470 List variable containing 2-tuples of TestCase instances and strings
471 holding formatted tracebacks. Each tuple represents a test where
472 a failure was explicitly signalled using the TestCase.assert*()
476 def __init__(self, stream, descriptions, verbosity):
478 :param stream File descriptor to store where to report test results. Set
479 to the standard error stream by default.
480 :param descriptions Boolean variable to store information if to use test
482 :param verbosity Integer variable to store required verbosity level.
484 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
486 self.descriptions = descriptions
487 self.verbosity = verbosity
488 self.result_string = None
490 def addSuccess(self, test):
492 Record a test succeeded result
497 unittest.TestResult.addSuccess(self, test)
498 self.result_string = colorize("OK", GREEN)
500 def addSkip(self, test, reason):
502 Record a test skipped.
508 unittest.TestResult.addSkip(self, test, reason)
509 self.result_string = colorize("SKIP", YELLOW)
511 def addFailure(self, test, err):
513 Record a test failed result
516 :param err: error message
519 unittest.TestResult.addFailure(self, test, err)
520 if hasattr(test, 'tempdir'):
521 self.result_string = colorize("FAIL", RED) + \
522 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
524 self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
526 def addError(self, test, err):
528 Record a test error result
531 :param err: error message
534 unittest.TestResult.addError(self, test, err)
535 if hasattr(test, 'tempdir'):
536 self.result_string = colorize("ERROR", RED) + \
537 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
539 self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
541 def getDescription(self, test):
546 :returns: test description
549 # TODO: if none print warning not raise exception
550 short_description = test.shortDescription()
551 if self.descriptions and short_description:
552 return short_description
556 def startTest(self, test):
563 unittest.TestResult.startTest(self, test)
564 if self.verbosity > 0:
566 "Starting " + self.getDescription(test) + " ...")
567 self.stream.writeln(single_line_delim)
569 def stopTest(self, test):
576 unittest.TestResult.stopTest(self, test)
577 if self.verbosity > 0:
578 self.stream.writeln(single_line_delim)
579 self.stream.writeln("%-60s%s" %
580 (self.getDescription(test), self.result_string))
581 self.stream.writeln(single_line_delim)
583 self.stream.writeln("%-60s%s" %
584 (self.getDescription(test), self.result_string))
586 def printErrors(self):
588 Print errors from running the test case
590 self.stream.writeln()
591 self.printErrorList('ERROR', self.errors)
592 self.printErrorList('FAIL', self.failures)
594 def printErrorList(self, flavour, errors):
596 Print error list to the output stream together with error type
597 and test case description.
599 :param flavour: error type
600 :param errors: iterable errors
603 for test, err in errors:
604 self.stream.writeln(double_line_delim)
605 self.stream.writeln("%s: %s" %
606 (flavour, self.getDescription(test)))
607 self.stream.writeln(single_line_delim)
608 self.stream.writeln("%s" % err)
611 class VppTestRunner(unittest.TextTestRunner):
613 A basic test runner implementation which prints results on standard error.
616 def resultclass(self):
617 """Class maintaining the results of the tests"""
627 print("Running tests using custom test runner") # debug message
628 return super(VppTestRunner, self).run(test)