VPP-1508: Python3 tests. Explicitly specify string formatting.
[vpp.git] / test / framework.py
index 8a1bfcb..dc7d610 100644 (file)
@@ -32,6 +32,7 @@ from util import ppp, is_core_present
 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
 from scapy.layers.inet6 import ICMPv6EchoReply
 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
 from scapy.layers.inet6 import ICMPv6EchoReply
+
 if os.name == 'posix' and sys.version_info[0] < 3:
     # using subprocess32 is recommended by python official documentation
     # @ https://docs.python.org/2/library/subprocess.html
 if os.name == 'posix' and sys.version_info[0] < 3:
     # using subprocess32 is recommended by python official documentation
     # @ https://docs.python.org/2/library/subprocess.html
@@ -39,6 +40,11 @@ if os.name == 'posix' and sys.version_info[0] < 3:
 else:
     import subprocess
 
 else:
     import subprocess
 
+#  Python2/3 compatible
+try:
+    input = raw_input
+except NameError:
+    pass
 
 PASS = 0
 FAIL = 1
 
 PASS = 0
 FAIL = 1
@@ -46,13 +52,11 @@ ERROR = 2
 SKIP = 3
 TEST_RUN = 4
 
 SKIP = 3
 TEST_RUN = 4
 
-
 debug_framework = False
 if os.getenv('TEST_DEBUG', "0") == "1":
     debug_framework = True
     import debug_internal
 
 debug_framework = False
 if os.getenv('TEST_DEBUG', "0") == "1":
     debug_framework = True
     import debug_internal
 
-
 """
   Test framework module.
 
 """
   Test framework module.
 
@@ -120,7 +124,7 @@ def pump_output(testclass):
                 split = read.splitlines(True)
                 if len(stderr_fragment) > 0:
                     split[0] = "%s%s" % (stderr_fragment, split[0])
                 split = read.splitlines(True)
                 if len(stderr_fragment) > 0:
                     split[0] = "%s%s" % (stderr_fragment, split[0])
-                if len(split) > 0 and split[-1].endswith("\n"):
+                if len(split) > 0 and split[-1].endswith(b"\n"):
                     limit = None
                 else:
                     limit = -1
                     limit = None
                 else:
                     limit = -1
@@ -130,27 +134,35 @@ def pump_output(testclass):
                     for line in split[:limit]:
                         testclass.logger.debug(
                             "VPP STDERR: %s" % line.rstrip("\n"))
                     for line in split[:limit]:
                         testclass.logger.debug(
                             "VPP STDERR: %s" % line.rstrip("\n"))
-        # ignoring the dummy pipe here intentionally - the flag will take care
-        # of properly terminating the loop
+                        # ignoring the dummy pipe here intentionally - the
+                        # flag will take care of properly terminating the loop
 
 
 
 
-def is_skip_aarch64_set():
+def _is_skip_aarch64_set():
     return os.getenv('SKIP_AARCH64', 'n').lower() in ('yes', 'y', '1')
 
     return os.getenv('SKIP_AARCH64', 'n').lower() in ('yes', 'y', '1')
 
+is_skip_aarch64_set = _is_skip_aarch64_set()
+
 
 
-def is_platform_aarch64():
+def _is_platform_aarch64():
     return platform.machine() == 'aarch64'
 
     return platform.machine() == 'aarch64'
 
+is_platform_aarch64 = _is_platform_aarch64()
 
 
-def running_extended_tests():
+
+def _running_extended_tests():
     s = os.getenv("EXTENDED_TESTS", "n")
     return True if s.lower() in ("y", "yes", "1") else False
 
     s = os.getenv("EXTENDED_TESTS", "n")
     return True if s.lower() in ("y", "yes", "1") else False
 
+running_extended_tests = _running_extended_tests()
+
 
 
-def running_on_centos():
+def _running_on_centos():
     os_id = os.getenv("OS_ID", "")
     return True if "centos" in os_id.lower() else False
 
     os_id = os.getenv("OS_ID", "")
     return True if "centos" in os_id.lower() else False
 
+running_on_centos = _running_on_centos
+
 
 class KeepAliveReporter(object):
     """
 
 class KeepAliveReporter(object):
     """
@@ -194,6 +206,7 @@ class VppTestCase(unittest.TestCase):
     """
 
     extra_vpp_punt_config = []
     """
 
     extra_vpp_punt_config = []
