9 from Queue import Queue
10 from threading import Thread
11 from inspect import getdoc
12 from hook import StepHook, PollHook
13 from vpp_pg_interface import VppPGInterface
14 from vpp_lo_interface import VppLoInterface
15 from vpp_papi_provider import VppPapiProvider
16 from scapy.packet import Raw
20 Test framework module.
22 The module provides a set of tools for constructing and running tests and
23 representing the results.
27 class _PacketInfo(object):
28 """Private class to create packet info object.
30 Help process information about the next packet.
31 Set variables to default values.
33 #: Store the index of the packet.
35 #: Store the index of the source packet generator interface of the packet.
37 #: Store the index of the destination packet generator interface
40 #: Store the copy of the former packet.
def pump_output(out, queue):
    """
    Copy every line readable from a stream into a queue.

    Reads ``out`` line by line until ``readline()`` returns an empty byte
    string and puts each line, unchanged, on ``queue``. It is used as a
    thread target for draining the stdout/stderr pipes of the spawned VPP
    process.

    :param out: file-like object providing ``readline()``
    :param queue: object providing ``put()`` which receives the lines
    """
    # iter() with a sentinel stops as soon as readline() signals EOF (b'')
    for line in iter(out.readline, b''):
        queue.put(line)
49 class VppTestCase(unittest.TestCase):
50 """This subclass is a base class for VPP test cases that are implemented as
51 classes. It provides methods to create and run test case.
def packet_infos(self):
    """Read accessor: return the packet infos stored on this test case."""
    return self._packet_infos
def packet_infos(self, value):
    """
    Write accessor: replace the packet infos stored on this test case.

    :param value: new packet info container
    """
    self._packet_infos = value
65 """Return the instance of this testcase"""
66 return cls.test_instance
def set_debug_flags(cls, d):
    """
    Reset all debug flags on the class, then enable the one selected by d.

    :param d: debug option string (matched case-insensitively) or None to
        leave every flag disabled
    :raises Exception: if ``d`` is not one of the recognized options
    """
    # always start from a clean state so repeated calls do not accumulate
    cls.debug_core = False
    cls.debug_gdb = False
    cls.debug_gdbserver = False
    if d is None:
        return
    dl = d.lower()
    # NOTE(review): the "core" and "gdb" option names were reconstructed
    # from the flag names; only "gdbserver" is spelled out in the surviving
    # text - confirm against the original file.
    if dl == "core":
        if resource.getrlimit(resource.RLIMIT_CORE)[0] <= 0:
            # give a heads up if this is actually useless
            cls.logger.critical("WARNING: core size limit is set 0, core "
                                "files will NOT be created")
        cls.debug_core = True
    elif dl == "gdb":
        cls.debug_gdb = True
    elif dl == "gdbserver":
        cls.debug_gdbserver = True
    else:
        raise Exception("Unrecognized DEBUG option: '%s'" % d)
