ACL plugin rejects ICMP messages (VPP-624)
[vpp.git] / test / framework.py
1 #!/usr/bin/env python
2
3 from __future__ import print_function
4 import gc
5 import sys
6 import os
7 import select
8 import unittest
9 import tempfile
10 import time
11 import resource
12 from collections import deque
13 from threading import Thread, Event
14 from inspect import getdoc
15 from traceback import format_exception
16 from logging import FileHandler, DEBUG, Formatter
17 from scapy.packet import Raw
18 from hook import StepHook, PollHook
19 from vpp_pg_interface import VppPGInterface
20 from vpp_sub_interface import VppSubInterface
21 from vpp_lo_interface import VppLoInterface
22 from vpp_papi_provider import VppPapiProvider
23 from log import *
24 from vpp_object import VppObjectRegistry
25 if os.name == 'posix' and sys.version_info[0] < 3:
26     # using subprocess32 is recommended by python official documentation
27     # @ https://docs.python.org/2/library/subprocess.html
28     import subprocess32 as subprocess
29 else:
30     import subprocess
31
32 """
33   Test framework module.
34
35   The module provides a set of tools for constructing and running tests and
36   representing the results.
37 """
38
39
40 class _PacketInfo(object):
41     """Private class to create packet info object.
42
43     Help process information about the next packet.
44     Set variables to default values.
45     """
46     #: Store the index of the packet.
47     index = -1
48     #: Store the index of the source packet generator interface of the packet.
49     src = -1
50     #: Store the index of the destination packet generator interface
51     #: of the packet.
52     dst = -1
53     #: Store expected ip version
54     ip = -1
55     #: Store expected upper protocol
56     proto = -1
57     #: Store the copy of the former packet.
58     data = None
59
60     def __eq__(self, other):
61         index = self.index == other.index
62         src = self.src == other.src
63         dst = self.dst == other.dst
64         data = self.data == other.data
65         return index and src and dst and data
66
67
68 def pump_output(testclass):
69     """ pump output from vpp stdout/stderr to proper queues """
70     while not testclass.pump_thread_stop_flag.wait(0):
71         readable = select.select([testclass.vpp.stdout.fileno(),
72                                   testclass.vpp.stderr.fileno(),
73                                   testclass.pump_thread_wakeup_pipe[0]],
74                                  [], [])[0]
75         if testclass.vpp.stdout.fileno() in readable:
76             read = os.read(testclass.vpp.stdout.fileno(), 1024)
77             testclass.vpp_stdout_deque.append(read)
78         if testclass.vpp.stderr.fileno() in readable:
79             read = os.read(testclass.vpp.stderr.fileno(), 1024)
80             testclass.vpp_stderr_deque.append(read)
81         # ignoring the dummy pipe here intentionally - the flag will take care
82         # of properly terminating the loop
83
84
85 def running_extended_tests():
86     try:
87         s = os.getenv("EXTENDED_TESTS")
88         return True if s.lower() in ("y", "yes", "1") else False
89     except:
90         return False
91     return False
92
93
94 class VppTestCase(unittest.TestCase):
95     """This subclass is a base class for VPP test cases that are implemented as
96     classes. It provides methods to create and run test case.
97     """
98
99     @property
100     def packet_infos(self):
101         """List of packet infos"""
102         return self._packet_infos
103
104     @classmethod
105     def get_packet_count_for_if_idx(cls, dst_if_index):
106         """Get the number of packet info for specified destination if index"""
107         if dst_if_index in cls._packet_count_for_dst_if_idx:
108             return cls._packet_count_for_dst_if_idx[dst_if_index]
109         else:
110             return 0
111
112     @classmethod
113     def instance(cls):
114         """Return the instance of this testcase"""
115         return cls.test_instance
116
117     @classmethod
118     def set_debug_flags(cls, d):
119         cls.debug_core = False
120         cls.debug_gdb = False
121         cls.debug_gdbserver = False
122         if d is None:
123             return
124         dl = d.lower()
125         if dl == "core":
126             cls.debug_core = True
127         elif dl == "gdb":
128             cls.debug_gdb = True
129         elif dl == "gdbserver":
130             cls.debug_gdbserver = True
131         else:
132             raise Exception("Unrecognized DEBUG option: '%s'" % d)
133
134     @classmethod
135     def setUpConstants(cls):
136         """ Set-up the test case class based on environment variables """
137         try:
138             s = os.getenv("STEP")
139             cls.step = True if s.lower() in ("y", "yes", "1") else False
140         except:
141             cls.step = False
142         try:
143             d = os.getenv("DEBUG")
144         except:
145             d = None
146         cls.set_debug_flags(d)
147         cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
148         cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
149         debug_cli = ""
150         if cls.step or cls.debug_gdb or cls.debug_gdbserver:
151             debug_cli = "cli-listen localhost:5002"
152         coredump_size = None
153         try:
154             size = os.getenv("COREDUMP_SIZE")
155             if size is not None:
156                 coredump_size = "coredump-size %s" % size
157         except:
158             pass
159         if coredump_size is None:
160             coredump_size = "coredump-size unlimited"
161         cls.vpp_cmdline = [cls.vpp_bin, "unix",
162                            "{", "nodaemon", debug_cli, coredump_size, "}",
163                            "api-trace", "{", "on", "}",
164                            "api-segment", "{", "prefix", cls.shm_prefix, "}",
165                            "plugins", "{", "plugin", "dpdk_plugin.so", "{",
166                            "disable", "}", "}"]
167         if cls.plugin_path is not None:
168             cls.vpp_cmdline.extend(["plugin_path", cls.plugin_path])
169         cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
170
171     @classmethod
172     def wait_for_enter(cls):
173         if cls.debug_gdbserver:
174             print(double_line_delim)
175             print("Spawned GDB server with PID: %d" % cls.vpp.pid)
176         elif cls.debug_gdb:
177             print(double_line_delim)
178             print("Spawned VPP with PID: %d" % cls.vpp.pid)
179         else:
180             cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
181             return
182         print(single_line_delim)
183         print("You can debug the VPP using e.g.:")
184         if cls.debug_gdbserver:
185             print("gdb " + cls.vpp_bin + " -ex 'target remote localhost:7777'")
186             print("Now is the time to attach a gdb by running the above "
187                   "command, set up breakpoints etc. and then resume VPP from "
188                   "within gdb by issuing the 'continue' command")
189         elif cls.debug_gdb:
190             print("gdb " + cls.vpp_bin + " -ex 'attach %s'" % cls.vpp.pid)
191             print("Now is the time to attach a gdb by running the above "
192                   "command and set up breakpoints etc.")
193         print(single_line_delim)
194         raw_input("Press ENTER to continue running the testcase...")
195
196     @classmethod
197     def run_vpp(cls):
198         cmdline = cls.vpp_cmdline
199
200         if cls.debug_gdbserver:
201             gdbserver = '/usr/bin/gdbserver'
202             if not os.path.isfile(gdbserver) or \
203                     not os.access(gdbserver, os.X_OK):
204                 raise Exception("gdbserver binary '%s' does not exist or is "
205                                 "not executable" % gdbserver)
206
207             cmdline = [gdbserver, 'localhost:7777'] + cls.vpp_cmdline
208             cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
209
210         try:
211             cls.vpp = subprocess.Popen(cmdline,
212                                        stdout=subprocess.PIPE,
213                                        stderr=subprocess.PIPE,
214                                        bufsize=1)
215         except Exception as e:
216             cls.logger.critical("Couldn't start vpp: %s" % e)
217             raise
218
219         cls.wait_for_enter()
220
221     @classmethod
222     def setUpClass(cls):
223         """
224         Perform class setup before running the testcase
225         Remove shared memory files, start vpp and connect the vpp-api
226         """
227         gc.collect()  # run garbage collection first
228         cls.logger = getLogger(cls.__name__)
229         cls.tempdir = tempfile.mkdtemp(
230             prefix='vpp-unittest-' + cls.__name__ + '-')
231         file_handler = FileHandler("%s/log.txt" % cls.tempdir)
232         file_handler.setFormatter(
233             Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
234                       datefmt="%H:%M:%S"))
235         file_handler.setLevel(DEBUG)
236         cls.logger.addHandler(file_handler)
237         cls.shm_prefix = cls.tempdir.split("/")[-1]
238         os.chdir(cls.tempdir)
239         cls.logger.info("Temporary dir is %s, shm prefix is %s",
240                         cls.tempdir, cls.shm_prefix)
241         cls.setUpConstants()
242         cls.reset_packet_infos()
243         cls._captures = []
244         cls._zombie_captures = []
245         cls.verbose = 0
246         cls.vpp_dead = False
247         cls.registry = VppObjectRegistry()
248         # need to catch exceptions here because if we raise, then the cleanup
249         # doesn't get called and we might end with a zombie vpp
250         try:
251             cls.run_vpp()
252             cls.vpp_stdout_deque = deque()
253             cls.vpp_stderr_deque = deque()
254             cls.pump_thread_stop_flag = Event()
255             cls.pump_thread_wakeup_pipe = os.pipe()
256             cls.pump_thread = Thread(target=pump_output, args=(cls,))
257             cls.pump_thread.daemon = True
258             cls.pump_thread.start()
259             cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
260             if cls.step:
261                 hook = StepHook(cls)
262             else:
263                 hook = PollHook(cls)
264             cls.vapi.register_hook(hook)
265             cls.sleep(0.1, "after vpp startup, before initial poll")
266             hook.poll_vpp()
267             try:
268                 cls.vapi.connect()
269             except:
270                 if cls.debug_gdbserver:
271                     print(colorize("You're running VPP inside gdbserver but "
272                                    "VPP-API connection failed, did you forget "
273                                    "to 'continue' VPP from within gdb?", RED))
274                 raise
275         except:
276             t, v, tb = sys.exc_info()
277             try:
278                 cls.quit()
279             except:
280                 pass
281             raise t, v, tb
282
283     @classmethod
284     def quit(cls):
285         """
286         Disconnect vpp-api, kill vpp and cleanup shared memory files
287         """
288         if (cls.debug_gdbserver or cls.debug_gdb) and hasattr(cls, 'vpp'):
289             cls.vpp.poll()
290             if cls.vpp.returncode is None:
291                 print(double_line_delim)
292                 print("VPP or GDB server is still running")
293                 print(single_line_delim)
294                 raw_input("When done debugging, press ENTER to kill the "
295                           "process and finish running the testcase...")
296
297         os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
298         cls.pump_thread_stop_flag.set()
299         if hasattr(cls, 'pump_thread'):
300             cls.logger.debug("Waiting for pump thread to stop")
301             cls.pump_thread.join()
302         if hasattr(cls, 'vpp_stderr_reader_thread'):
303             cls.logger.debug("Waiting for stdderr pump to stop")
304             cls.vpp_stderr_reader_thread.join()
305
306         if hasattr(cls, 'vpp'):
307             if hasattr(cls, 'vapi'):
308                 cls.vapi.disconnect()
309                 del cls.vapi
310             cls.vpp.poll()
311             if cls.vpp.returncode is None:
312                 cls.logger.debug("Sending TERM to vpp")
313                 cls.vpp.terminate()
314                 cls.logger.debug("Waiting for vpp to die")
315                 cls.vpp.communicate()
316             del cls.vpp
317
318         if hasattr(cls, 'vpp_stdout_deque'):
319             cls.logger.info(single_line_delim)
320             cls.logger.info('VPP output to stdout while running %s:',
321                             cls.__name__)
322             cls.logger.info(single_line_delim)
323             f = open(cls.tempdir + '/vpp_stdout.txt', 'w')
324             vpp_output = "".join(cls.vpp_stdout_deque)
325             f.write(vpp_output)
326             cls.logger.info('\n%s', vpp_output)
327             cls.logger.info(single_line_delim)
328
329         if hasattr(cls, 'vpp_stderr_deque'):
330             cls.logger.info(single_line_delim)
331             cls.logger.info('VPP output to stderr while running %s:',
332                             cls.__name__)
333             cls.logger.info(single_line_delim)
334             f = open(cls.tempdir + '/vpp_stderr.txt', 'w')
335             vpp_output = "".join(cls.vpp_stderr_deque)
336             f.write(vpp_output)
337             cls.logger.info('\n%s', vpp_output)
338             cls.logger.info(single_line_delim)
339
340     @classmethod
341     def tearDownClass(cls):
342         """ Perform final cleanup after running all tests in this test-case """
343         cls.quit()
344
345     def tearDown(self):
346         """ Show various debug prints after each test """
347         self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
348                           (self.__class__.__name__, self._testMethodName,
349                            self._testMethodDoc))
350         if not self.vpp_dead:
351             self.logger.debug(self.vapi.cli("show trace"))
352             self.logger.info(self.vapi.ppcli("show int"))
353             self.logger.info(self.vapi.ppcli("show hardware"))
354             self.logger.info(self.vapi.ppcli("show error"))
355             self.logger.info(self.vapi.ppcli("show run"))
356             self.registry.remove_vpp_config(self.logger)
357             # Save/Dump VPP api trace log
358             api_trace = "vpp_api_trace.%s.log" % self._testMethodName
359             tmp_api_trace = "/tmp/%s" % api_trace
360             vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
361             self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
362             self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
363                                                     vpp_api_trace_log))
364             os.rename(tmp_api_trace, vpp_api_trace_log)
365             self.logger.info(self.vapi.ppcli("api trace dump %s" %
366                                              vpp_api_trace_log))
367         else:
368             self.registry.unregister_all(self.logger)
369
370     def setUp(self):
371         """ Clear trace before running each test"""
372         self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
373                           (self.__class__.__name__, self._testMethodName,
374                            self._testMethodDoc))
375         if self.vpp_dead:
376             raise Exception("VPP is dead when setting up the test")
377         self.sleep(.1, "during setUp")
378         self.vpp_stdout_deque.append(
379             "--- test setUp() for %s.%s(%s) starts here ---\n" %
380             (self.__class__.__name__, self._testMethodName,
381              self._testMethodDoc))
382         self.vpp_stderr_deque.append(
383             "--- test setUp() for %s.%s(%s) starts here ---\n" %
384             (self.__class__.__name__, self._testMethodName,
385              self._testMethodDoc))
386         self.vapi.cli("clear trace")
387         # store the test instance inside the test class - so that objects
388         # holding the class can access instance methods (like assertEqual)
389         type(self).test_instance = self
390
391     @classmethod
392     def pg_enable_capture(cls, interfaces):
393         """
394         Enable capture on packet-generator interfaces
395
396         :param interfaces: iterable interface indexes
397
398         """
399         for i in interfaces:
400             i.enable_capture()
401
402     @classmethod
403     def register_capture(cls, cap_name):
404         """ Register a capture in the testclass """
405         # add to the list of captures with current timestamp
406         cls._captures.append((time.time(), cap_name))
407         # filter out from zombies
408         cls._zombie_captures = [(stamp, name)
409                                 for (stamp, name) in cls._zombie_captures
410                                 if name != cap_name]
411
412     @classmethod
413     def pg_start(cls):
414         """ Remove any zombie captures and enable the packet generator """
415         # how long before capture is allowed to be deleted - otherwise vpp
416         # crashes - 100ms seems enough (this shouldn't be needed at all)
417         capture_ttl = 0.1
418         now = time.time()
419         for stamp, cap_name in cls._zombie_captures:
420             wait = stamp + capture_ttl - now
421             if wait > 0:
422                 cls.sleep(wait, "before deleting capture %s" % cap_name)
423                 now = time.time()
424             cls.logger.debug("Removing zombie capture %s" % cap_name)
425             cls.vapi.cli('packet-generator delete %s' % cap_name)
426
427         cls.vapi.cli("trace add pg-input 50")  # 50 is maximum
428         cls.vapi.cli('packet-generator enable')
429         cls._zombie_captures = cls._captures
430         cls._captures = []
431
432     @classmethod
433     def create_pg_interfaces(cls, interfaces):
434         """
435         Create packet-generator interfaces.
436
437         :param interfaces: iterable indexes of the interfaces.
438         :returns: List of created interfaces.
439
440         """
441         result = []
442         for i in interfaces:
443             intf = VppPGInterface(cls, i)
444             setattr(cls, intf.name, intf)
445             result.append(intf)
446         cls.pg_interfaces = result
447         return result
448
449     @classmethod
450     def create_loopback_interfaces(cls, interfaces):
451         """
452         Create loopback interfaces.
453
454         :param interfaces: iterable indexes of the interfaces.
455         :returns: List of created interfaces.
456         """
457         result = []
458         for i in interfaces:
459             intf = VppLoInterface(cls, i)
460             setattr(cls, intf.name, intf)
461             result.append(intf)
462         cls.lo_interfaces = result
463         return result
464
465     @staticmethod
466     def extend_packet(packet, size):
467         """
468         Extend packet to given size by padding with spaces
469         NOTE: Currently works only when Raw layer is present.
470
471         :param packet: packet
472         :param size: target size
473
474         """
475         packet_len = len(packet) + 4
476         extend = size - packet_len
477         if extend > 0:
478             packet[Raw].load += ' ' * extend
479
480     @classmethod
481     def reset_packet_infos(cls):
482         """ Reset the list of packet info objects and packet counts to zero """
483         cls._packet_infos = {}
484         cls._packet_count_for_dst_if_idx = {}
485
486     @classmethod
487     def create_packet_info(cls, src_if, dst_if):
488         """
489         Create packet info object containing the source and destination indexes
490         and add it to the testcase's packet info list
491
492         :param VppInterface src_if: source interface
493         :param VppInterface dst_if: destination interface
494
495         :returns: _PacketInfo object
496
497         """
498         info = _PacketInfo()
499         info.index = len(cls._packet_infos)
500         info.src = src_if.sw_if_index
501         info.dst = dst_if.sw_if_index
502         if isinstance(dst_if, VppSubInterface):
503             dst_idx = dst_if.parent.sw_if_index
504         else:
505             dst_idx = dst_if.sw_if_index
506         if dst_idx in cls._packet_count_for_dst_if_idx:
507             cls._packet_count_for_dst_if_idx[dst_idx] += 1
508         else:
509             cls._packet_count_for_dst_if_idx[dst_idx] = 1
510         cls._packet_infos[info.index] = info
511         return info
512
513     @staticmethod
514     def info_to_payload(info):
515         """
516         Convert _PacketInfo object to packet payload
517
518         :param info: _PacketInfo object
519
520         :returns: string containing serialized data from packet info
521         """
522         return "%d %d %d %d %d" % (info.index, info.src, info.dst,
523                                    info.ip, info.proto)
524
525     @staticmethod
526     def payload_to_info(payload):
527         """
528         Convert packet payload to _PacketInfo object
529
530         :param payload: packet payload
531
532         :returns: _PacketInfo object containing de-serialized data from payload
533
534         """
535         numbers = payload.split()
536         info = _PacketInfo()
537         info.index = int(numbers[0])
538         info.src = int(numbers[1])
539         info.dst = int(numbers[2])
540         info.ip = int(numbers[3])
541         info.proto = int(numbers[4])
542         return info
543
544     def get_next_packet_info(self, info):
545         """
546         Iterate over the packet info list stored in the testcase
547         Start iteration with first element if info is None
548         Continue based on index in info if info is specified
549
550         :param info: info or None
551         :returns: next info in list or None if no more infos
552         """
553         if info is None:
554             next_index = 0
555         else:
556             next_index = info.index + 1
557         if next_index == len(self._packet_infos):
558             return None
559         else:
560             return self._packet_infos[next_index]
561
562     def get_next_packet_info_for_interface(self, src_index, info):
563         """
564         Search the packet info list for the next packet info with same source
565         interface index
566
567         :param src_index: source interface index to search for
568         :param info: packet info - where to start the search
569         :returns: packet info or None
570
571         """
572         while True:
573             info = self.get_next_packet_info(info)
574             if info is None:
575                 return None
576             if info.src == src_index:
577                 return info
578
579     def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
580         """
581         Search the packet info list for the next packet info with same source
582         and destination interface indexes
583
584         :param src_index: source interface index to search for
585         :param dst_index: destination interface index to search for
586         :param info: packet info - where to start the search
587         :returns: packet info or None
588
589         """
590         while True:
591             info = self.get_next_packet_info_for_interface(src_index, info)
592             if info is None:
593                 return None
594             if info.dst == dst_index:
595                 return info
596
597     def assert_equal(self, real_value, expected_value, name_or_class=None):
598         if name_or_class is None:
599             self.assertEqual(real_value, expected_value)
600             return
601         try:
602             msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
603             msg = msg % (getdoc(name_or_class).strip(),
604                          real_value, str(name_or_class(real_value)),
605                          expected_value, str(name_or_class(expected_value)))
606         except:
607             msg = "Invalid %s: %s does not match expected value %s" % (
608                 name_or_class, real_value, expected_value)
609
610         self.assertEqual(real_value, expected_value, msg)
611
612     def assert_in_range(self,
613                         real_value,
614                         expected_min,
615                         expected_max,
616                         name=None):
617         if name is None:
618             msg = None
619         else:
620             msg = "Invalid %s: %s out of range <%s,%s>" % (
621                 name, real_value, expected_min, expected_max)
622         self.assertTrue(expected_min <= real_value <= expected_max, msg)
623
624     @classmethod
625     def sleep(cls, timeout, remark=None):
626         if hasattr(cls, 'logger'):
627             cls.logger.debug("Sleeping for %ss (%s)" % (timeout, remark))
628         time.sleep(timeout)
629
630
631 class TestCasePrinter(object):
632     _shared_state = {}
633
634     def __init__(self):
635         self.__dict__ = self._shared_state
636         if not hasattr(self, "_test_case_set"):
637             self._test_case_set = set()
638
639     def print_test_case_heading_if_first_time(self, case):
640         if case.__class__ not in self._test_case_set:
641             print(double_line_delim)
642             print(colorize(getdoc(case.__class__).splitlines()[0], YELLOW))
643             print(double_line_delim)
644             self._test_case_set.add(case.__class__)
645
646
647 class VppTestResult(unittest.TestResult):
648     """
649     @property result_string
650      String variable to store the test case result string.
651     @property errors
652      List variable containing 2-tuples of TestCase instances and strings
653      holding formatted tracebacks. Each tuple represents a test which
654      raised an unexpected exception.
655     @property failures
656      List variable containing 2-tuples of TestCase instances and strings
657      holding formatted tracebacks. Each tuple represents a test where
658      a failure was explicitly signalled using the TestCase.assert*()
659      methods.
660     """
661
662     def __init__(self, stream, descriptions, verbosity):
663         """
664         :param stream File descriptor to store where to report test results.
665             Set to the standard error stream by default.
666         :param descriptions Boolean variable to store information if to use
667             test case descriptions.
668         :param verbosity Integer variable to store required verbosity level.
669         """
670         unittest.TestResult.__init__(self, stream, descriptions, verbosity)
671         self.stream = stream
672         self.descriptions = descriptions
673         self.verbosity = verbosity
674         self.result_string = None
675         self.printer = TestCasePrinter()
676
677     def addSuccess(self, test):
678         """
679         Record a test succeeded result
680
681         :param test:
682
683         """
684         if hasattr(test, 'logger'):
685             test.logger.debug("--- addSuccess() %s.%s(%s) called"
686                               % (test.__class__.__name__,
687                                  test._testMethodName,
688                                  test._testMethodDoc))
689         unittest.TestResult.addSuccess(self, test)
690         self.result_string = colorize("OK", GREEN)
691
692     def addSkip(self, test, reason):
693         """
694         Record a test skipped.
695
696         :param test:
697         :param reason:
698
699         """
700         if hasattr(test, 'logger'):
701             test.logger.debug("--- addSkip() %s.%s(%s) called, reason is %s"
702                               % (test.__class__.__name__,
703                                  test._testMethodName,
704                                  test._testMethodDoc,
705                                  reason))
706         unittest.TestResult.addSkip(self, test, reason)
707         self.result_string = colorize("SKIP", YELLOW)
708
709     def addFailure(self, test, err):
710         """
711         Record a test failed result
712
713         :param test:
714         :param err: error message
715
716         """
717         if hasattr(test, 'logger'):
718             test.logger.debug("--- addFailure() %s.%s(%s) called, err is %s"
719                               % (test.__class__.__name__,
720                                  test._testMethodName,
721                                  test._testMethodDoc, err))
722             test.logger.debug("formatted exception is:\n%s" %
723                               "".join(format_exception(*err)))
724         unittest.TestResult.addFailure(self, test, err)
725         if hasattr(test, 'tempdir'):
726             self.result_string = colorize("FAIL", RED) + \
727                 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
728         else:
729             self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
730
731     def addError(self, test, err):
732         """
733         Record a test error result
734
735         :param test:
736         :param err: error message
737
738         """
739         if hasattr(test, 'logger'):
740             test.logger.debug("--- addError() %s.%s(%s) called, err is %s"
741                               % (test.__class__.__name__,
742                                  test._testMethodName,
743                                  test._testMethodDoc, err))
744             test.logger.debug("formatted exception is:\n%s" %
745                               "".join(format_exception(*err)))
746         unittest.TestResult.addError(self, test, err)
747         if hasattr(test, 'tempdir'):
748             self.result_string = colorize("ERROR", RED) + \
749                 ' [ temp dir used by test case: ' + test.tempdir + ' ]'
750         else:
751             self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
752
753     def getDescription(self, test):
754         """
755         Get test description
756
757         :param test:
758         :returns: test description
759
760         """
761         # TODO: if none print warning not raise exception
762         short_description = test.shortDescription()
763         if self.descriptions and short_description:
764             return short_description
765         else:
766             return str(test)
767
768     def startTest(self, test):
769         """
770         Start a test
771
772         :param test:
773
774         """
775         self.printer.print_test_case_heading_if_first_time(test)
776         unittest.TestResult.startTest(self, test)
777         if self.verbosity > 0:
778             self.stream.writeln(
779                 "Starting " + self.getDescription(test) + " ...")
780             self.stream.writeln(single_line_delim)
781
782     def stopTest(self, test):
783         """
784         Stop a test
785
786         :param test:
787
788         """
789         unittest.TestResult.stopTest(self, test)
790         if self.verbosity > 0:
791             self.stream.writeln(single_line_delim)
792             self.stream.writeln("%-73s%s" % (self.getDescription(test),
793                                              self.result_string))
794             self.stream.writeln(single_line_delim)
795         else:
796             self.stream.writeln("%-73s%s" % (self.getDescription(test),
797                                              self.result_string))
798
799     def printErrors(self):
800         """
801         Print errors from running the test case
802         """
803         self.stream.writeln()
804         self.printErrorList('ERROR', self.errors)
805         self.printErrorList('FAIL', self.failures)
806
807     def printErrorList(self, flavour, errors):
808         """
809         Print error list to the output stream together with error type
810         and test case description.
811
812         :param flavour: error type
813         :param errors: iterable errors
814
815         """
816         for test, err in errors:
817             self.stream.writeln(double_line_delim)
818             self.stream.writeln("%s: %s" %
819                                 (flavour, self.getDescription(test)))
820             self.stream.writeln(single_line_delim)
821             self.stream.writeln("%s" % err)
822
823
824 class VppTestRunner(unittest.TextTestRunner):
825     """
826     A basic test runner implementation which prints results to standard error.
827     """
828     @property
829     def resultclass(self):
830         """Class maintaining the results of the tests"""
831         return VppTestResult
832
833     def __init__(self, stream=sys.stderr, descriptions=True, verbosity=1,
834                  failfast=False, buffer=False, resultclass=None):
835         # ignore stream setting here, use hard-coded stdout to be in sync
836         # with prints from VppTestCase methods ...
837         super(VppTestRunner, self).__init__(sys.stdout, descriptions,
838                                             verbosity, failfast, buffer,
839                                             resultclass)
840
841     test_option = "TEST"
842
843     def parse_test_option(self):
844         try:
845             f = os.getenv(self.test_option)
846         except:
847             f = None
848         filter_file_name = None
849         filter_class_name = None
850         filter_func_name = None
851         if f:
852             if '.' in f:
853                 parts = f.split('.')
854                 if len(parts) > 3:
855                     raise Exception("Unrecognized %s option: %s" %
856                                     (self.test_option, f))
857                 if len(parts) > 2:
858                     if parts[2] not in ('*', ''):
859                         filter_func_name = parts[2]
860                 if parts[1] not in ('*', ''):
861                     filter_class_name = parts[1]
862                 if parts[0] not in ('*', ''):
863                     if parts[0].startswith('test_'):
864                         filter_file_name = parts[0]
865                     else:
866                         filter_file_name = 'test_%s' % parts[0]
867             else:
868                 if f.startswith('test_'):
869                     filter_file_name = f
870                 else:
871                     filter_file_name = 'test_%s' % f
872         return filter_file_name, filter_class_name, filter_func_name
873
874     def filter_tests(self, tests, filter_file, filter_class, filter_func):
875         result = unittest.suite.TestSuite()
876         for t in tests:
877             if isinstance(t, unittest.suite.TestSuite):
878                 # this is a bunch of tests, recursively filter...
879                 x = self.filter_tests(t, filter_file, filter_class,
880                                       filter_func)
881                 if x.countTestCases() > 0:
882                     result.addTest(x)
883             elif isinstance(t, unittest.TestCase):
884                 # this is a single test
885                 parts = t.id().split('.')
886                 # t.id() for common cases like this:
887                 # test_classifier.TestClassifier.test_acl_ip
888                 # apply filtering only if it is so
889                 if len(parts) == 3:
890                     if filter_file and filter_file != parts[0]:
891                         continue
892                     if filter_class and filter_class != parts[1]:
893                         continue
894                     if filter_func and filter_func != parts[2]:
895                         continue
896                 result.addTest(t)
897             else:
898                 # unexpected object, don't touch it
899                 result.addTest(t)
900         return result
901
902     def run(self, test):
903         """
904         Run the tests
905
906         :param test:
907
908         """
909         gc.disable()  # disable garbage collection, we'll do that manually
910         print("Running tests using custom test runner")  # debug message
911         filter_file, filter_class, filter_func = self.parse_test_option()
912         print("Active filters: file=%s, class=%s, function=%s" % (
913             filter_file, filter_class, filter_func))
914         filtered = self.filter_tests(test, filter_file, filter_class,
915                                      filter_func)
916         print("%s out of %s tests match specified filters" % (
917             filtered.countTestCases(), test.countTestCases()))
918         return super(VppTestRunner, self).run(filtered)