+    extra_vpp_plugin_config = []
 
     @property
     def packet_infos(self):
 
     @property
     def packet_infos(self):
@@ -259,14 +272,6 @@ class VppTestCase(unittest.TestCase):
 
         return random.choice(tuple(min_usage_set))
 
 
         return random.choice(tuple(min_usage_set))
 
-    @classmethod
-    def print_header(cls):
-        if not hasattr(cls, '_header_printed'):
-            print(double_line_delim)
-            print(colorize(getdoc(cls).splitlines()[0], GREEN))
-            print(double_line_delim)
-            cls._header_printed = True
-
     @classmethod
     def setUpConstants(cls):
         """ Set-up the test case class based on environment variables """
     @classmethod
     def setUpConstants(cls):
         """ Set-up the test case class based on environment variables """
@@ -309,7 +314,7 @@ class VppTestCase(unittest.TestCase):
                            "{", "socket-name", cls.stats_sock, "}", "plugins",
                            "{", "plugin", "dpdk_plugin.so", "{", "disable",
                            "}", "plugin", "unittest_plugin.so", "{", "enable",
                            "{", "socket-name", cls.stats_sock, "}", "plugins",
                            "{", "plugin", "dpdk_plugin.so", "{", "disable",
                            "}", "plugin", "unittest_plugin.so", "{", "enable",
-                           "}""}", ]
+                           "}"] + cls.extra_vpp_plugin_config + ["}", ]
         if cls.extra_vpp_punt_config is not None:
             cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
         if plugin_path is not None:
         if cls.extra_vpp_punt_config is not None:
             cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
         if plugin_path is not None:
@@ -340,7 +345,7 @@ class VppTestCase(unittest.TestCase):
             print("Now is the time to attach a gdb by running the above "
                   "command and set up breakpoints etc.")
         print(single_line_delim)
             print("Now is the time to attach a gdb by running the above "
                   "command and set up breakpoints etc.")
         print(single_line_delim)
-        raw_input("Press ENTER to continue running the testcase...")
+        input("Press ENTER to continue running the testcase...")
 
     @classmethod
     def run_vpp(cls):
 
     @classmethod
     def run_vpp(cls):
@@ -362,7 +367,16 @@ class VppTestCase(unittest.TestCase):
                                        stderr=subprocess.PIPE,
                                        bufsize=1)
         except subprocess.CalledProcessError as e:
                                        stderr=subprocess.PIPE,
                                        bufsize=1)
         except subprocess.CalledProcessError as e:
-            cls.logger.critical("Couldn't start vpp: %s" % e)
+            cls.logger.critical("Subprocess returned with non-0 return code: ("
+                                "%s)", e.returncode)
+            raise
+        except OSError as e:
+            cls.logger.critical("Subprocess returned with OS error: "
+                                "(%s) %s", e.errno, e.strerror)
+            raise
+        except Exception as e:
+            cls.logger.exception("Subprocess returned unexpected from "
+                                 "%s:", cmdline)
             raise
 
         cls.wait_for_enter()
             raise
 
         cls.wait_for_enter()
@@ -386,9 +400,9 @@ class VppTestCase(unittest.TestCase):
         Perform class setup before running the testcase
         Remove shared memory files, start vpp and connect the vpp-api
         """
         Perform class setup before running the testcase
         Remove shared memory files, start vpp and connect the vpp-api
         """
+        super(VppTestCase, cls).setUpClass()
         gc.collect()  # run garbage collection first
         random.seed()
         gc.collect()  # run garbage collection first
         random.seed()
-        cls.print_header()
         cls.logger = get_logger(cls.__name__)
         if hasattr(cls, 'parallel_handler'):
             cls.logger.addHandler(cls.parallel_handler)
         cls.logger = get_logger(cls.__name__)
         if hasattr(cls, 'parallel_handler'):
             cls.logger.addHandler(cls.parallel_handler)
@@ -480,14 +494,14 @@ class VppTestCase(unittest.TestCase):
                 print(double_line_delim)
                 print("VPP or GDB server is still running")
                 print(single_line_delim)
                 print(double_line_delim)
                 print("VPP or GDB server is still running")
                 print(single_line_delim)
