tests: cpus awareness 50/31950/14
authorKlement Sekera <ksekera@cisco.com>
Thu, 8 Apr 2021 17:37:41 +0000 (19:37 +0200)
committerAndrew Yourtchenko <ayourtch@gmail.com>
Fri, 16 Apr 2021 09:26:33 +0000 (09:26 +0000)
Introduce MAX_CPUS parameters to control maximum number of CPUs used by
VPP(s) during testing, with default value 'auto' corresponding to all
CPUs available.

Calculate test CPU requirements by taking into account the number of
workers, so a test requires 1 (main thread) + # of worker CPUs.

When running tests, keep track of both running test jobs (controlled by
TEST_JOBS parameter) and free CPUs. This then causes two limits in the
system - to not exceed number of jobs in parallel but also to not exceed
number of CPUs available.

Skip tests which require more CPUs than are available in system (or more
than MAX_CPUS) and print a warning message.

Type: improvement
Change-Id: Ib8fda54e4c6a36179d64160bb87fbd3a0011762d
Signed-off-by: Klement Sekera <ksekera@cisco.com>
src/plugins/memif/test/test_memif.py
test/Makefile
test/cpu_config.py [new file with mode: 0644]
test/framework.py
test/log.py
test/run_tests.py
test/sanity_run_vpp.py
test/test_util.py

index caaab87..fc7cf9b 100644 (file)
@@ -16,11 +16,24 @@ from vpp_papi import VppEnum
 @tag_run_solo
 class TestMemif(VppTestCase):
     """ Memif Test Case """
+    remote_class = RemoteVppTestCase
+
+    @classmethod
+    def get_cpus_required(cls):
+        return (super().get_cpus_required() +
+                cls.remote_class.get_cpus_required())
+
+    @classmethod
+    def assign_cpus(cls, cpus):
+        remote_cpus = cpus[:cls.remote_class.get_cpus_required()]
+        my_cpus = cpus[cls.remote_class.get_cpus_required():]
+        cls.remote_class.assign_cpus(remote_cpus)
+        super().assign_cpus(my_cpus)
 
     @classmethod
     def setUpClass(cls):
         # fork new process before client connects to VPP
-        cls.remote_test = RemoteClass(RemoteVppTestCase)
+        cls.remote_test = RemoteClass(cls.remote_class)
         cls.remote_test.start_remote()
         cls.remote_test.set_request_timeout(10)
         super(TestMemif, cls).setUpClass()
index dc6aa09..0ee61a2 100644 (file)
@@ -366,7 +366,8 @@ help:
        @echo "Arguments controlling test runs:"
        @echo " V=[0|1|2]              - set test verbosity level"
        @echo "                          0=ERROR, 1=INFO, 2=DEBUG"
-       @echo " TEST_JOBS=[<n>|auto]   - use <n> parallel processes for test execution or automatic discovery of maximum acceptable processes (default: 1)"
+       @echo " TEST_JOBS=[<n>|auto]   - use at most <n> parallel python processes for test execution, if auto, set to number of available cpus (default: 1)"
+       @echo " MAX_VPP_CPUS=[<n>|auto]- use at most <n> cpus for running vpp main and worker threads, if auto, set to number of available cpus (default: auto)"
        @echo " CACHE_OUTPUT=[0|1]     - cache VPP stdout/stderr and log as one block after test finishes (default: 1)"
        @echo " FAILFAST=[0|1]         - fail fast if 1, complete all tests if 0"
        @echo " TIMEOUT=<timeout>      - fail test suite if any single test takes longer than <timeout> (in seconds) to finish (default: 600)"