90 def setUpConstants(cls):
91 """ Set-up the test case class based on environment variables """
94 cls.step = True if s.lower() in ("y", "yes", "1") else False
98 d = os.getenv("DEBUG")
101 cls.set_debug_flags(d)
102 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
103 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
105 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
106 debug_cli = "cli-listen localhost:5002"
107 cls.vpp_cmdline = [cls.vpp_bin, "unix", "{", "nodaemon", debug_cli, "}",
108 "api-segment", "{", "prefix", cls.shm_prefix, "}"]
109 if cls.plugin_path is not None:
110 cls.vpp_cmdline.extend(["plugin_path", cls.plugin_path])
111 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
114 def wait_for_enter(cls):
115 if cls.debug_gdbserver:
116 print(double_line_delim)
117 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
119 print(double_line_delim)
120 print("Spawned VPP with PID: %d" % cls.vpp.pid)
122 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
124 print(single_line_delim)
125 print("You can debug the VPP using e.g.:")
126 if cls.debug_gdbserver:
127 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
128 print("Now is the time to attach a gdb by running the above "
129 "command, set up breakpoints etc. and then resume VPP from "
130 "within gdb by issuing the 'continue' command")
132 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
133 print("Now is the time to attach a gdb by running the above "
134 "command and set up breakpoints etc.")
135 print(single_line_delim)
136 raw_input("Press ENTER to continue running the testcase...")
140 cmdline = cls.vpp_cmdline
142 if cls.debug_gdbserver:
143 gdbserver = '/usr/bin/gdbserver'
144 if not os.path.isfile(gdbserver) or \
145 not os.access(gdbserver, os.X_OK):
146 raise Exception("gdbserver binary '%s' does not exist or is "
147 "not executable" % gdbserver)
149 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
150 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
153 cls.vpp = subprocess.Popen(cmdline,
154 stdout=subprocess.PIPE,
155 stderr=subprocess.PIPE,
157 except Exception as e:
158 cls.logger.critical("Couldn't start vpp: %s" % e)
166 Perform class setup before running the testcase
167 Remove shared memory files, start vpp and connect the vpp-api
169 cls.logger = getLogger(cls.__name__)
170 cls.tempdir = tempfile.mkdtemp(
171 prefix='vpp-unittest-' + cls.__name__ + '-')
172 cls.shm_prefix = cls.tempdir.split("/")[-1]
173 os.chdir(cls.tempdir)
174 cls.logger.info("Temporary dir is %s, shm prefix is %s",
175 cls.tempdir, cls.shm_prefix)
178 cls.packet_infos = {}
181 print(double_line_delim)
182 print(colorize(getdoc(cls).splitlines()[0], YELLOW))
183 print(double_line_delim)
184 # need to catch exceptions here because if we raise, then the cleanup
185 # doesn't get called and we might end with a zombie vpp
188 cls.vpp_stdout_queue = Queue()
189 cls.vpp_stdout_reader_thread = Thread(
190 target=pump_output, args=(cls.vpp.stdout, cls.vpp_stdout_queue))
191 cls.vpp_stdout_reader_thread.start()
192 cls.vpp_stderr_queue = Queue()
193 cls.vpp_stderr_reader_thread = Thread(
194 target=pump_output, args=(cls.vpp.stderr, cls.vpp_stderr_queue))
195 cls.vpp_stderr_reader_thread.start()
196 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix)
201 cls.vapi.register_hook(hook)
207 if cls.debug_gdbserver:
208 print(colorize("You're running VPP inside gdbserver but "
209 "VPP-API connection failed, did you forget "
210 "to 'continue' VPP from within gdb?", RED))
222 Disconnect vpp-api, kill vpp and cleanup shared memory files
224 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
226 if cls.vpp.returncode is None:
227 print(double_line_delim)
228 print("VPP or GDB server is still running")
229 print(single_line_delim)
230 raw_input("When done debugging, press ENTER to kill the process"
231 " and finish running the testcase...")
233 if hasattr(cls, 'vpp'):
234 cls.vapi.disconnect()
236 if cls.vpp.returncode is None:
240 if hasattr(cls, 'vpp_stdout_queue'):
241 cls.logger.info(single_line_delim)
242 cls.logger.info('VPP output to stdout while running %s:',
244 cls.logger.info(single_line_delim)
245 f = open(cls.tempdir + '/vpp_stdout.txt', 'w')
246 while not cls.vpp_stdout_queue.empty():
247 line = cls.vpp_stdout_queue.get_nowait()
249 cls.logger.info('VPP stdout: %s' % line.rstrip('\n'))
251 if hasattr(cls, 'vpp_stderr_queue'):
252 cls.logger.info(single_line_delim)
253 cls.logger.info('VPP output to stderr while running %s:',
255 cls.logger.info(single_line_delim)
256 f = open(cls.tempdir + '/vpp_stderr.txt', 'w')
257 while not cls.vpp_stderr_queue.empty():
258 line = cls.vpp_stderr_queue.get_nowait()
260 cls.logger.info('VPP stderr: %s' % line.rstrip('\n'))
261 cls.logger.info(single_line_delim)
264 def tearDownClass(cls):
265 """ Perform final cleanup after running all tests in this test-case """
269 """ Show various debug prints after each test """
270 if not self.vpp_dead:
271 self.logger.debug(self.vapi.cli("show trace"))
272 self.logger.info(self.vapi.ppcli("show int"))
273 self.logger.info(self.vapi.ppcli("show hardware"))
274 self.logger.info(self.vapi.ppcli("show error"))
275 self.logger.info(self.vapi.ppcli("show run"))
278 """ Clear trace before running each test"""
279 self.vapi.cli("clear trace")
280 # store the test instance inside the test class - so that objects
281 # holding the class can access instance methods (like assertEqual)
282 type(self).test_instance = self
285 def pg_enable_capture(cls, interfaces):
287 Enable capture on packet-generator interfaces
289 :param interfaces: iterable interface indexes
298 Enable the packet-generator and send all prepared packet streams
299 Remove the packet streams afterwards
301 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
302 cls.vapi.cli('packet-generator enable')
303 sleep(1) # give VPP some time to process the packets
304 for stream in cls.pg_streams:
305 cls.vapi.cli('packet-generator delete %s' % stream)
309 def create_pg_interfaces(cls, interfaces):
311 Create packet-generator interfaces
313 :param interfaces: iterable indexes of the interfaces
318 intf = VppPGInterface(cls, i)
319 setattr(cls, intf.name, intf)
321 cls.pg_interfaces = result
325 def create_loopback_interfaces(cls, interfaces):
327 Create loopback interfaces
329 :param interfaces: iterable indexes of the interfaces
334 intf = VppLoInterface(cls, i)
335 setattr(cls, intf.name, intf)
337 cls.lo_interfaces = result
def extend_packet(packet, size):
    """
    Extend packet to given size by padding with spaces
    NOTE: Currently works only when Raw layer is present.

    The packet is modified in place; nothing is returned. A packet that
    already reaches the target size is left untouched.

    :param packet: packet
    :param size: target size
    """
    # the current length is taken as len(packet) plus 4 extra bytes
    # (presumably a trailer not present in the object - TODO confirm)
    packet_len = len(packet) + 4
    extend = size - packet_len
    # only touch the Raw layer when padding is actually required, so that
    # packets which are already large enough are never indexed
    if extend > 0:
        packet[Raw].load += ' ' * extend
def add_packet_info_to_list(self, info):
    """
    Register a packet info in the container kept by the test case.

    The info receives the next free index (the current number of stored
    infos) and is stored under that index.

    :param info: packet info
    """
    position = len(self.packet_infos)
    info.index = position
    self.packet_infos[info.index] = info
def create_packet_info(self, src_pg_index, dst_pg_index):
    """
    Create packet info object containing the source and destination indexes
    and add it to the testcase's packet info list

    :param src_pg_index: source packet-generator index
    :param dst_pg_index: destination packet-generator index

    :returns: _PacketInfo object
    """
    info = _PacketInfo()
    # registering the info is what assigns its index
    self.add_packet_info_to_list(info)
    info.src = src_pg_index
    info.dst = dst_pg_index
    return info
def info_to_payload(info):
    """
    Serialize a _PacketInfo object into a packet payload string.

    The payload consists of the index, source and destination values as
    decimal numbers separated by single spaces.

    :param info: _PacketInfo object

    :returns: string containing serialized data from packet info
    """
    fields = (info.index, info.src, info.dst)
    return "%d %d %d" % fields
def payload_to_info(payload):
    """
    Convert packet payload to _PacketInfo object

    The first three whitespace-separated fields of the payload are parsed
    as integers: index, source and destination.

    :param payload: packet payload

    :returns: _PacketInfo object containing de-serialized data from payload
    """
    numbers = payload.split()
    info = _PacketInfo()
    info.index = int(numbers[0])
    info.src = int(numbers[1])
    info.dst = int(numbers[2])
    return info
def get_next_packet_info(self, info):
    """
    Iterate over the packet info list stored in the testcase
    Start iteration with first element if info is None
    Continue based on index in info if info is specified

    :param info: info or None
    :returns: next info in list or None if no more infos
    """
    next_index = 0 if info is None else info.index + 1
    # ">=" rather than "==": an info whose index lies beyond the stored
    # range ends the iteration instead of failing on the lookup below
    if next_index >= len(self.packet_infos):
        return None
    return self.packet_infos[next_index]
def get_next_packet_info_for_interface(self, src_index, info):
    """
    Search the packet info list for the next packet info with same source
    interface index

    :param src_index: source interface index to search for
    :param info: packet info - where to start the search
    :returns: packet info or None
    """
    while True:
        info = self.get_next_packet_info(info)
        # stop at the end of the list (None) or at the first match
        if info is None or info.src == src_index:
            return info
def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
    """
    Search the packet info list for the next packet info with same source
    and destination interface indexes

    :param src_index: source interface index to search for
    :param dst_index: destination interface index to search for
    :param info: packet info - where to start the search
    :returns: packet info or None
    """
    while True:
        # the source filter is delegated; only the destination is checked
        info = self.get_next_packet_info_for_interface(src_index, info)
        if info is None or info.dst == dst_index:
            return info
464 class VppTestResult(unittest.TestResult):
466 @property result_string
467 String variable to store the test case result string.
469 List variable containing 2-tuples of TestCase instances and strings
470 holding formatted tracebacks. Each tuple represents a test which
471 raised an unexpected exception.
473 List variable containing 2-tuples of TestCase instances and strings
474 holding formatted tracebacks. Each tuple represents a test where
475 a failure was explicitly signalled using the TestCase.assert*()
def __init__(self, stream, descriptions, verbosity):
    """
    Initialize the result object and remember the reporting settings.

    :param stream: stream object the test results are reported to
    :param descriptions: boolean, whether test descriptions are used when
        reporting
    :param verbosity: integer, required verbosity level
    """
    unittest.TestResult.__init__(self, stream, descriptions, verbosity)
    # NOTE(review): reconstructed - the reporting methods of this class
    # read self.stream, so it has to be stored here; confirm against the
    # original file.
    self.stream = stream
    self.descriptions = descriptions
    self.verbosity = verbosity
    self.result_string = None
def addSuccess(self, test):
    """
    Record a passed test and set the result string to a green "OK".

    :param test: the test which succeeded
    """
    unittest.TestResult.addSuccess(self, test)
    outcome = colorize("OK", GREEN)
    self.result_string = outcome
def addSkip(self, test, reason):
    """
    Record a skipped test and set the result string to a yellow "SKIP".

    :param test: the test which was skipped
    :param reason: why the test was skipped
    """
    unittest.TestResult.addSkip(self, test, reason)
    outcome = colorize("SKIP", YELLOW)
    self.result_string = outcome
def addFailure(self, test, err):
    """
    Record a test failed result

    The result string is a red "FAIL" followed by the temporary directory
    of the test case when the test has one, so it can be inspected later.

    :param test: the test which failed
    :param err: error message
    """
    unittest.TestResult.addFailure(self, test, err)
    # the two suffixes are mutually exclusive - the fallback must not
    # overwrite the message carrying the temp dir
    if hasattr(test, 'tempdir'):
        suffix = ' [ temp dir used by test case: ' + test.tempdir + ' ]'
    else:
        suffix = ' [no temp dir]'
    self.result_string = colorize("FAIL", RED) + suffix
def addError(self, test, err):
    """
    Record a test error result

    The result string is a red "ERROR" followed by the temporary directory
    of the test case when the test has one, so it can be inspected later.

    :param test: the test which raised the error
    :param err: error message
    """
    unittest.TestResult.addError(self, test, err)
    # the two suffixes are mutually exclusive - the fallback must not
    # overwrite the message carrying the temp dir
    if hasattr(test, 'tempdir'):
        suffix = ' [ temp dir used by test case: ' + test.tempdir + ' ]'
    else:
        suffix = ' [no temp dir]'
    self.result_string = colorize("ERROR", RED) + suffix
544 def getDescription(self, test):
549 :returns: test description
552 # TODO: if none print warning not raise exception
553 short_description = test.shortDescription()
554 if self.descriptions and short_description:
555 return short_description
def startTest(self, test):
    """
    Record the start of a test and, when verbose, announce it.

    :param test: the test about to be run
    """
    unittest.TestResult.startTest(self, test)
    if self.verbosity > 0:
        # NOTE(review): the head of this call was reconstructed as a write
        # to self.stream, matching the delimiter line that follows - confirm
        self.stream.writeln(
            "Starting " + self.getDescription(test) + " ...")
        self.stream.writeln(single_line_delim)
def stopTest(self, test):
    """
    Record the end of a test and write its one-line result summary.

    The summary is the test description left-aligned in 60 columns
    followed by the result string. When verbose, it is framed by
    delimiter lines.

    :param test: the test which just finished
    """
    unittest.TestResult.stopTest(self, test)
    summary = "%-60s%s" % (self.getDescription(test), self.result_string)
    if self.verbosity > 0:
        self.stream.writeln(single_line_delim)
        self.stream.writeln(summary)
        self.stream.writeln(single_line_delim)
    else:
        # quiet mode: the summary line only, written exactly once
        self.stream.writeln(summary)
def printErrors(self):
    """
    Write an empty line, then report all recorded errors followed by all
    recorded failures.
    """
    self.stream.writeln()
    for flavour, attribute in (('ERROR', 'errors'), ('FAIL', 'failures')):
        self.printErrorList(flavour, getattr(self, attribute))
def printErrorList(self, flavour, errors):
    """
    Write every entry of an error list to the output stream, each one
    headed by the error type and the description of the test case.

    :param flavour: error type
    :param errors: iterable of (test, error text) pairs
    """
    for entry in errors:
        test, err = entry
        heading = "%s: %s" % (flavour, self.getDescription(test))
        self.stream.writeln(double_line_delim)
        self.stream.writeln(heading)
        self.stream.writeln(single_line_delim)
        self.stream.writeln("%s" % err)
614 class VppTestRunner(unittest.TextTestRunner):
616 A basic test runner implementation which prints results on standard error.
619 def resultclass(self):
620 """Class maintaining the results of the tests"""
630 print("Running tests using custom test runner") # debug message
631 return super(VppTestRunner, self).run(test)