8 from collections import deque
9 from threading import Thread
10 from inspect import getdoc
11 from hook import StepHook, PollHook
12 from vpp_pg_interface import VppPGInterface
13 from vpp_lo_interface import VppLoInterface
14 from vpp_papi_provider import VppPapiProvider
15 from scapy.packet import Raw
19 Test framework module.
21 The module provides a set of tools for constructing and running tests and
22 representing the results.
26 class _PacketInfo(object):
27 """Private class to create packet info object.
29 Help process information about the next packet.
30 Set variables to default values.
32 #: Store the index of the packet.
34 #: Store the index of the source packet generator interface of the packet.
36 #: Store the index of the destination packet generator interface
39 #: Store the copy of the former packet.
def __eq__(self, other):
    """
    Compare two packet infos field by field

    :param other: object to compare with
    :returns: True-ish when index, src, dst and data all match, False-ish
        otherwise; NotImplemented when other does not carry the
        packet info fields, so that Python falls back to its default
        comparison instead of raising AttributeError
    """
    # fetch the foreign fields first - only a missing attribute on `other`
    # should be treated as "not comparable", not errors raised while
    # comparing the values themselves
    try:
        other_index = other.index
        other_src = other.src
        other_dst = other.dst
        other_data = other.data
    except AttributeError:
        return NotImplemented
    index = self.index == other_index
    src = self.src == other_src
    dst = self.dst == other_dst
    data = self.data == other_data
    return index and src and dst and data
50 def pump_output(out, deque):
51 for line in iter(out.readline, b''):
55 class VppTestCase(unittest.TestCase):
56 """This subclass is a base class for VPP test cases that are implemented as
57 classes. It provides methods to create and run test case.
def packet_infos(self):
    """Packet infos stored for this testcase

    The framework stores them as a dictionary keyed by packet index
    (see add_packet_info_to_list).
    """
    return self._packet_infos
def packet_infos(self, value):
    """Replace the stored packet infos with value

    :param value: new packet info container
    """
    self._packet_infos = value