diff --git a/test/cpu_config.py b/test/cpu_config.py
new file mode 100644 (file)
index 0000000..b4e5d1a
--- /dev/null
@@ -0,0 +1,21 @@
+import os
+import psutil
+
+available_cpus = psutil.Process().cpu_affinity()
+num_cpus = len(available_cpus)
+
+max_vpp_cpus = os.getenv("MAX_VPP_CPUS", "auto").lower()
+
+if max_vpp_cpus == "auto":
+    max_vpp_cpus = num_cpus
+else:
+    try:
+        max_vpp_cpus = int(max_vpp_cpus)
+    except ValueError as e:
+        raise ValueError("Invalid MAX_VPP_CPUS value specified, valid "
+                         "values are a positive integer or 'auto'") from e
+    if max_vpp_cpus <= 0:
+        raise ValueError("Invalid MAX_VPP_CPUS value specified, valid "
+                         "values are a positive integer or 'auto'")
+    if max_vpp_cpus > num_cpus:
+        max_vpp_cpus = num_cpus
index 67ac495..1cbd814 100644 (file)
@@ -22,6 +22,7 @@ from inspect import getdoc, isclass
 from traceback import format_exception
 from logging import FileHandler, DEBUG, Formatter
 from enum import Enum
+from abc import ABC, abstractmethod
 
 import scapy.compat
 from scapy.packet import Raw
@@ -42,6 +43,8 @@ from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
 from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
 from scapy.layers.inet6 import ICMPv6EchoReply
 
+from cpu_config import available_cpus, num_cpus, max_vpp_cpus
+
 logger = logging.getLogger(__name__)
 
 # Set up an empty logger for the testcase that can be overridden as necessary
@@ -53,6 +56,7 @@ FAIL = 1
 ERROR = 2
 SKIP = 3
 TEST_RUN = 4
+SKIP_CPU_SHORTAGE = 5
 
 
 class BoolEnvironmentVariable(object):
@@ -223,6 +227,21 @@ def _running_gcov_tests():
 running_gcov_tests = _running_gcov_tests()
 
 
