New upstream version 17.11.4
[deb_dpdk.git] / test / test / autotest_runner.py
index fc882ec..c6a9105 100644 (file)
@@ -31,6 +31,7 @@
 
 # The main logic behind running autotests in parallel
 
+from __future__ import print_function
 import StringIO
 import csv
 import multiprocessing
@@ -69,7 +70,7 @@ def wait_prompt(child):
 # quite a bit of effort to make it work).
 
 
-def run_test_group(cmdline, test_group):
+def run_test_group(cmdline, target, test_group):
     results = []
     child = None
     start_time = time.time()
@@ -80,8 +81,8 @@ def run_test_group(cmdline, test_group):
         # prepare logging of init
         startuplog = StringIO.StringIO()
 
-        print >>startuplog, "\n%s %s\n" % ("=" * 20, test_group["Prefix"])
-        print >>startuplog, "\ncmdline=%s" % cmdline
+        print("\n%s %s\n" % ("=" * 20, test_group["Prefix"]), file=startuplog)
+        print("\ncmdline=%s" % cmdline, file=startuplog)
 
         child = pexpect.spawn(cmdline, logfile=startuplog)
 
@@ -122,13 +123,6 @@ def run_test_group(cmdline, test_group):
     results.append((0, "Success", "Start %s" % test_group["Prefix"],
                     time.time() - start_time, startuplog.getvalue(), None))
 
-    # parse the binary for available test commands
-    binary = cmdline.split()[0]
-    stripped = 'not stripped' not in subprocess.check_output(['file', binary])
-    if not stripped:
-        symbols = subprocess.check_output(['nm', binary]).decode('utf-8')
-        avail_cmds = re.findall('test_register_(\w+)', symbols)
-
     # run all tests in test group
     for test in test_group["Tests"]:
 
@@ -145,25 +139,23 @@ def run_test_group(cmdline, test_group):
 
         try:
             # print test name to log buffer
-            print >>logfile, "\n%s %s\n" % ("-" * 20, test["Name"])
+            print("\n%s %s\n" % ("-" * 20, test["Name"]), file=logfile)
 
             # run test function associated with the test
-            if stripped or test["Command"] in avail_cmds:
-                result = test["Func"](child, test["Command"])
-            else:
-                result = (0, "Skipped [Not Available]")
+            result = test["Func"](child, test["Command"])
 
             # make a note when the test was finished
             end_time = time.time()
 
+            log = logfile.getvalue()
+
             # append test data to the result tuple
-            result += (test["Name"], end_time - start_time,
-                       logfile.getvalue())
+            result += (test["Name"], end_time - start_time, log)
 
             # call report function, if any defined, and supply it with
             # target and complete log for test run
             if test["Report"]:
-                report = test["Report"](self.target, log)
+                report = test["Report"](target, log)
 
                 # append report to results tuple
                 result += (report,)
@@ -212,8 +204,10 @@ class AutotestRunner:
     def __init__(self, cmdline, target, blacklist, whitelist):
         self.cmdline = cmdline
         self.target = target
+        self.binary = cmdline.split()[0]
         self.blacklist = blacklist
         self.whitelist = whitelist
+        self.skipped = []
 
         # log file filename
         logfile = "%s.log" % target
@@ -275,7 +269,7 @@ class AutotestRunner:
 
             # don't print out total time every line, it's the same anyway
             if i == len(results) - 1:
