9 from collections import deque
10 from threading import Thread
11 from inspect import getdoc
12 from hook import StepHook, PollHook
13 from vpp_pg_interface import VppPGInterface
14 from vpp_lo_interface import VppLoInterface
15 from vpp_papi_provider import VppPapiProvider
16 from scapy.packet import Raw
20 Test framework module.
22 The module provides a set of tools for constructing and running tests and
23 representing the results.
27 class _PacketInfo(object):
28 """Private class to create packet info object.
30 Help process information about the next packet.
31 Set variables to default values.
33 #: Store the index of the packet.
35 #: Store the index of the source packet generator interface of the packet.
37 #: Store the index of the destination packet generator interface
40 #: Store the copy of the former packet.
44 def pump_output(out, deque):
45 for line in iter(out.readline, b''):
49 class VppTestCase(unittest.TestCase):
50 """This subclass is a base class for VPP test cases that are implemented as
51 classes. It provides methods to create and run test case.
def packet_infos(self):
    """Return the packet info container stored on this test case."""
    stored = self._packet_infos
    return stored
def packet_infos(self, value):
    """Replace the packet info container stored on this test case.

    :param value: new container to store
    """
    setattr(self, "_packet_infos", value)
65 """Return the instance of this testcase"""
66 return cls.test_instance
69 def set_debug_flags(cls, d):
70 cls.debug_core = False
72 cls.debug_gdbserver = False
77 if resource.getrlimit(resource.RLIMIT_CORE)[0] <= 0:
78 # give a heads up if this is actually useless
79 cls.logger.critical("WARNING: core size limit is set 0, core "
80 "files will NOT be created")
84 elif dl == "gdbserver":
85 cls.debug_gdbserver = True
87 raise Exception("Unrecognized DEBUG option: '%s'" % d)
90 def setUpConstants(cls):
91 """ Set-up the test case class based on environment variables """
94 cls.step = True if s.lower() in ("y", "yes", "1") else False
98 d = os.getenv("DEBUG")
101 cls.set_debug_flags(d)
102 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
103 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
105 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
106 debug_cli = "cli-listen localhost:5002"
107 cls.vpp_cmdline = [cls.vpp_bin, "unix", "{", "nodaemon", debug_cli, "}",
108 "api-segment", "{", "prefix", cls.shm_prefix, "}"]
109 if cls.plugin_path is not None:
110 cls.vpp_cmdline.extend(["plugin_path", cls.plugin_path])
111 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
114 def wait_for_enter(cls):
115 if cls.debug_gdbserver:
116 print(double_line_delim)
117 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
119 print(double_line_delim)
120 print("Spawned VPP with PID: %d" % cls.vpp.pid)
122 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
124 print(single_line_delim)
125 print("You can debug the VPP using e.g.:")
126 if cls.debug_gdbserver:
127 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
128 print("Now is the time to attach a gdb by running the above "
129 "command, set up breakpoints etc. and then resume VPP from "
130 "within gdb by issuing the 'continue' command")
132 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
133 print("Now is the time to attach a gdb by running the above "
134 "command and set up breakpoints etc.")
135 print(single_line_delim)
136 raw_input("Press ENTER to continue running the testcase...")
140 cmdline = cls.vpp_cmdline
142 if cls.debug_gdbserver:
143 gdbserver = '/usr/bin/gdbserver'
144 if not os.path.isfile(gdbserver) or \
145 not os.access(gdbserver, os.X_OK):
146 raise Exception("gdbserver binary '%s' does not exist or is "
147 "not executable" % gdbserver)
149 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
150 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
153 cls.vpp = subprocess.Popen(cmdline,
154 stdout=subprocess.PIPE,
155 stderr=subprocess.PIPE,
157 except Exception as e:
158 cls.logger.critical("Couldn't start vpp: %s" % e)
166 Perform class setup before running the testcase
167 Remove shared memory files, start vpp and connect the vpp-api
169 cls.logger = getLogger(cls.__name__)
170 cls.tempdir = tempfile.mkdtemp(
171 prefix='vpp-unittest-' + cls.__name__ + '-')
172 cls.shm_prefix = cls.tempdir.split("/")[-1]
173 os.chdir(cls.tempdir)
174 cls.logger.info("Temporary dir is %s, shm prefix is %s",
175 cls.tempdir, cls.shm_prefix)
178 cls.packet_infos = {}
181 print(double_line_delim)
182 print(colorize(getdoc(cls).splitlines()[0], YELLOW))
183 print(double_line_delim)
184 # need to catch exceptions here because if we raise, then the cleanup
185 # doesn't get called and we might end with a zombie vpp
188 cls.vpp_stdout_deque = deque()
189 cls.vpp_stdout_reader_thread = Thread(target=pump_output, args=(
190 cls.vpp.stdout, cls.vpp_stdout_deque))
191 cls.vpp_stdout_reader_thread.start()
192 cls.vpp_stderr_deque = deque()
193 cls.vpp_stderr_reader_thread = Thread(target=pump_output, args=(
194 cls.vpp.stderr, cls.vpp_stderr_deque))
195 cls.vpp_stderr_reader_thread.start()
196 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
201 cls.vapi.register_hook(hook)
207 if cls.debug_gdbserver:
208 print(colorize("You're running VPP inside gdbserver but "
209 "VPP-API connection failed, did you forget "
210 "to 'continue' VPP from within gdb?", RED))
213 t, v, tb = sys.exc_info()
223 Disconnect vpp-api, kill vpp and cleanup shared memory files
225 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
227 if cls.vpp.returncode is None:
228 print(double_line_delim)
229 print("VPP or GDB server is still running")
230 print(single_line_delim)
231 raw_input("When done debugging, press ENTER to kill the process"
232 " and finish running the testcase...")
234 if hasattr(cls, 'vpp'):
235 if hasattr(cls, 'vapi'):
236 cls.vapi.disconnect()
238 if cls.vpp.returncode is None:
242 if hasattr(cls, 'vpp_stdout_deque'):
243 cls.logger.info(single_line_delim)
244 cls.logger.info('VPP output to stdout while running %s:',
246 cls.logger.info(single_line_delim)
247 f = open(cls.tempdir + '/vpp_stdout.txt', 'w')
248 vpp_output = "".join(cls.vpp_stdout_deque)
250 cls.logger.info('\n%s', vpp_output)
251 cls.logger.info(single_line_delim)
253 if hasattr(cls, 'vpp_stderr_deque'):
254 cls.logger.info(single_line_delim)
255 cls.logger.info('VPP output to stderr while running %s:',
257 cls.logger.info(single_line_delim)
258 f = open(cls.tempdir + '/vpp_stderr.txt', 'w')
259 vpp_output = "".join(cls.vpp_stderr_deque)
261 cls.logger.info('\n%s', vpp_output)
262 cls.logger.info(single_line_delim)
265 def tearDownClass(cls):
266 """ Perform final cleanup after running all tests in this test-case """
270 """ Show various debug prints after each test """
271 if not self.vpp_dead:
272 self.logger.debug(self.vapi.cli("show trace"))
273 self.logger.info(self.vapi.ppcli("show int"))
274 self.logger.info(self.vapi.ppcli("show hardware"))
275 self.logger.info(self.vapi.ppcli("show error"))
276 self.logger.info(self.vapi.ppcli("show run"))
279 """ Clear trace before running each test"""
281 raise Exception("VPP is dead when setting up the test")
283 self.vpp_stdout_deque.append(
284 "--- test setUp() for %s.%s(%s) starts here ---\n" %
285 (self.__class__.__name__, self._testMethodName,
286 self._testMethodDoc))
287 self.vpp_stderr_deque.append(
288 "--- test setUp() for %s.%s(%s) starts here ---\n" %
289 (self.__class__.__name__, self._testMethodName,
290 self._testMethodDoc))
291 self.vapi.cli("clear trace")
292 # store the test instance inside the test class - so that objects
293 # holding the class can access instance methods (like assertEqual)
294 type(self).test_instance = self
297 def pg_enable_capture(cls, interfaces):
299 Enable capture on packet-generator interfaces
301 :param interfaces: iterable interface indexes
308 def pg_start(cls, sleep_time=1):
310 Enable the packet-generator and send all prepared packet streams
311 Remove the packet streams afterwards
313 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
314 cls.vapi.cli('packet-generator enable')
315 sleep(sleep_time) # give VPP some time to process the packets
316 for stream in cls.pg_streams:
317 cls.vapi.cli('packet-generator delete %s' % stream)
321 def create_pg_interfaces(cls, interfaces):
323 Create packet-generator interfaces
325 :param interfaces: iterable indexes of the interfaces
330 intf = VppPGInterface(cls, i)
331 setattr(cls, intf.name, intf)
333 cls.pg_interfaces = result
337 def create_loopback_interfaces(cls, interfaces):
339 Create loopback interfaces
341 :param interfaces: iterable indexes of the interfaces
346 intf = VppLoInterface(cls, i)
347 setattr(cls, intf.name, intf)
349 cls.lo_interfaces = result
353 def extend_packet(packet, size):
355 Extend packet to given size by padding with spaces
356 NOTE: Currently works only when Raw layer is present.
358 :param packet: packet
359 :param size: target size
362 packet_len = len(packet) + 4
363 extend = size - packet_len
365 packet[Raw].load += ' ' * extend
def add_packet_info_to_list(self, info):
    """
    Register a packet info with the testcase

    The info is assigned the next free index (the current number of
    registered infos) and stored in the testcase's packet info container
    under that index.

    :param info: packet info

    """
    registry = self.packet_infos
    next_index = len(registry)
    info.index = next_index
    registry[next_index] = info
377 def create_packet_info(self, src_pg_index, dst_pg_index):
379 Create packet info object containing the source and destination indexes
380 and add it to the testcase's packet info list
382 :param src_pg_index: source packet-generator index
383 :param dst_pg_index: destination packet-generator index
385 :returns: _PacketInfo object
389 self.add_packet_info_to_list(info)
390 info.src = src_pg_index
391 info.dst = dst_pg_index
def info_to_payload(info):
    """
    Serialize a _PacketInfo object into a packet payload

    :param info: _PacketInfo object

    :returns: string holding the index, source and destination of the
              packet info as space-separated decimal numbers
    """
    fields = (info.index, info.src, info.dst)
    return "%d %d %d" % fields
406 def payload_to_info(payload):
408 Convert packet payload to _PacketInfo object
410 :param payload: packet payload
412 :returns: _PacketInfo object containing de-serialized data from payload
415 numbers = payload.split()
417 info.index = int(numbers[0])
418 info.src = int(numbers[1])
419 info.dst = int(numbers[2])
422 def get_next_packet_info(self, info):
424 Iterate over the packet info list stored in the testcase
425 Start iteration with first element if info is None
426 Continue based on index in info if info is specified
428 :param info: info or None
429 :returns: next info in list or None if no more infos
434 next_index = info.index + 1
435 if next_index == len(self.packet_infos):
438 return self.packet_infos[next_index]
440 def get_next_packet_info_for_interface(self, src_index, info):
442 Search the packet info list for the next packet info with same source
445 :param src_index: source interface index to search for
446 :param info: packet info - where to start the search
447 :returns: packet info or None
451 info = self.get_next_packet_info(info)
454 if info.src == src_index:
457 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
459 Search the packet info list for the next packet info with same source
460 and destination interface indexes
462 :param src_index: source interface index to search for
463 :param dst_index: destination interface index to search for
464 :param info: packet info - where to start the search
465 :returns: packet info or None
469 info = self.get_next_packet_info_for_interface(src_index, info)
472 if info.dst == dst_index:
def assert_equal(self, real_value, expected_value, name_or_class=None):
    """
    Assert that two values are equal, reporting a descriptive message

    :param real_value: value obtained by the test
    :param expected_value: value the test expects
    :param name_or_class: optional description of the compared value -
        either a plain name, or a documented callable (for example an
        enum-like class) which is applied to both values to render them
        symbolically in the failure message
    """
    if name_or_class is None:
        # No description was supplied, so there is no custom message to
        # pass on (referencing one here used to raise UnboundLocalError);
        # let assertEqual produce its default message.
        self.assertEqual(real_value, expected_value)
        return
    try:
        msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
        msg = msg % (getdoc(name_or_class).strip(),
                     real_value, str(name_or_class(real_value)),
                     expected_value, str(name_or_class(expected_value)))
    except Exception:
        # name_or_class cannot be used as a documented callable for these
        # values (e.g. it is a plain string) - use the generic message
        msg = "Invalid %s: %s does not match expected value %s" % (
            name_or_class, real_value, expected_value)

    self.assertEqual(real_value, expected_value, msg)
499 msg = "Invalid %s: %s out of range <%s,%s>" % (
500 name, real_value, expected_min, expected_max)
501 self.assertTrue(expected_min <= real_value <= expected_max, msg)
504 class VppTestResult(unittest.TestResult):
506 @property result_string
507 String variable to store the test case result string.
509 List variable containing 2-tuples of TestCase instances and strings
510 holding formatted tracebacks. Each tuple represents a test which
511 raised an unexpected exception.
513 List variable containing 2-tuples of TestCase instances and strings
514 holding formatted tracebacks. Each tuple represents a test where
515 a failure was explicitly signalled using the TestCase.assert*()
519 def __init__(self, stream, descriptions, verbosity):
521 :param stream File descriptor to store where to report test results. Set
522 to the standard error stream by default.
523 :param descriptions Boolean variable to store information if to use test
525 :param verbosity Integer variable to store required verbosity level.
527 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
529 self.descriptions = descriptions
530 self.verbosity = verbosity
531 self.result_string = None
def addSuccess(self, test):
    """
    Record a test succeeded result and store the colorized "OK" marker
    as the current result string

    :param test: test that succeeded

    """
    unittest.TestResult.addSuccess(self, test)
    outcome = colorize("OK", GREEN)
    self.result_string = outcome
def addSkip(self, test, reason):
    """
    Record a test skipped result and store the colorized "SKIP" marker
    as the current result string

    :param test: test that was skipped
    :param reason: reason given for skipping the test

    """
    unittest.TestResult.addSkip(self, test, reason)
    outcome = colorize("SKIP", YELLOW)
    self.result_string = outcome
554 def addFailure(self, test, err):
556 Record a test failed result
559 :param err: error message
562 unittest.TestResult.addFailure(self, test, err)
563 if hasattr(test, 'tempdir'):
564 self.result_string = colorize("FAIL", RED) + \
565 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
567 self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
569 def addError(self, test, err):
571 Record a test error result
574 :param err: error message
577 unittest.TestResult.addError(self, test, err)
578 if hasattr(test, 'tempdir'):
579 self.result_string = colorize("ERROR", RED) + \
580 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
582 self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
584 def getDescription(self, test):
589 :returns: test description
592 # TODO: if none print warning not raise exception
593 short_description = test.shortDescription()
594 if self.descriptions and short_description:
595 return short_description
599 def startTest(self, test):
606 unittest.TestResult.startTest(self, test)
607 if self.verbosity > 0:
609 "Starting " + self.getDescription(test) + " ...")
610 self.stream.writeln(single_line_delim)
612 def stopTest(self, test):
619 unittest.TestResult.stopTest(self, test)
620 if self.verbosity > 0:
621 self.stream.writeln(single_line_delim)
622 self.stream.writeln("%-60s%s" %
623 (self.getDescription(test), self.result_string))
624 self.stream.writeln(single_line_delim)
626 self.stream.writeln("%-60s%s" %
627 (self.getDescription(test), self.result_string))
def printErrors(self):
    """
    Write an empty line to the output stream, then print the recorded
    errors followed by the recorded failures
    """
    self.stream.writeln()
    for flavour, recorded in (('ERROR', self.errors),
                              ('FAIL', self.failures)):
        self.printErrorList(flavour, recorded)
def printErrorList(self, flavour, errors):
    """
    Write each recorded error to the output stream, headed by the error
    type and the description of the test it belongs to.

    :param flavour: error type
    :param errors: iterable of (test, error text) pairs

    """
    for test, err in errors:
        out = self.stream
        out.writeln(double_line_delim)
        heading = "%s: %s" % (flavour, self.getDescription(test))
        out.writeln(heading)
        out.writeln(single_line_delim)
        out.writeln("%s" % err)
654 class VppTestRunner(unittest.TextTestRunner):
656 A basic test runner implementation which prints results on standard error.
659 def resultclass(self):
660 """Class maintaining the results of the tests"""
670 print("Running tests using custom test runner") # debug message
671 return super(VppTestRunner, self).run(test)