+def get_environ_vpp_worker_count():
+    worker_config = os.getenv("VPP_WORKER_CONFIG", None)
+    if worker_config:
+        elems = worker_config.split(" ")
+        if elems[0] != "workers" or len(elems) != 2:
+            raise ValueError("Wrong VPP_WORKER_CONFIG == '%s' value." %
+                             worker_config)
+        return int(elems[1])
+    else:
+        return 0
+
+
+environ_vpp_worker_count = get_environ_vpp_worker_count()
+
+
 class KeepAliveReporter(object):
     """
     Singleton object which reports test start to parent process
@@ -292,7 +311,21 @@ class DummyVpp:
         pass
 
 
-class VppTestCase(unittest.TestCase):
+class CPUInterface(ABC):
+    cpus = []
+    skipped_due_to_cpu_lack = False
+
+    @classmethod
+    @abstractmethod
+    def get_cpus_required(cls):
+        pass
+
+    @classmethod
+    def assign_cpus(cls, cpus):
+        cls.cpus = cpus
+
+
+class VppTestCase(CPUInterface, unittest.TestCase):
     """This subclass is a base class for VPP test cases that are implemented as
     classes. It provides methods to create and run test case.
     """
@@ -358,34 +391,18 @@ class VppTestCase(unittest.TestCase):
         if dl == "gdb-all" or dl == "gdbserver-all":
             cls.debug_all = True
 
-    @staticmethod
-    def get_least_used_cpu():
-        cpu_usage_list = [set(range(psutil.cpu_count()))]
-        vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
-                         if 'vpp_main' == p.info['name']]
-        for vpp_process in vpp_processes:
-            for cpu_usage_set in cpu_usage_list:
-                try:
-                    cpu_num = vpp_process.cpu_num()
-                    if cpu_num in cpu_usage_set:
-                        cpu_usage_set_index = cpu_usage_list.index(
-                            cpu_usage_set)
-                        if cpu_usage_set_index == len(cpu_usage_list) - 1:
-                            cpu_usage_list.append({cpu_num})
-                        else:
-                            cpu_usage_list[cpu_usage_set_index + 1].add(
-                                cpu_num)
-                        cpu_usage_set.remove(cpu_num)
-                        break
-                except psutil.NoSuchProcess:
-                    pass
-
-        for cpu_usage_set in cpu_usage_list:
-            if len(cpu_usage_set) > 0:
-                min_usage_set = cpu_usage_set
-                break
+    @classmethod
+    def get_vpp_worker_count(cls):
+        if not hasattr(cls, "vpp_worker_count"):
+            if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
+                cls.vpp_worker_count = 0
+            else:
+                cls.vpp_worker_count = environ_vpp_worker_count
+        return cls.vpp_worker_count
 
-        return random.choice(tuple(min_usage_set))
+    @classmethod
+    def get_cpus_required(cls):
+        return 1 + cls.get_vpp_worker_count()
 
     @classmethod
     def setUpConstants(cls):
@@ -417,20 +434,6 @@ class VppTestCase(unittest.TestCase):
         if coredump_size is None:
             coredump_size = "coredump-size unlimited"
 
-        cpu_core_number = cls.get_least_used_cpu()
-        if not hasattr(cls, "vpp_worker_count"):
-            cls.vpp_worker_count = 0
-            worker_config = os.getenv("VPP_WORKER_CONFIG", "")
-            if worker_config:
-                elems = worker_config.split(" ")
-                if elems[0] != "workers" or len(elems) != 2:
-                    raise ValueError("Wrong VPP_WORKER_CONFIG == '%s' value." %
-                                     worker_config)
-                cls.vpp_worker_count = int(elems[1])
-                if cls.vpp_worker_count > 0 and\
-                        cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
-                    cls.vpp_worker_count = 0
-
         default_variant = os.getenv("VARIANT")
         if default_variant is not None:
             default_variant = "defaults { %s 100 }" % default_variant
@@ -447,9 +450,10 @@ class VppTestCase(unittest.TestCase):
             coredump_size, "runtime-dir", cls.tempdir, "}",
             "api-trace", "{", "on", "}",
             "api-segment", "{", "prefix", cls.get_api_segment_prefix(), "}",
-            "cpu", "{", "main-core", str(cpu_core_number), ]
-        if cls.vpp_worker_count:
-            cls.vpp_cmdline.extend(["workers", str(cls.vpp_worker_count)])
+            "cpu", "{", "main-core", str(cls.cpus[0]), ]
+        if cls.get_vpp_worker_count():
+            cls.vpp_cmdline.extend([
+                "corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])])
         cls.vpp_cmdline.extend([
             "}",
             "physmem", "{", "max-size", "32m", "}",
@@ -509,11 +513,12 @@ class VppTestCase(unittest.TestCase):
 
     @classmethod
     def run_vpp(cls):
+        cls.logger.debug(f"Assigned cpus: {cls.cpus}")
         cmdline = cls.vpp_cmdline
 
         if cls.debug_gdbserver:
             gdbserver = '/usr/bin/gdbserver'
-            if not os.path.isfile(gdbserver) or \
+            if not os.path.isfile(gdbserver) or\
                     not os.access(gdbserver, os.X_OK):
                 raise Exception("gdbserver binary '%s' does not exist or is "
                                 "not executable" % gdbserver)
@@ -1349,6 +1354,7 @@ class VppTestResult(unittest.TestResult):
         self.verbosity = verbosity
         self.result_string = None
         self.runner = runner
+        self.printed = []
 
     def addSuccess(self, test):
         """
@@ -1383,7 +1389,10 @@ class VppTestResult(unittest.TestResult):
         unittest.TestResult.addSkip(self, test, reason)
         self.result_string = colorize("SKIP", YELLOW)
 
-        self.send_result_through_pipe(test, SKIP)
+        if reason == "not enough cpus":
+            self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE)
+        else:
+            self.send_result_through_pipe(test, SKIP)
 
     def symlink_failed(self):
         if self.current_test_case_info:
@@ -1501,28 +1510,34 @@ class VppTestResult(unittest.TestResult):
         """
 
         def print_header(test):
+            if test.__class__ in self.printed:
+                return
+
             test_doc = getdoc(test)
             if not test_doc:
                 raise Exception("No doc string for test '%s'" % test.id())
+
             test_title = test_doc.splitlines()[0]
-            test_title_colored = colorize(test_title, GREEN)
+            test_title = colorize(test_title, GREEN)
             if test.is_tagged_run_solo():
-                # long live PEP-8 and 80 char width limitation...
-                c = YELLOW
-                test_title_colored = colorize("SOLO RUN: " + test_title, c)
+                test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)
 
             # This block may overwrite the colorized title above,
             # but we want this to stand out and be fixed
             if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
-                c = RED
-                w = "FIXME with VPP workers: "
-                test_title_colored = colorize(w + test_title, c)
-
-            if not hasattr(test.__class__, '_header_printed'):
-                print(double_line_delim)
-                print(test_title_colored)
-                print(double_line_delim)
-            test.__class__._header_printed = True
+                test_title = colorize(
+                    f"FIXME with VPP workers: {test_title}", RED)
+
+            if test.__class__.skipped_due_to_cpu_lack:
+                test_title = colorize(
+                    f"{test_title} [skipped - not enough cpus, "
+                    f"required={test.__class__.get_cpus_required()}, "
+                    f"available={max_vpp_cpus}]", YELLOW)
+
+            print(double_line_delim)
+            print(test_title)
+            print(double_line_delim)
+            self.printed.append(test.__class__)
 
         print_header(test)
         self.start_test = time.time()
index b0fe037..5fd91c7 100644 (file)
@@ -11,7 +11,7 @@ single_line_delim = '-' * 78
 
 
 def colorize(msg, color):
-    return color + msg + COLOR_RESET
+    return f"{color}{msg}{COLOR_RESET}"
 
 
 class ColorFormatter(logging.Formatter):
index 008828c..5d091ad 100644 (file)
@@ -9,22 +9,21 @@ import argparse
 import time
 import threading
 import signal
-import psutil
 import re
-import multiprocessing
-from multiprocessing import Process, Pipe, cpu_count
+from multiprocessing import Process, Pipe, get_context
 from multiprocessing.queues import Queue
 from multiprocessing.managers import BaseManager
 import framework
 from framework import VppTestRunner, running_extended_tests, VppTestCase, \
     get_testcase_doc_name, get_test_description, PASS, FAIL, ERROR, SKIP, \
-    TEST_RUN
+    TEST_RUN, SKIP_CPU_SHORTAGE
 from debug import spawn_gdb, start_vpp_in_gdb
 from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
     colorize, single_line_delim
 from discover_tests import discover_tests
 from subprocess import check_output, CalledProcessError
 from util import check_core_path, get_core_path, is_core_present
+from cpu_config import num_cpus, max_vpp_cpus, available_cpus
 
 # timeout which controls how long the child has to finish after seeing
 # a core dump in test temporary directory. If this is exceeded, parent assumes
@@ -59,6 +58,7 @@ class TestResult(dict):
         self[FAIL] = []
         self[ERROR] = []
         self[SKIP] = []
+        self[SKIP_CPU_SHORTAGE] = []
         self[TEST_RUN] = []
         self.crashed = False
         self.testcase_suite = testcase_suite
@@ -67,8 +67,8 @@ class TestResult(dict):
 
     def was_successful(self):
         return 0 == len(self[FAIL]) == len(self[ERROR]) \
-            and len(self[PASS] + self[SKIP]) \
-            == self.testcase_suite.countTestCases() == len(self[TEST_RUN])
+            and len(self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE]) \
+            == self.testcase_suite.countTestCases()
 
     def no_tests_run(self):
         return 0 == len(self[TEST_RUN])
@@ -80,7 +80,7 @@ class TestResult(dict):
         rerun_ids = set([])
         for testcase in self.testcase_suite:
             tc_id = testcase.id()
-            if tc_id not in self[PASS] and tc_id not in self[SKIP]:
+            if tc_id not in self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE]:
                 rerun_ids.add(tc_id)
         if rerun_ids:
             return suite_from_failed(self.testcase_suite, rerun_ids)
@@ -142,11 +142,7 @@ class TestCaseWrapper(object):
         self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
         self.result_parent_end, self.result_child_end = Pipe(duplex=False)
         self.testcase_suite = testcase_suite
-        if sys.version[0] == '2':
-            self.stdouterr_queue = manager.StreamQueue()
-        else:
-            from multiprocessing import get_context
-            self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
+        self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
         self.logger = get_parallel_logger(self.stdouterr_queue)
         self.child = Process(target=test_runner_wrapper,
                              args=(testcase_suite,
@@ -213,6 +209,13 @@ class TestCaseWrapper(object):
     def was_successful(self):
         return self.result.was_successful()
 
+    @property
+    def cpus_used(self):
+        return self.testcase_suite.cpus_used
+
+    def get_assigned_cpus(self):
+        return self.testcase_suite.get_assigned_cpus()
+
 
 def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
                              read_testcases):
@@ -324,7 +327,6 @@ def process_finished_testsuite(wrapped_testcase_suite,
 def run_forked(testcase_suites):
     wrapped_testcase_suites = set()
     solo_testcase_suites = []
-    total_test_runners = 0
 
     # suites are unhashable, need to use list
     results = []
@@ -332,31 +334,53 @@ def run_forked(testcase_suites):
     finished_unread_testcases = set()
     manager = StreamQueueManager()
     manager.start()
-    total_test_runners = 0
-    while total_test_runners < concurrent_tests:
-        if testcase_suites:
+    tests_running = 0
+    free_cpus = list(available_cpus)
+
+    def on_suite_start(tc):
+        nonlocal tests_running
+        nonlocal free_cpus
+        tests_running = tests_running + 1
+
+    def on_suite_finish(tc):
+        nonlocal tests_running
+        nonlocal free_cpus
+        tests_running = tests_running - 1
+        assert tests_running >= 0
+        free_cpus.extend(tc.get_assigned_cpus())
+
+    def run_suite(suite):
+        nonlocal manager
+        nonlocal wrapped_testcase_suites
+        nonlocal unread_testcases
+        nonlocal free_cpus
+        suite.assign_cpus(free_cpus[:suite.cpus_used])
+        free_cpus = free_cpus[suite.cpus_used:]
+        wrapper = TestCaseWrapper(suite, manager)
+        wrapped_testcase_suites.add(wrapper)
+        unread_testcases.add(wrapper)
+        on_suite_start(suite)
+
+    def can_run_suite(suite):
+        return (tests_running < max_concurrent_tests and
+                (suite.cpus_used <= len(free_cpus) or
+                 suite.cpus_used > max_vpp_cpus))
+
+    while free_cpus and testcase_suites:
+        a_suite = testcase_suites[0]
+        if a_suite.is_tagged_run_solo:
+            a_suite = testcase_suites.pop(0)
+            solo_testcase_suites.append(a_suite)
+            continue
+        if can_run_suite(a_suite):
             a_suite = testcase_suites.pop(0)
-            if a_suite.is_tagged_run_solo:
-                solo_testcase_suites.append(a_suite)
-                continue
-            wrapped_testcase_suite = TestCaseWrapper(a_suite,
-                                                     manager)
-            wrapped_testcase_suites.add(wrapped_testcase_suite)
-            unread_testcases.add(wrapped_testcase_suite)
-            total_test_runners = total_test_runners + 1
+            run_suite(a_suite)
         else:
             break
 
-    while total_test_runners < 1 and solo_testcase_suites:
-        if solo_testcase_suites:
-            a_suite = solo_testcase_suites.pop(0)
-            wrapped_testcase_suite = TestCaseWrapper(a_suite,
-                                                     manager)
-            wrapped_testcase_suites.add(wrapped_testcase_suite)
-            unread_testcases.add(wrapped_testcase_suite)
-            total_test_runners = total_test_runners + 1
-        else:
-            break
+    if tests_running == 0 and solo_testcase_suites:
+        a_suite = solo_testcase_suites.pop(0)
+        run_suite(a_suite)
 
     read_from_testcases = threading.Event()
     read_from_testcases.set()
@@ -466,31 +490,23 @@ def run_forked(testcase_suites):
                 wrapped_testcase_suites.remove(finished_testcase)
                 finished_unread_testcases.add(finished_testcase)
                 finished_testcase.stdouterr_queue.put(None)
-                total_test_runners = total_test_runners - 1
+                on_suite_finish(finished_testcase)
                 if stop_run:
                     while testcase_suites:
                         results.append(TestResult(testcase_suites.pop(0)))
                 elif testcase_suites:
-                    a_testcase = testcase_suites.pop(0)
-                    while a_testcase and a_testcase.is_tagged_run_solo:
-                        solo_testcase_suites.append(a_testcase)
+                    a_suite = testcase_suites.pop(0)
+                    while a_suite and a_suite.is_tagged_run_solo:
+                        solo_testcase_suites.append(a_suite)
                         if testcase_suites:
-                            a_testcase = testcase_suites.pop(0)
+                            a_suite = testcase_suites.pop(0)
                         else:
-                            a_testcase = None
-                    if a_testcase:
-                        new_testcase = TestCaseWrapper(a_testcase,
-                                                       manager)
-                        wrapped_testcase_suites.add(new_testcase)
-                        total_test_runners = total_test_runners + 1
-                        unread_testcases.add(new_testcase)
-                if solo_testcase_suites and total_test_runners == 0:
-                    a_testcase = solo_testcase_suites.pop(0)
-                    new_testcase = TestCaseWrapper(a_testcase,
-                                                   manager)
-                    wrapped_testcase_suites.add(new_testcase)
-                    total_test_runners = total_test_runners + 1
-                    unread_testcases.add(new_testcase)
+                            a_suite = None
+                    if a_suite and can_run_suite(a_suite):
+                        run_suite(a_suite)
+                if solo_testcase_suites and tests_running == 0:
+                    a_suite = solo_testcase_suites.pop(0)
+                    run_suite(a_suite)
             time.sleep(0.1)
     except Exception:
         for wrapped_testcase_suite in wrapped_testcase_suites:
@@ -506,19 +522,41 @@ def run_forked(testcase_suites):
     return results
 
 
+class TestSuiteWrapper(unittest.TestSuite):
+    cpus_used = 0
+
+    def __init__(self):
+        return super().__init__()
+
+    def addTest(self, test):
+        self.cpus_used = max(self.cpus_used, test.get_cpus_required())
+        super().addTest(test)
+
+    def assign_cpus(self, cpus):
+        self.cpus = cpus
+
+    def _handleClassSetUp(self, test, result):
+        if not test.__class__.skipped_due_to_cpu_lack:
+            test.assign_cpus(self.cpus)
+        super()._handleClassSetUp(test, result)
+
+    def get_assigned_cpus(self):
+        return self.cpus
+
+
 class SplitToSuitesCallback:
     def __init__(self, filter_callback):
         self.suites = {}
         self.suite_name = 'default'
         self.filter_callback = filter_callback
-        self.filtered = unittest.TestSuite()
+        self.filtered = TestSuiteWrapper()
 
     def __call__(self, file_name, cls, method):
         test_method = cls(method)
         if self.filter_callback(file_name, cls.__name__, method):
             self.suite_name = file_name + cls.__name__
             if self.suite_name not in self.suites:
-                self.suites[self.suite_name] = unittest.TestSuite()
+                self.suites[self.suite_name] = TestSuiteWrapper()
                 self.suites[self.suite_name].is_tagged_run_solo = False
             self.suites[self.suite_name].addTest(test_method)
             if test_method.is_tagged_run_solo():
@@ -563,7 +601,7 @@ def parse_test_option():
 
 
 def filter_tests(tests, filter_cb):
-    result = unittest.suite.TestSuite()
+    result = TestSuiteWrapper()
     for t in tests:
         if isinstance(t, unittest.suite.TestSuite):
             # this is a bunch of tests, recursively filter...
@@ -628,13 +666,14 @@ class AllResults(dict):
         self[FAIL] = 0
         self[ERROR] = 0
         self[SKIP] = 0
+        self[SKIP_CPU_SHORTAGE] = 0
         self[TEST_RUN] = 0
         self.rerun = []
         self.testsuites_no_tests_run = []
 
     def add_results(self, result):
         self.results_per_suite.append(result)
-        result_types = [PASS, FAIL, ERROR, SKIP, TEST_RUN]
+        result_types = [PASS, FAIL, ERROR, SKIP, TEST_RUN, SKIP_CPU_SHORTAGE]
         for result_type in result_types:
             self[result_type] += len(result[result_type])
 
@@ -661,22 +700,29 @@ class AllResults(dict):
         print('')
         print(double_line_delim)
         print('TEST RESULTS:')
-        print('     Scheduled tests: {}'.format(self.all_testcases))
-        print('      Executed tests: {}'.format(self[TEST_RUN]))
-        print('        Passed tests: {}'.format(
-            colorize(str(self[PASS]), GREEN)))
-        if self[SKIP] > 0:
-            print('       Skipped tests: {}'.format(
-                colorize(str(self[SKIP]), YELLOW)))
-        if self.not_executed > 0:
-            print('  Not Executed tests: {}'.format(
-                colorize(str(self.not_executed), RED)))
-        if self[FAIL] > 0:
-            print('            Failures: {}'.format(
-                colorize(str(self[FAIL]), RED)))
-        if self[ERROR] > 0:
-            print('              Errors: {}'.format(
-                colorize(str(self[ERROR]), RED)))
+
+        def indent_results(lines):
+            lines = list(filter(None, lines))
+            maximum = max(lines, key=lambda x: x.index(":"))
+            maximum = 4 + maximum.index(":")
+            for l in lines:
+                padding = " " * (maximum - l.index(":"))
+                print(f"{padding}{l}")
+
+        indent_results([
+            f'Scheduled tests: {self.all_testcases}',
+            f'Executed tests: {self[TEST_RUN]}',
+            f'Passed tests: {colorize(self[PASS], GREEN)}',
+            f'Skipped tests: {colorize(self[SKIP], YELLOW)}'
+            if self[SKIP] else None,
+            f'Not Executed tests: {colorize(self.not_executed, RED)}'
+            if self.not_executed else None,
+            f'Failures: {colorize(self[FAIL], RED)}' if self[FAIL] else None,
+            f'Errors: {colorize(self[ERROR], RED)}' if self[ERROR] else None,
+            'Tests skipped due to lack of CPUS: '
+            f'{colorize(self[SKIP_CPU_SHORTAGE], YELLOW)}'
+            if self[SKIP_CPU_SHORTAGE] else None
+        ])
 
         if self.all_failed > 0:
             print('FAILURES AND ERRORS IN TESTS:')
@@ -713,6 +759,10 @@ class AllResults(dict):
             for tc_class in tc_classes:
                 print('  {}'.format(colorize(tc_class, RED)))
 
+        if self[SKIP_CPU_SHORTAGE]:
+            print()
+            print(colorize('     SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT'
+                           ' ENOUGH CPUS AVAILABLE', YELLOW))
         print(double_line_delim)
         print('')
 
@@ -793,31 +843,34 @@ if __name__ == '__main__':
 
     run_interactive = debug or step or force_foreground
 
-    try:
-        num_cpus = len(os.sched_getaffinity(0))
-    except AttributeError:
-        num_cpus = multiprocessing.cpu_count()
-
-    print("OS reports %s available cpu(s)." % num_cpus)
+    max_concurrent_tests = 0
+    print(f"OS reports {num_cpus} available cpu(s).")
 
     test_jobs = os.getenv("TEST_JOBS", "1").lower()  # default = 1 process
     if test_jobs == 'auto':
         if run_interactive:
-            concurrent_tests = 1
-            print('Interactive mode required, running on one core')
+            max_concurrent_tests = 1
+            print('Interactive mode required, running tests consecutively.')
         else:
-            concurrent_tests = num_cpus
-            print('Found enough resources to run tests with %s cores'
-                  % concurrent_tests)
-    elif test_jobs.isdigit():
-        concurrent_tests = int(test_jobs)
-        print("Running on %s core(s) as set by 'TEST_JOBS'." %
-              concurrent_tests)
+            max_concurrent_tests = num_cpus
+            print(f"Running at most {max_concurrent_tests} python test "
+                  "processes concurrently.")
     else:
-        concurrent_tests = 1
-        print('Running on one core.')
-
-    if run_interactive and concurrent_tests > 1:
+        try:
+            test_jobs = int(test_jobs)
+        except ValueError as e:
+            raise ValueError("Invalid TEST_JOBS value specified, valid "
+                             "values are a positive integer or 'auto'") from e
+        if test_jobs <= 0:
+            raise ValueError("Invalid TEST_JOBS value specified, valid "
+                             "values are a positive integer or 'auto'")
+        max_concurrent_tests = int(test_jobs)
+        print(f"Running at most {max_concurrent_tests} python test processes "
+              "concurrently as set by 'TEST_JOBS'.")
+
+    print(f"Using at most {max_vpp_cpus} cpus for VPP threads.")
+
+    if run_interactive and max_concurrent_tests > 1:
         raise NotImplementedError(
             'Running tests interactively (DEBUG is gdb[server] or ATTACH or '
             'STEP is set) in parallel (TEST_JOBS is more than 1) is not '
@@ -833,7 +886,7 @@ if __name__ == '__main__':
     failfast = args.failfast
     descriptions = True
 
-    print("Running tests using custom test runner")  # debug message
+    print("Running tests using custom test runner.")
     filter_file, filter_class, filter_func = parse_test_option()
 
     print("Active filters: file=%s, class=%s, function=%s" % (
@@ -852,6 +905,21 @@ if __name__ == '__main__':
     tests_amount = 0
     for testcase_suite in cb.suites.values():
         tests_amount += testcase_suite.countTestCases()
+        if testcase_suite.cpus_used > max_vpp_cpus:
+            # here we replace test functions with lambdas to just skip them
+            # but we also replace setUp/tearDown functions to do nothing
+            # so that the test can be "started" and "stopped", so that we can
+            # still keep those prints (test description - SKIP), which are done
+            # in stopTest() (for that to trigger, test function must run)
+            for t in testcase_suite:
+                for m in dir(t):
+                    if m.startswith('test_'):
+                        setattr(t, m, lambda: t.skipTest("not enough cpus"))
+                setattr(t.__class__, 'setUpClass', lambda: None)
+                setattr(t.__class__, 'tearDownClass', lambda: None)
+                setattr(t, 'setUp', lambda: None)
+                setattr(t, 'tearDown', lambda: None)
+                t.__class__.skipped_due_to_cpu_lack = True
         suites.append(testcase_suite)
 
     print("%s out of %s tests match specified filters" % (
@@ -868,6 +936,14 @@ if __name__ == '__main__':
         # don't fork if requiring interactive terminal
         print('Running tests in foreground in the current process')
         full_suite = unittest.TestSuite()
+        free_cpus = list(available_cpus)
+        cpu_shortage = False
+        for suite in suites:
+            if suite.cpus_used <= max_vpp_cpus:
+                suite.assign_cpus(free_cpus[:suite.cpus_used])
+            else:
+                suite.assign_cpus([])
+                cpu_shortage = True
         full_suite.addTests(suites)
         result = VppTestRunner(verbosity=verbose,
                                failfast=failfast,
@@ -883,10 +959,16 @@ if __name__ == '__main__':
                                           test_case_info.tempdir,
                                           test_case_info.core_crash_test)
 
+        if cpu_shortage:
+            print()
+            print(colorize('SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT'
+                           ' ENOUGH CPUS AVAILABLE', YELLOW))
+            print()
         sys.exit(not was_successful)
     else:
         print('Running each VPPTestCase in a separate background process'
-              ' with {} parallel process(es)'.format(concurrent_tests))
+              f' with at most {max_concurrent_tests} parallel python test '
+              'process(es)')
         exit_code = 0
         while suites and attempts > 0:
             results = run_forked(suites)
index f91dc83..4b21de1 100644 (file)
@@ -9,7 +9,7 @@ from framework import VppDiedError, VppTestCase, KeepAliveReporter
 
 class SanityTestCase(VppTestCase):
     """ Sanity test case - verify whether VPP is able to start """
-    pass
+    cpus = [0]
 
     # don't ask to debug SanityTestCase
     @classmethod
index 421afce..3a61d64 100755 (executable)
@@ -2,11 +2,11 @@
 """Test framework utility functions tests"""
 
 import unittest
-from framework import VppTestRunner
+from framework import VppTestRunner, CPUInterface
 from vpp_papi import mac_pton, mac_ntop
 
 
-class TestUtil (unittest.TestCase):
+class TestUtil (CPUInterface, unittest.TestCase):
     """ Test framework utility tests """
 
     @classmethod
@@ -23,6 +23,10 @@ class TestUtil (unittest.TestCase):
             pass
         return False
 
+    @classmethod
+    def get_cpus_required(cls):
+        return 0
+
     def test_mac_to_binary(self):
         """ MAC to binary and back """
         mac = 'aa:bb:cc:dd:ee:ff'