-                print(result,
+                print(result +
                       "[%02dm %02ds]" % (total_time / 60, total_time % 60))
             else:
                 print(result)
@@ -302,53 +296,58 @@ class AutotestRunner:
             if i != 0:
                 self.csvwriter.writerow([test_name, test_result, result_str])
 
-    # this function iterates over test groups and removes each
-    # test that is not in whitelist/blacklist
-    def __filter_groups(self, test_groups):
-        groups_to_remove = []
-
-        # filter out tests from parallel test groups
-        for i, test_group in enumerate(test_groups):
-
-            # iterate over a copy so that we could safely delete individual
-            # tests
-            for test in test_group["Tests"][:]:
-                test_id = test["Command"]
-
-                # dump tests are specified in full e.g. "Dump_mempool"
-                if "_autotest" in test_id:
-                    test_id = test_id[:-len("_autotest")]
-
-                # filter out blacklisted/whitelisted tests
-                if self.blacklist and test_id in self.blacklist:
-                    test_group["Tests"].remove(test)
-                    continue
-                if self.whitelist and test_id not in self.whitelist:
-                    test_group["Tests"].remove(test)
-                    continue
-
-            # modify or remove original group
-            if len(test_group["Tests"]) > 0:
-                test_groups[i] = test_group
-            else:
-                # remember which groups should be deleted
-                # put the numbers backwards so that we start
-                # deleting from the end, not from the beginning
-                groups_to_remove.insert(0, i)
+    # this function checks individual test and decides if this test should be in
+    # the group by comparing it against  whitelist/blacklist. it also checks if
+    # the test is compiled into the binary, and marks it as skipped if necessary
+    def __filter_test(self, test):
+        test_cmd = test["Command"]
+        test_id = test_cmd
+
+        # dump tests are specified in full e.g. "Dump_mempool"
+        if "_autotest" in test_id:
+            test_id = test_id[:-len("_autotest")]
+
+        # filter out blacklisted/whitelisted tests
+        if self.blacklist and test_id in self.blacklist:
+            return False
+        if self.whitelist and test_id not in self.whitelist:
+            return False
+
+        # if test wasn't compiled in, remove it as well
+
+        # parse the binary for available test commands
+        stripped = 'not stripped' not in \
+                   subprocess.check_output(['file', self.binary])
+        if not stripped:
+            symbols = subprocess.check_output(['nm',
+                                               self.binary]).decode('utf-8')
+            avail_cmds = re.findall('test_register_(\w+)', symbols)
+
+            if test_cmd not in avail_cmds:
+                # notify user
+                result = 0, "Skipped [Not compiled]", test_id, 0, "", None
+                self.skipped.append(tuple(result))
+                return False
 
-        # remove test groups that need to be removed
-        for i in groups_to_remove:
-            del test_groups[i]
+        return True
 
-        return test_groups
+    def __filter_group(self, group):
+        group["Tests"] = list(filter(self.__filter_test, group["Tests"]))
+        return len(group["Tests"]) > 0
 
     # iterate over test groups and run tests associated with them
     def run_all_tests(self):
         # filter groups
-        self.parallel_test_groups = \
-            self.__filter_groups(self.parallel_test_groups)
-        self.non_parallel_test_groups = \
-            self.__filter_groups(self.non_parallel_test_groups)
+        # for each test group, check all tests against the filter, then remove
+        # all groups that don't have any tests
+        self.parallel_test_groups = list(
+            filter(self.__filter_group,
+                   self.parallel_test_groups)
+        )
+        self.non_parallel_test_groups = list(
+            filter(self.__filter_group,
+                   self.non_parallel_test_groups)
+        )
 
         # create a pool of worker threads
         pool = multiprocessing.Pool(processes=1)
@@ -360,17 +359,36 @@ class AutotestRunner:
 
             # create table header
             print("")
-            print("Test name".ljust(30), "Test result".ljust(29),
-                  "Test".center(9), "Total".center(9))
+            print("Test name".ljust(30) + "Test result".ljust(29) +
+                  "Test".center(9) + "Total".center(9))
             print("=" * 80)
 
+            # print out skipped autotests if there were any
+            if len(self.skipped):
+                print("Skipped autotests:")
+
+                # print out any skipped tests
+                for result in self.skipped:
+                    # unpack result tuple
+                    test_result, result_str, test_name, _, _, _ = result
+                    self.csvwriter.writerow([test_name, test_result,
+                                             result_str])
+
+                    t = ("%s:" % test_name).ljust(30)
+                    t += result_str.ljust(29)
+                    t += "[00m 00s]"
+
+                    print(t)
+
             # make a note of tests start time
             self.start = time.time()
 
+            print("Parallel autotests:")
             # assign worker threads to run test groups
             for test_group in self.parallel_test_groups:
                 result = pool.apply_async(run_test_group,
                                           [self.__get_cmdline(test_group),
+                                           self.target,
                                            test_group])
                 results.append(result)
 
@@ -392,10 +410,11 @@ class AutotestRunner:
                     # remove result from results list once we're done with it
                     results.remove(group_result)
 
+            print("Non-parallel autotests:")
             # run non_parallel tests. they are run one by one, synchronously
             for test_group in self.non_parallel_test_groups:
                 group_result = run_test_group(
-                    self.__get_cmdline(test_group), test_group)
+                    self.__get_cmdline(test_group), self.target, test_group)
 
                 self.__process_results(group_result)