71 """Return the instance of this testcase"""
72 return cls.test_instance
75 def set_debug_flags(cls, d):
76 cls.debug_core = False
78 cls.debug_gdbserver = False
83 if resource.getrlimit(resource.RLIMIT_CORE)[0] <= 0:
84 # give a heads up if this is actually useless
85 cls.logger.critical("WARNING: core size limit is set 0, core "
86 "files will NOT be created")
90 elif dl == "gdbserver":
91 cls.debug_gdbserver = True
93 raise Exception("Unrecognized DEBUG option: '%s'" % d)
96 def setUpConstants(cls):
97 """ Set-up the test case class based on environment variables """
100 cls.step = True if s.lower() in ("y", "yes", "1") else False
104 d = os.getenv("DEBUG")
107 cls.set_debug_flags(d)
108 cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
109 cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
111 if cls.step or cls.debug_gdb or cls.debug_gdbserver:
112 debug_cli = "cli-listen localhost:5002"
113 cls.vpp_cmdline = [cls.vpp_bin, "unix", "{", "nodaemon", debug_cli, "}",
114 "api-segment", "{", "prefix", cls.shm_prefix, "}"]
115 if cls.plugin_path is not None:
116 cls.vpp_cmdline.extend(["plugin_path", cls.plugin_path])
117 cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
120 def wait_for_enter(cls):
121 if cls.debug_gdbserver:
122 print(double_line_delim)
123 print("Spawned GDB server with PID: %d" % cls.vpp.pid)
125 print(double_line_delim)
126 print("Spawned VPP with PID: %d" % cls.vpp.pid)
128 cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
130 print(single_line_delim)
131 print("You can debug the VPP using e.g.:")
132 if cls.debug_gdbserver:
133 print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
134 print("Now is the time to attach a gdb by running the above "
135 "command, set up breakpoints etc. and then resume VPP from "
136 "within gdb by issuing the 'continue' command")
138 print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
139 print("Now is the time to attach a gdb by running the above "
140 "command and set up breakpoints etc.")
141 print(single_line_delim)
142 raw_input("Press ENTER to continue running the testcase...")
146 cmdline = cls.vpp_cmdline
148 if cls.debug_gdbserver:
149 gdbserver = '/usr/bin/gdbserver'
150 if not os.path.isfile(gdbserver) or \
151 not os.access(gdbserver, os.X_OK):
152 raise Exception("gdbserver binary '%s' does not exist or is "
153 "not executable" % gdbserver)
155 cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
156 cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
159 cls.vpp = subprocess.Popen(cmdline,
160 stdout=subprocess.PIPE,
161 stderr=subprocess.PIPE,
163 except Exception as e:
164 cls.logger.critical("Couldn't start vpp: %s" % e)
172 Perform class setup before running the testcase
173 Remove shared memory files, start vpp and connect the vpp-api
175 cls.logger = getLogger(cls.__name__)
176 cls.tempdir = tempfile.mkdtemp(
177 prefix='vpp-unittest-' + cls.__name__ + '-')
178 cls.shm_prefix = cls.tempdir.split("/")[-1]
179 os.chdir(cls.tempdir)
180 cls.logger.info("Temporary dir is %s, shm prefix is %s",
181 cls.tempdir, cls.shm_prefix)
184 cls._zombie_captures = []
185 cls.packet_infos = {}
188 print(double_line_delim)
189 print(colorize(getdoc(cls).splitlines()[0], YELLOW))
190 print(double_line_delim)
191 # need to catch exceptions here because if we raise, then the cleanup
192 # doesn't get called and we might end with a zombie vpp
195 cls.vpp_stdout_deque = deque()
196 cls.vpp_stdout_reader_thread = Thread(target=pump_output, args=(
197 cls.vpp.stdout, cls.vpp_stdout_deque))
198 cls.vpp_stdout_reader_thread.start()
199 cls.vpp_stderr_deque = deque()
200 cls.vpp_stderr_reader_thread = Thread(target=pump_output, args=(
201 cls.vpp.stderr, cls.vpp_stderr_deque))
202 cls.vpp_stderr_reader_thread.start()
203 cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
208 cls.vapi.register_hook(hook)
214 if cls.debug_gdbserver:
215 print(colorize("You're running VPP inside gdbserver but "
216 "VPP-API connection failed, did you forget "
217 "to 'continue' VPP from within gdb?", RED))
220 t, v, tb = sys.exc_info()
230 Disconnect vpp-api, kill vpp and cleanup shared memory files
232 if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
234 if cls.vpp.returncode is None:
235 print(double_line_delim)
236 print("VPP or GDB server is still running")
237 print(single_line_delim)
238 raw_input("When done debugging, press ENTER to kill the process"
239 " and finish running the testcase...")
241 if hasattr(cls, 'vpp'):
242 if hasattr(cls, 'vapi'):
243 cls.vapi.disconnect()
245 if cls.vpp.returncode is None:
249 if hasattr(cls, 'vpp_stdout_deque'):
250 cls.logger.info(single_line_delim)
251 cls.logger.info('VPP output to stdout while running %s:',
253 cls.logger.info(single_line_delim)
254 f = open(cls.tempdir + '/vpp_stdout.txt', 'w')
255 vpp_output = "".join(cls.vpp_stdout_deque)
257 cls.logger.info('\n%s', vpp_output)
258 cls.logger.info(single_line_delim)
260 if hasattr(cls, 'vpp_stderr_deque'):
261 cls.logger.info(single_line_delim)
262 cls.logger.info('VPP output to stderr while running %s:',
264 cls.logger.info(single_line_delim)
265 f = open(cls.tempdir + '/vpp_stderr.txt', 'w')
266 vpp_output = "".join(cls.vpp_stderr_deque)
268 cls.logger.info('\n%s', vpp_output)
269 cls.logger.info(single_line_delim)
272 def tearDownClass(cls):
273 """ Perform final cleanup after running all tests in this test-case """
277 """ Show various debug prints after each test """
278 if not self.vpp_dead:
279 self.logger.debug(self.vapi.cli("show trace"))
280 self.logger.info(self.vapi.ppcli("show int"))
281 self.logger.info(self.vapi.ppcli("show hardware"))
282 self.logger.info(self.vapi.ppcli("show error"))
283 self.logger.info(self.vapi.ppcli("show run"))
286 """ Clear trace before running each test"""
288 raise Exception("VPP is dead when setting up the test")
290 self.vpp_stdout_deque.append(
291 "--- test setUp() for %s.%s(%s) starts here ---\n" %
292 (self.__class__.__name__, self._testMethodName,
293 self._testMethodDoc))
294 self.vpp_stderr_deque.append(
295 "--- test setUp() for %s.%s(%s) starts here ---\n" %
296 (self.__class__.__name__, self._testMethodName,
297 self._testMethodDoc))
298 self.vapi.cli("clear trace")
299 # store the test instance inside the test class - so that objects
300 # holding the class can access instance methods (like assertEqual)
301 type(self).test_instance = self
304 def pg_enable_capture(cls, interfaces):
306 Enable capture on packet-generator interfaces
308 :param interfaces: iterable interface indexes
315 def register_capture(cls, cap_name):
316 """ Register a capture in the testclass """
317 # add to the list of captures with current timestamp
318 cls._captures.append((time.time(), cap_name))
319 # filter out from zombies
320 cls._zombie_captures = [(stamp, name)
321 for (stamp, name) in cls._zombie_captures
326 """ Remove any zombie captures and enable the packet generator """
327 # how long before capture is allowed to be deleted - otherwise vpp
328 # crashes - 100ms seems enough (this shouldn't be needed at all)
331 for stamp, cap_name in cls._zombie_captures:
332 wait = stamp + capture_ttl - now
334 cls.logger.debug("Waiting for %ss before deleting capture %s",
338 cls.logger.debug("Removing zombie capture %s" % cap_name)
339 cls.vapi.cli('packet-generator delete %s' % cap_name)
341 cls.vapi.cli("trace add pg-input 50") # 50 is maximum
342 cls.vapi.cli('packet-generator enable')
343 cls._zombie_captures = cls._captures
347 def create_pg_interfaces(cls, interfaces):
349 Create packet-generator interfaces
351 :param interfaces: iterable indexes of the interfaces
356 intf = VppPGInterface(cls, i)
357 setattr(cls, intf.name, intf)
359 cls.pg_interfaces = result
363 def create_loopback_interfaces(cls, interfaces):
365 Create loopback interfaces
367 :param interfaces: iterable indexes of the interfaces
372 intf = VppLoInterface(cls, i)
373 setattr(cls, intf.name, intf)
375 cls.lo_interfaces = result
379 def extend_packet(packet, size):
381 Extend packet to given size by padding with spaces
382 NOTE: Currently works only when Raw layer is present.
384 :param packet: packet
385 :param size: target size
388 packet_len = len(packet) + 4
389 extend = size - packet_len
391 packet[Raw].load += ' ' * extend
def add_packet_info_to_list(self, info):
    """
    Store a packet info in the testcase's packet infos

    The info receives the next free index (the current number of stored
    infos) and is stored under that index.

    :param info: packet info
    """
    next_free = len(self.packet_infos)
    info.index = next_free
    self.packet_infos[info.index] = info
403 def create_packet_info(self, src_pg_index, dst_pg_index):
405 Create packet info object containing the source and destination indexes
406 and add it to the testcase's packet info list
408 :param src_pg_index: source packet-generator index
409 :param dst_pg_index: destination packet-generator index
411 :returns: _PacketInfo object
415 self.add_packet_info_to_list(info)
416 info.src = src_pg_index
417 info.dst = dst_pg_index
def info_to_payload(info):
    """
    Serialize a _PacketInfo object into a packet payload

    :param info: _PacketInfo object

    :returns: string with the index, src and dst of the packet info as
        space-separated decimal numbers
    """
    fields = (info.index, info.src, info.dst)
    return "%d %d %d" % fields
432 def payload_to_info(payload):
434 Convert packet payload to _PacketInfo object
436 :param payload: packet payload
438 :returns: _PacketInfo object containing de-serialized data from payload
441 numbers = payload.split()
443 info.index = int(numbers[0])
444 info.src = int(numbers[1])
445 info.dst = int(numbers[2])
448 def get_next_packet_info(self, info):
450 Iterate over the packet info list stored in the testcase
451 Start iteration with first element if info is None
452 Continue based on index in info if info is specified
454 :param info: info or None
455 :returns: next info in list or None if no more infos
460 next_index = info.index + 1
461 if next_index == len(self.packet_infos):
464 return self.packet_infos[next_index]
466 def get_next_packet_info_for_interface(self, src_index, info):
468 Search the packet info list for the next packet info with same source
471 :param src_index: source interface index to search for
472 :param info: packet info - where to start the search
473 :returns: packet info or None
477 info = self.get_next_packet_info(info)
480 if info.src == src_index:
483 def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
485 Search the packet info list for the next packet info with same source
486 and destination interface indexes
488 :param src_index: source interface index to search for
489 :param dst_index: destination interface index to search for
490 :param info: packet info - where to start the search
491 :returns: packet info or None
495 info = self.get_next_packet_info_for_interface(src_index, info)
498 if info.dst == dst_index:
def assert_equal(self, real_value, expected_value, name_or_class=None):
    """
    Assert that real_value equals expected_value, with a descriptive message

    :param real_value: value under test
    :param expected_value: value that real_value is compared against
    :param name_or_class: describes what is being compared; either a
        callable (its docstring names the item and calling it on a value
        gives the value's symbolic form) or any object whose string form
        names the item. If None, no custom message is used.
    """
    if name_or_class is None:
        # no message has been built on this path - passing `msg` here
        # would fail with an unbound local before the assertion is made
        self.assertEqual(real_value, expected_value)
        return
    try:
        # rich form: numeric values plus their symbolic representation
        msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
        msg = msg % (getdoc(name_or_class).strip(),
                     real_value, str(name_or_class(real_value)),
                     expected_value, str(name_or_class(expected_value)))
    except Exception:
        # name_or_class is not usable as a class (e.g. it is a plain name)
        # or the values are not numeric - fall back to the generic form
        msg = "Invalid %s: %s does not match expected value %s" % (
            name_or_class, real_value, expected_value)

    self.assertEqual(real_value, expected_value, msg)
525 msg = "Invalid %s: %s out of range <%s,%s>" % (
526 name, real_value, expected_min, expected_max)
527 self.assertTrue(expected_min <= real_value <= expected_max, msg)
530 class VppTestResult(unittest.TestResult):
532 @property result_string
533 String variable to store the test case result string.
535 List variable containing 2-tuples of TestCase instances and strings
536 holding formatted tracebacks. Each tuple represents a test which
537 raised an unexpected exception.
539 List variable containing 2-tuples of TestCase instances and strings
540 holding formatted tracebacks. Each tuple represents a test where
541 a failure was explicitly signalled using the TestCase.assert*()
545 def __init__(self, stream, descriptions, verbosity):
547 :param stream File descriptor to store where to report test results. Set
548 to the standard error stream by default.
549 :param descriptions Boolean variable to store information if to use test
551 :param verbosity Integer variable to store required verbosity level.
553 unittest.TestResult.__init__(self, stream, descriptions, verbosity)
555 self.descriptions = descriptions
556 self.verbosity = verbosity
557 self.result_string = None
def addSuccess(self, test):
    """
    Record that a test succeeded

    Delegates to unittest.TestResult.addSuccess and then sets the result
    string to a green "OK".

    :param test: the test that succeeded
    """
    unittest.TestResult.addSuccess(self, test)
    verdict = colorize("OK", GREEN)
    self.result_string = verdict
def addSkip(self, test, reason):
    """
    Record that a test was skipped

    Delegates to unittest.TestResult.addSkip and then sets the result
    string to a yellow "SKIP".

    :param test: the test that was skipped
    :param reason: reason passed on to unittest.TestResult.addSkip
    """
    unittest.TestResult.addSkip(self, test, reason)
    verdict = colorize("SKIP", YELLOW)
    self.result_string = verdict
def addFailure(self, test, err):
    """
    Record that a test failed

    Delegates to unittest.TestResult.addFailure and then sets the result
    string to a red "FAIL" followed by the temporary directory of the
    test, if the test has one.

    :param test: the test that failed
    :param err: error information passed on to
        unittest.TestResult.addFailure
    """
    unittest.TestResult.addFailure(self, test, err)
    # point the reader to the test's temp dir when there is one
    if not hasattr(test, 'tempdir'):
        where = ' [no temp dir]'
    else:
        where = ' [ temp dir used by test case: ' + test.tempdir + ' ]'
    self.result_string = colorize("FAIL", RED) + where
def addError(self, test, err):
    """
    Record that a test ended with an error

    Delegates to unittest.TestResult.addError and then sets the result
    string to a red "ERROR" followed by the temporary directory of the
    test, if the test has one.

    :param test: the test that raised the error
    :param err: error information passed on to
        unittest.TestResult.addError
    """
    unittest.TestResult.addError(self, test, err)
    # point the reader to the test's temp dir when there is one
    if not hasattr(test, 'tempdir'):
        where = ' [no temp dir]'
    else:
        where = ' [ temp dir used by test case: ' + test.tempdir + ' ]'
    self.result_string = colorize("ERROR", RED) + where
610 def getDescription(self, test):
615 :returns: test description
618 # TODO: if none print warning not raise exception
619 short_description = test.shortDescription()
620 if self.descriptions and short_description:
621 return short_description
625 def startTest(self, test):
632 unittest.TestResult.startTest(self, test)
633 if self.verbosity > 0:
635 "Starting " + self.getDescription(test) + " ...")
636 self.stream.writeln(single_line_delim)
def stopTest(self, test):
    """
    Report the result of a test once it has finished

    Delegates to unittest.TestResult.stopTest and then writes one line
    holding the test description (padded to 60 columns) and the result
    string. When verbosity is above 0, that line is framed by single-line
    delimiters.

    :param test: the test that has finished
    """
    unittest.TestResult.stopTest(self, test)
    framed = self.verbosity > 0
    if framed:
        self.stream.writeln(single_line_delim)
    self.stream.writeln("%-60s%s" %
                        (self.getDescription(test), self.result_string))
    if framed:
        self.stream.writeln(single_line_delim)
def printErrors(self):
    """
    Print the recorded errors, then the recorded failures

    An empty line is written to the stream first.
    """
    self.stream.writeln()
    # attribute names are resolved one at a time, right before each list
    # is printed
    for flavour, attribute in (('ERROR', 'errors'), ('FAIL', 'failures')):
        self.printErrorList(flavour, getattr(self, attribute))
def printErrorList(self, flavour, errors):
    """
    Write every entry of an error list to the output stream

    Each entry is printed as a double-line delimiter, a
    "<flavour>: <test description>" header, a single-line delimiter and
    the error text.

    :param flavour: error type shown in the header
    :param errors: iterable of (test, error) pairs
    """
    out = self.stream
    for test, err in errors:
        header = "%s: %s" % (flavour, self.getDescription(test))
        out.writeln(double_line_delim)
        out.writeln(header)
        out.writeln(single_line_delim)
        out.writeln("%s" % err)
680 class VppTestRunner(unittest.TextTestRunner):
682 A basic test runner implementation which prints results on standard error.
685 def resultclass(self):
686 """Class maintaining the results of the tests"""
696 print("Running tests using custom test runner") # debug message
697 return super(VppTestRunner, self).run(test)