-                raw_input("When done debugging, press ENTER to kill the "
-                          "process and finish running the testcase...")
+                input("When done debugging, press ENTER to kill the "
+                      "process and finish running the testcase...")
 
         # first signal that we want to stop the pump thread, then wake it up
         if hasattr(cls, 'pump_thread_stop_flag'):
             cls.pump_thread_stop_flag.set()
         if hasattr(cls, 'pump_thread_wakeup_pipe'):
 
         # first signal that we want to stop the pump thread, then wake it up
         if hasattr(cls, 'pump_thread_stop_flag'):
             cls.pump_thread_stop_flag.set()
         if hasattr(cls, 'pump_thread_wakeup_pipe'):
-            os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
+            os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
         if hasattr(cls, 'pump_thread'):
             cls.logger.debug("Waiting for pump thread to stop")
             cls.pump_thread.join()
         if hasattr(cls, 'pump_thread'):
             cls.logger.debug("Waiting for pump thread to stop")
             cls.pump_thread.join()
@@ -546,11 +560,12 @@ class VppTestCase(unittest.TestCase):
 
     def tearDown(self):
         """ Show various debug prints after each test """
 
     def tearDown(self):
         """ Show various debug prints after each test """
+        super(VppTestCase, self).tearDown()
         self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
                           (self.__class__.__name__, self._testMethodName,
                            self._testMethodDoc))
         if not self.vpp_dead:
         self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
                           (self.__class__.__name__, self._testMethodName,
                            self._testMethodDoc))
         if not self.vpp_dead:
-            self.logger.debug(self.vapi.cli("show trace"))
+            self.logger.debug(self.vapi.cli("show trace max 1000"))
             self.logger.info(self.vapi.ppcli("show interface"))
             self.logger.info(self.vapi.ppcli("show hardware"))
             self.logger.info(self.statistics.set_errors_str())
             self.logger.info(self.vapi.ppcli("show interface"))
             self.logger.info(self.vapi.ppcli("show hardware"))
             self.logger.info(self.statistics.set_errors_str())
@@ -572,6 +587,7 @@ class VppTestCase(unittest.TestCase):
 
     def setUp(self):
         """ Clear trace before running each test"""
 
     def setUp(self):
         """ Clear trace before running each test"""
+        super(VppTestCase, self).setUp()
         self.reporter.send_keep_alive(self)
         self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
                           (self.__class__.__name__, self._testMethodName,
         self.reporter.send_keep_alive(self)
         self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
                           (self.__class__.__name__, self._testMethodName,
@@ -631,7 +647,7 @@ class VppTestCase(unittest.TestCase):
             cls.logger.debug("Removing zombie capture %s" % cap_name)
             cls.vapi.cli('packet-generator delete %s' % cap_name)
 
             cls.logger.debug("Removing zombie capture %s" % cap_name)
             cls.vapi.cli('packet-generator delete %s' % cap_name)
 
-        cls.vapi.cli("trace add pg-input 50")  # 50 is maximum
+        cls.vapi.cli("trace add pg-input 1000")
         cls.vapi.cli('packet-generator enable')
         cls._zombie_captures = cls._captures
         cls._captures = []
         cls.vapi.cli('packet-generator enable')
         cls._zombie_captures = cls._captures
         cls._captures = []
@@ -844,8 +860,8 @@ class VppTestCase(unittest.TestCase):
                 for cf in checksum_fields:
                     if hasattr(layer, cf):
                         if ignore_zero_udp_checksums and \
                 for cf in checksum_fields:
                     if hasattr(layer, cf):
                         if ignore_zero_udp_checksums and \
-                                0 == getattr(layer, cf) and \
-                                layer.name in udp_layers:
+                                        0 == getattr(layer, cf) and \
+                                        layer.name in udp_layers:
                             continue
                         delattr(layer, cf)
                         checksums.append((counter, cf))
                             continue
                         delattr(layer, cf)
                         checksums.append((counter, cf))
@@ -918,37 +934,43 @@ class VppTestCase(unittest.TestCase):
             self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
 
     def assert_packet_counter_equal(self, counter, expected_value):
             self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
 
     def assert_packet_counter_equal(self, counter, expected_value):
-        counters = self.vapi.cli("sh errors").split('\n')
-        counter_value = -1
-        for i in range(1, len(counters)-1):
-            results = counters[i].split()
-            if results[1] == counter:
-                counter_value = int(results[0])
-                break
-        self.assert_equal(counter_value, expected_value,
-                          "packet counter `%s'" % counter)
+        if counter.startswith("/"):
+            counter_value = self.statistics.get_counter(counter)
+            self.assert_equal(counter_value, expected_value,
+                              "packet counter `%s'" % counter)
+        else:
+            counters = self.vapi.cli("sh errors").split('\n')
+            counter_value = -1
+            for i in range(1, len(counters) - 1):
+                results = counters[i].split()
+                if results[1] == counter:
+                    counter_value = int(results[0])
+                    break
 
     @classmethod
     def sleep(cls, timeout, remark=None):
         if hasattr(cls, 'logger'):
 
     @classmethod
     def sleep(cls, timeout, remark=None):
         if hasattr(cls, 'logger'):
-            cls.logger.debug("Starting sleep for %ss (%s)" % (timeout, remark))
+            cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
         before = time.time()
         time.sleep(timeout)
         after = time.time()
         before = time.time()
         time.sleep(timeout)
         after = time.time()
-        if after - before > 2 * timeout:
+        if hasattr(cls, 'logger') and after - before > 2 * timeout:
             cls.logger.error("unexpected time.sleep() result - "
             cls.logger.error("unexpected time.sleep() result - "
-                             "slept for %ss instead of ~%ss!" % (
-                                 after - before, timeout))
+                             "slept for %es instead of ~%es!",
+                             after - before, timeout)
         if hasattr(cls, 'logger'):
             cls.logger.debug(
         if hasattr(cls, 'logger'):
             cls.logger.debug(
-                "Finished sleep (%s) - slept %ss (wanted %ss)" % (
-                    remark, after - before, timeout))
+                "Finished sleep (%s) - slept %es (wanted %es)",
+                remark, after - before, timeout)
 
 
-    def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
+    def pg_send(self, intf, pkts):
         self.vapi.cli("clear trace")
         intf.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         self.vapi.cli("clear trace")
         intf.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
+
+    def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None):
+        self.pg_send(intf, pkts)
         if not timeout:
             timeout = 1
         for i in self.pg_interfaces:
         if not timeout:
             timeout = 1
         for i in self.pg_interfaces:
@@ -956,32 +978,15 @@ class VppTestCase(unittest.TestCase):
             i.assert_nothing_captured(remark=remark)
             timeout = 0.1
 
             i.assert_nothing_captured(remark=remark)
             timeout = 0.1
 
-    def send_and_expect(self, input, pkts, output):
-        self.vapi.cli("clear trace")
-        input.add_stream(pkts)
-        self.pg_enable_capture(self.pg_interfaces)
-        self.pg_start()
-        if isinstance(object, (list,)):
-            rx = []
-            for o in output:
-                rx.append(output.get_capture(len(pkts)))
-        else:
-            rx = output.get_capture(len(pkts))
+    def send_and_expect(self, intf, pkts, output):
+        self.pg_send(intf, pkts)
+        rx = output.get_capture(len(pkts))
         return rx
 
         return rx
 
-    def send_and_expect_only(self, input, pkts, output, timeout=None):
-        self.vapi.cli("clear trace")
-        input.add_stream(pkts)
-        self.pg_enable_capture(self.pg_interfaces)
-        self.pg_start()
-        if isinstance(object, (list,)):
-            outputs = output
-            rx = []
-            for o in outputs:
-                rx.append(output.get_capture(len(pkts)))
-        else:
-            rx = output.get_capture(len(pkts))
-            outputs = [output]
+    def send_and_expect_only(self, intf, pkts, output, timeout=None):
+        self.pg_send(intf, pkts)
+        rx = output.get_capture(len(pkts))
+        outputs = [output]
         if not timeout:
             timeout = 1
         for i in self.pg_interfaces:
         if not timeout:
             timeout = 1
         for i in self.pg_interfaces:
@@ -992,6 +997,11 @@ class VppTestCase(unittest.TestCase):
 
         return rx
 
 
         return rx
 
+    def runTest(self):
+        """ unittest calls runTest when TestCase is instantiated without a
+        test case.  Use case: Writing unittests against VppTestCase"""
+        pass
+
 
 def get_testcase_doc_name(test):
     return getdoc(test.__class__).splitlines()[0]
 
 def get_testcase_doc_name(test):
     return getdoc(test.__class__).splitlines()[0]
@@ -1033,7 +1043,8 @@ class VppTestResult(unittest.TestResult):
     core_crash_test_cases_info = set()
     current_test_case_info = None
 
     core_crash_test_cases_info = set()
     current_test_case_info = None
 
-    def __init__(self, stream, descriptions, verbosity, runner):
+    def __init__(self, stream=None, descriptions=None, verbosity=None,
+                 runner=None):
         """
         :param stream File descriptor to store where to report test results.
             Set to the standard error stream by default.
         """
         :param stream File descriptor to store where to report test results.
             Set to the standard error stream by default.
@@ -1041,7 +1052,7 @@ class VppTestResult(unittest.TestResult):
             test case descriptions.
         :param verbosity Integer variable to store required verbosity level.
         """
             test case descriptions.
         :param verbosity Integer variable to store required verbosity level.
         """
-        unittest.TestResult.__init__(self, stream, descriptions, verbosity)
+        super(VppTestResult, self).__init__(stream, descriptions, verbosity)
         self.stream = stream
         self.descriptions = descriptions
         self.verbosity = verbosity
         self.stream = stream
         self.descriptions = descriptions
         self.verbosity = verbosity
@@ -1152,7 +1163,7 @@ class VppTestResult(unittest.TestResult):
                     if isinstance(test, unittest.suite._ErrorHolder):
                         test_name = str(test)
                     else:
                     if isinstance(test, unittest.suite._ErrorHolder):
                         test_name = str(test)
                     else:
-                        test_name = "'{}' ({})".format(
+                        test_name = "'{!s}' ({!s})".format(
                             get_testcase_doc_name(test), test.id())
                     self.current_test_case_info.core_crash_test = test_name
                 self.core_crash_test_cases_info.add(
                             get_testcase_doc_name(test), test.id())
                     self.current_test_case_info.core_crash_test = test_name
                 self.core_crash_test_cases_info.add(
@@ -1199,7 +1210,15 @@ class VppTestResult(unittest.TestResult):
         :param test:
 
         """
         :param test:
 
         """
-        test.print_header()
+
+        def print_header(test):
+            if not hasattr(test.__class__, '_header_printed'):
+                print(double_line_delim)
+                print(colorize(getdoc(test).splitlines()[0], GREEN))
+                print(double_line_delim)
+            test.__class__._header_printed = True
+
+        print_header(test)
 
         unittest.TestResult.startTest(self, test)
         if self.verbosity > 0:
 
         unittest.TestResult.startTest(self, test)
         if self.verbosity > 0:
@@ -1262,6 +1281,7 @@ class VppTestRunner(unittest.TextTestRunner):
     """
     A basic test runner implementation which prints results to standard error.
     """
     """
     A basic test runner implementation which prints results to standard error.
     """
+
     @property
     def resultclass(self):
         """Class maintaining the results of the tests"""
     @property
     def resultclass(self):
         """Class maintaining the results of the tests"""
@@ -1269,13 +1289,12 @@ class VppTestRunner(unittest.TextTestRunner):
 
     def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
                  result_pipe=None, failfast=False, buffer=False,
 
     def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
                  result_pipe=None, failfast=False, buffer=False,
-                 resultclass=None, print_summary=True):
-
+                 resultclass=None, print_summary=True, **kwargs):
         # ignore stream setting here, use hard-coded stdout to be in sync
         # with prints from VppTestCase methods ...
         super(VppTestRunner, self).__init__(sys.stdout, descriptions,
                                             verbosity, failfast, buffer,
         # ignore stream setting here, use hard-coded stdout to be in sync
         # with prints from VppTestCase methods ...
         super(VppTestRunner, self).__init__(sys.stdout, descriptions,
                                             verbosity, failfast, buffer,
-                                            resultclass)
+                                            resultclass, **kwargs)
         KeepAliveReporter.pipe = keep_alive_pipe
 
         self.orig_stream = self.stream
         KeepAliveReporter.pipe = keep_alive_pipe
 
         self.orig_stream = self.stream
@@ -1335,3 +1354,6 @@ class Worker(Thread):
         self.logger.info(err)
         self.logger.info(single_line_delim)
         self.result = self.process.returncode
         self.logger.info(err)
         self.logger.info(single_line_delim)
         self.result = self.process.returncode
+
+if __name__ == '__main__':
+    pass