tests docs: update python3 venv packages
[vpp.git] / test / run_tests.py
1 #!/usr/bin/env python3
2
3 import sys
4 import shutil
5 import os
6 import fnmatch
7 import unittest
8 import time
9 import threading
10 import traceback
11 import signal
12 import re
13 from multiprocessing import Process, Pipe, get_context
14 from multiprocessing.queues import Queue
15 from multiprocessing.managers import BaseManager
16 from config import config, num_cpus, available_cpus, max_vpp_cpus
17 from framework import (
18     VppTestRunner,
19     VppTestCase,
20     get_testcase_doc_name,
21     get_test_description,
22 )
23 from test_result_code import TestResultCode
24 from debug import spawn_gdb
25 from log import (
26     get_parallel_logger,
27     double_line_delim,
28     RED,
29     YELLOW,
30     GREEN,
31     colorize,
32     single_line_delim,
33 )
34 from discover_tests import discover_tests
35 import sanity_run_vpp
36 from subprocess import check_output, CalledProcessError
37 from util import check_core_path, get_core_path, is_core_present
38
# Grace period, in seconds, that a child test runner gets to finish after a
# core dump is seen in its test temporary directory. If this is exceeded, the
# parent assumes that the child process is stuck (e.g. waiting for an event
# from vpp) and kills the child.
core_timeout = 3
44
45
class StreamQueue(Queue):
    """Multiprocessing queue offering the small part of the stream API
    (``write``, ``flush``, ``fileno``) needed to stand in for
    ``sys.stdout``/``sys.stderr``.
    """

    def write(self, msg):
        """Enqueue *msg* rather than writing it to a file."""
        self.put(msg)

    def flush(self):
        """Flush the interpreter's original stdout and stderr streams."""
        for stream in (sys.__stdout__, sys.__stderr__):
            stream.flush()

    def fileno(self):
        """Return the file descriptor of the queue's ``_writer`` end."""
        writer = self._writer
        return writer.fileno()
56
57
class StreamQueueManager(BaseManager):
    # Manager subclass that only exists to carry the StreamQueue registration
    # made right after the class definition.
    pass


# Make StreamQueue creatable through the manager, i.e. via
# manager.StreamQueue(...).
StreamQueueManager.register("StreamQueue", StreamQueue)
63
64
class TestResult(dict):
    """Results of a single test suite, keyed by ``TestResultCode``.

    Every result code maps to the list of test ids that finished with that
    code.

    Attributes:
        crashed: False initially; set from outside when the suite's runner
            had to be killed.
        testcase_suite: the suite these results belong to.
        testcases: the members of ``testcase_suite`` as a list.
        testcases_by_id: optional mapping of test id to test case used to
            turn ids into readable names. May be None, in which case ids are
            reported verbatim.
    """

    def __init__(self, testcase_suite, testcases_by_id=None):
        super().__init__()
        for trc in list(TestResultCode):
            self[trc] = []
        self.crashed = False
        self.testcase_suite = testcase_suite
        self.testcases = list(testcase_suite)
        self.testcases_by_id = testcases_by_id

    def was_successful(self):
        """Return True if nothing failed and every test case is accounted for.

        A suite is successful when there are no failures, errors or
        unexpected passes, and the passed, skipped and expectedly failed
        tests add up to the number of test cases in the suite.
        """
        return (
            0
            == len(self[TestResultCode.FAIL])
            == len(self[TestResultCode.ERROR])
            == len(self[TestResultCode.UNEXPECTED_PASS])
            and len(self[TestResultCode.PASS])
            + len(self[TestResultCode.SKIP])
            + len(self[TestResultCode.SKIP_CPU_SHORTAGE])
            + len(self[TestResultCode.EXPECTED_FAIL])
            == self.testcase_suite.countTestCases()
        )

    def no_tests_run(self):
        """Return True if no test was recorded as run."""
        return 0 == len(self[TestResultCode.TEST_RUN])

    def process_result(self, test_id, result):
        """Record that *test_id* finished with result code *result*."""
        self[result].append(test_id)

    def suite_from_failed(self):
        """Build a suite for re-running the classes with unfinished tests.

        Returns:
            The suite produced by the module-level ``suite_from_failed`` for
            every test that neither passed, was skipped, nor failed as
            expected; None if there is no such test.
        """
        # Collect the "done" ids once instead of concatenating four lists
        # for every test case in the suite.
        finished_ok = set().union(
            self[TestResultCode.PASS],
            self[TestResultCode.SKIP],
            self[TestResultCode.SKIP_CPU_SHORTAGE],
            self[TestResultCode.EXPECTED_FAIL],
        )
        rerun_ids = set()
        for testcase in self.testcase_suite:
            tc_id = testcase.id()
            if tc_id not in finished_ok:
                rerun_ids.add(tc_id)
        if rerun_ids:
            return suite_from_failed(self.testcase_suite, rerun_ids)
        return None

    def get_testcase_names(self, test_id):
        """Return ``(testcase_name, test_name)`` for display purposes.

        *test_id* is either a regular test id or a class-level fixture id
        such as ``tearDownClass (test_ipsec_esp.TestIpsecEsp1)``. Ids that
        are not strings (e.g. None, recorded when a runner died before it
        reported any test) are passed through unchanged.
        """
        known = self._known_testcases()
        # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
        setup_teardown_match = None
        if isinstance(test_id, str):
            setup_teardown_match = re.match(
                r"((tearDownClass)|(setUpClass)) \((.+\..+)\)", test_id
            )
        if setup_teardown_match:
            test_name, _, _, testcase_name = setup_teardown_match.groups()
            if len(testcase_name.split(".")) == 2:
                # "module.Class": borrow any test id of that class so that
                # the doc name lookup below has something to find.
                for key in known:
                    if key.startswith(testcase_name):
                        testcase_name = key
                        break
            testcase_name = self._get_testcase_doc_name(testcase_name)
        else:
            test_name = self._get_test_description(test_id)
            testcase_name = self._get_testcase_doc_name(test_id)

        return testcase_name, test_name

    def _known_testcases(self):
        """Return the id-to-testcase mapping, or an empty one if unset."""
        return self.testcases_by_id or {}

    def _get_test_description(self, test_id):
        """Return the description of a known test, else *test_id* itself."""
        known = self._known_testcases()
        if test_id in known:
            return get_test_description(descriptions, known[test_id])
        return test_id

    def _get_testcase_doc_name(self, test_id):
        """Return the doc name of a known test's case, else *test_id*."""
        known = self._known_testcases()
        if test_id in known:
            return get_testcase_doc_name(known[test_id])
        return test_id
141
142
def test_runner_wrapper(
    suite, keep_alive_pipe, stdouterr_queue, finished_pipe, result_pipe, logger
):
    """Run *suite* with a VppTestRunner and report the outcome over pipes.

    stdout and stderr are replaced by *stdouterr_queue* for the duration, and
    the first handler of *logger* becomes ``VppTestCase.parallel_handler``.
    Once the run is over, whether it was successful is sent through
    *finished_pipe*; that pipe and *keep_alive_pipe* are then closed.
    """
    sys.stdout = sys.stderr = stdouterr_queue
    VppTestCase.parallel_handler = logger.handlers[0]

    runner = VppTestRunner(
        keep_alive_pipe=keep_alive_pipe,
        descriptions=descriptions,
        verbosity=config.verbose,
        result_pipe=result_pipe,
        failfast=config.failfast,
        print_summary=False,
    )
    outcome = runner.run(suite)

    finished_pipe.send(outcome.wasSuccessful())
    for pipe in (finished_pipe, keep_alive_pipe):
        pipe.close()
160
161
class TestCaseWrapper(object):
    """Run one test suite in a child process and keep its state for the parent.

    Creating an instance immediately starts the child process, which executes
    ``test_runner_wrapper`` on the suite. Three one-way pipes connect child
    and parent: keep-alive, finished and result. The child's output goes
    through a manager-provided ``StreamQueue``.

    Attributes:
        last_test_temp_dir, last_test_vpp_binary, vpp_pid: details of the most
            recently reported test; None until set from outside.
        last_test_id: raw id of the most recently reported test.
        last_heard: time of the last sign of life from the child.
        core_detected_at: time a core file was first noticed, or None.
        testcases_by_id: mapping of test id to test case for this suite.
        testclasess_with_core: class name -> (test name, vpp binary, temp dir)
            for classes during which a core file was seen.
        result: the ``TestResult`` collected for this suite.
    """

    def __init__(self, testcase_suite, manager):
        self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(duplex=False)
        self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
        self.result_parent_end, self.result_child_end = Pipe(duplex=False)
        self.testcase_suite = testcase_suite
        self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
        self.logger = get_parallel_logger(self.stdouterr_queue)
        self.child = Process(
            target=test_runner_wrapper,
            args=(
                testcase_suite,
                self.keep_alive_child_end,
                self.stdouterr_queue,
                self.finished_child_end,
                self.result_child_end,
                self.logger,
            ),
        )
        self.child.start()
        self.last_test_temp_dir = None
        self.last_test_vpp_binary = None
        self._last_test = None
        self.last_test_id = None
        self.vpp_pid = None
        self.last_heard = time.time()
        self.core_detected_at = None
        self.testcases_by_id = {}
        self.testclasess_with_core = {}
        for testcase in self.testcase_suite:
            self.testcases_by_id[testcase.id()] = testcase
        self.result = TestResult(testcase_suite, self.testcases_by_id)

    @property
    def last_test(self):
        """Readable name of the most recently reported test."""
        return self._last_test

    @last_test.setter
    def last_test(self, test_id):
        # Keep the raw id around and derive a readable name from it: the
        # short description of a known test, its str() if it has none, or
        # the id itself for anything unknown.
        self.last_test_id = test_id
        if test_id in self.testcases_by_id:
            testcase = self.testcases_by_id[test_id]
            self._last_test = testcase.shortDescription()
            if not self._last_test:
                self._last_test = str(testcase)
        else:
            self._last_test = test_id

    def add_testclass_with_core(self):
        """Remember that a core file showed up during the last reported test.

        Only the first occurrence per test class is stored. For ids that are
        neither known tests nor setUpClass/tearDownClass ids, the raw id is
        used as the class name instead of failing.
        """
        if self.last_test_id in self.testcases_by_id:
            test = self.testcases_by_id[self.last_test_id]
            class_name = unittest.util.strclass(test.__class__)
            test_name = "'{}' ({})".format(
                get_test_description(descriptions, test), self.last_test_id
            )
        else:
            test_name = self.last_test_id
            fixture_match = None
            if isinstance(test_name, str):
                fixture_match = re.match(
                    r"((tearDownClass)|(setUpClass)) \((.+\..+)\)", test_name
                )
            # Group 4 is the "module.Class" part of a class-level fixture id.
            class_name = fixture_match.group(4) if fixture_match else test_name
        if class_name not in self.testclasess_with_core:
            self.testclasess_with_core[class_name] = (
                test_name,
                self.last_test_vpp_binary,
                self.last_test_temp_dir,
            )

    def close_pipes(self):
        """Close both ends of all three pipes, child ends first."""
        for pipe_end in (
            self.keep_alive_child_end,
            self.finished_child_end,
            self.result_child_end,
            self.keep_alive_parent_end,
            self.finished_parent_end,
            self.result_parent_end,
        ):
            pipe_end.close()

    def was_successful(self):
        """Return whether the collected result counts as successful."""
        return self.result.was_successful()

    @property
    def cpus_used(self):
        """Number of CPUs used by the wrapped suite."""
        return self.testcase_suite.cpus_used

    def get_assigned_cpus(self):
        """Return the CPUs assigned to the wrapped suite."""
        return self.testcase_suite.get_assigned_cpus()
246
247
def stdouterr_reader_wrapper(
    unread_testcases, finished_unread_testcases, read_testcases
):
    """Copy queued output of test cases to stdout, one test case at a time.

    Keeps going while the *read_testcases* event is set or test cases remain
    in *unread_testcases*. Test cases in *finished_unread_testcases* are
    served first. The output of the chosen test case is read from its
    ``stdouterr_queue`` until a None item arrives, after which the queue is
    closed.
    """
    while read_testcases.is_set() or unread_testcases:
        current = None
        if finished_unread_testcases:
            current = finished_unread_testcases.pop()
            unread_testcases.remove(current)
        elif unread_testcases:
            current = unread_testcases.pop()

        if not current:
            # nothing to read right now
            continue

        stream = current.stdouterr_queue
        chunk = ""
        while True:
            sys.stdout.write(chunk)
            chunk = stream.get()
            if chunk is None:
                break

        stream.close()
        finished_unread_testcases.discard(current)
267
268
def _report_core_file(logger, core_path, vpp_binary):
    """Log the existence of *core_path* plus what the `file` utility says."""
    logger.error("Core-file exists in test temporary directory: %s!" % core_path)
    check_core_path(logger, core_path)
    logger.debug("Running 'file %s':" % core_path)
    try:
        info = check_output(["file", core_path])
        logger.debug(info)
    except CalledProcessError as e:
        logger.error(
            "Subprocess returned with return code "
            "while running `file' utility on core-file "
            "returned: "
            "rc=%s",
            e.returncode,
        )
    except OSError as e:
        logger.error(
            "Subprocess returned with OS error while "
            "running 'file' utility "
            "on core-file: "
            "(%s) %s",
            e.errno,
            e.strerror,
        )
    except Exception:
        # Reporting is best effort; never let it mask the original failure.
        logger.exception("Unexpected error running `file' utility on core-file")
    logger.error(f"gdb {vpp_binary} {core_path}")


def handle_failed_suite(logger, last_test_temp_dir, vpp_pid, vpp_binary):
    """Gather post-mortem information for a suite that did not succeed.

    Args:
        logger: logger used for all reporting.
        last_test_temp_dir: temporary directory of the last test, or a falsy
            value if unknown.
        vpp_pid: pid of the vpp process of the last test, or a falsy value.
        vpp_binary: path of the vpp binary, used in the suggested gdb command.

    If the temporary directory is known, a ``<dirname>-FAILED`` symlink to it
    is created in ``config.failed_dir`` (unless something already exists
    under that name) and a core file found there is reported. If the vpp pid
    is known and ``/tmp/api_post_mortem.<pid>`` exists, that file is copied
    into the temporary directory.
    """
    if last_test_temp_dir:
        # Need to create link in case of a timeout or core dump without failure
        lttd = os.path.basename(last_test_temp_dir)
        link_path = os.path.join(config.failed_dir, f"{lttd}-FAILED")
        # lexists() is also true for a dangling symlink; exists() is not, and
        # os.symlink() would then fail with FileExistsError.
        if not os.path.lexists(link_path):
            os.symlink(last_test_temp_dir, link_path)
        logger.error(
            "Symlink to failed testcase directory: %s -> %s" % (link_path, lttd)
        )

        # Report core existence
        core_path = get_core_path(last_test_temp_dir)
        if os.path.exists(core_path):
            _report_core_file(logger, core_path, vpp_binary)

    if vpp_pid:
        # Copy api post mortem
        api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
        if os.path.isfile(api_post_mortem_path):
            if last_test_temp_dir:
                logger.error(
                    "Copying api_post_mortem.%d to %s" % (vpp_pid, last_test_temp_dir)
                )
                shutil.copy2(api_post_mortem_path, last_test_temp_dir)
            else:
                # Without a destination there is nothing sensible to copy to.
                logger.error(
                    "Not copying api_post_mortem.%d: test temporary directory "
                    "is unknown" % vpp_pid
                )
320
321
def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
    """Debug or compress the core file in *tempdir*, if there is one.

    Args:
        vpp_binary: vpp binary handed to gdb together with the core file.
        tempdir: test temporary directory to look for a core file in.
        core_crash_test: name of the test that was running, for the message.

    With ``debug_core`` set, gdb is spawned on the core file. Otherwise, if
    ``config.compress_core`` is set, the core file is compressed with gzip.
    """
    if not is_core_present(tempdir):
        return

    if debug_core:
        print(
            "VPP core detected in %s. Last test running was %s"
            % (tempdir, core_crash_test)
        )
        print(single_line_delim)
        spawn_gdb(vpp_binary, get_core_path(tempdir))
        print(single_line_delim)
    elif config.compress_core:
        from subprocess import call

        print("Compressing core-file in test directory `%s'" % tempdir)
        core_path = get_core_path(tempdir)
        try:
            # An argument list avoids shell interpretation of the path, so
            # spaces or shell metacharacters in it cannot break the command.
            call(["gzip", core_path])
        except OSError as e:
            # Compression stays best effort, as it was with os.system().
            print("Failed to run gzip on core-file %s: %s" % (core_path, e))
335
336
def handle_cores(failed_testcases):
    """Run ``check_and_handle_core`` for every recorded class with a core.

    Each item of *failed_testcases* provides ``testclasess_with_core``, whose
    values are ``(test, vpp_binary, tempdir)`` tuples.
    """
    for wrapper in failed_testcases:
        recorded = wrapper.testclasess_with_core or {}
        for test, vpp_binary, tempdir in recorded.values():
            check_and_handle_core(vpp_binary, tempdir, test)
343
344
def process_finished_testsuite(
    wrapped_testcase_suite, finished_testcase_suites, failed_wrapped_testcases, results
):
    """Book-keep a suite that is done; return True if the run should stop.

    The suite's result is appended to *results* and the suite is added to
    *finished_testcase_suites*. An unsuccessful suite is additionally added
    to *failed_wrapped_testcases* and passed to ``handle_failed_suite``.
    The return value is True only for an unsuccessful suite while
    ``config.failfast`` is set.
    """
    results.append(wrapped_testcase_suite.result)
    finished_testcase_suites.add(wrapped_testcase_suite)

    if wrapped_testcase_suite.was_successful():
        return False

    failed_wrapped_testcases.add(wrapped_testcase_suite)
    handle_failed_suite(
        wrapped_testcase_suite.logger,
        wrapped_testcase_suite.last_test_temp_dir,
        wrapped_testcase_suite.vpp_pid,
        wrapped_testcase_suite.last_test_vpp_binary,
    )
    return True if config.failfast else False
364
365
def run_forked(testcase_suites):
    """Run test suites in child processes and collect their results.

    Suites are taken from the front of *testcase_suites* (the list is
    consumed) and started through ``TestCaseWrapper`` as long as
    ``can_run_suite`` allows it. Suites tagged run-solo are set aside and
    only started when no other suite is running. A separate thread forwards
    the children's output to stdout.

    While suites are running, the parent polls their pipes every 0.1 s. A
    child is terminated and its last test recorded as an error when it has
    been silent for longer than ``config.timeout``, when it died without
    reporting that it finished, or when a core file has been present in its
    test temporary directory for longer than ``core_timeout``.

    Returns:
        List with the ``TestResult`` of every suite. If the run was stopped
        early, suites that never started get an empty ``TestResult``.
    """
    wrapped_testcase_suites = set()
    solo_testcase_suites = []

    # suites are unhashable, need to use list
    results = []
    unread_testcases = set()
    finished_unread_testcases = set()
    manager = StreamQueueManager()
    manager.start()
    tests_running = 0
    free_cpus = list(available_cpus)

    def on_suite_start(tc):
        # Count one more running suite.
        nonlocal tests_running
        nonlocal free_cpus
        tests_running = tests_running + 1

    def on_suite_finish(tc):
        # Count one running suite less and return its CPUs to the pool.
        nonlocal tests_running
        nonlocal free_cpus
        tests_running = tests_running - 1
        assert tests_running >= 0
        free_cpus.extend(tc.get_assigned_cpus())

    def run_suite(suite):
        # Hand the first suite.cpus_used free CPUs to the suite, then start
        # it in a child process (creating the wrapper starts the process).
        nonlocal manager
        nonlocal wrapped_testcase_suites
        nonlocal unread_testcases
        nonlocal free_cpus
        suite.assign_cpus(free_cpus[: suite.cpus_used])
        free_cpus = free_cpus[suite.cpus_used :]
        wrapper = TestCaseWrapper(suite, manager)
        wrapped_testcase_suites.add(wrapper)
        unread_testcases.add(wrapper)
        on_suite_start(suite)

    def can_run_suite(suite):
        # A suite may start if the concurrency limit is not reached and
        # either enough CPUs are free or it needs more than max_vpp_cpus.
        # NOTE(review): the latter presumably lets such suites through so
        # their tests can be reported as skipped - confirm.
        return tests_running < max_concurrent_tests and (
            suite.cpus_used <= len(free_cpus) or suite.cpus_used > max_vpp_cpus
        )

    # Initial fill: start suites from the front of the list until one of them
    # cannot run; run-solo suites are moved to their own list meanwhile.
    while free_cpus and testcase_suites:
        a_suite = testcase_suites[0]
        if a_suite.is_tagged_run_solo:
            a_suite = testcase_suites.pop(0)
            solo_testcase_suites.append(a_suite)
            continue
        if can_run_suite(a_suite):
            a_suite = testcase_suites.pop(0)
            run_suite(a_suite)
        else:
            break

    # Nothing was started above: begin with a solo suite if there is one.
    if tests_running == 0 and solo_testcase_suites:
        a_suite = solo_testcase_suites.pop(0)
        run_suite(a_suite)

    # The reader thread keeps running while this event is set.
    read_from_testcases = threading.Event()
    read_from_testcases.set()
    stdouterr_thread = threading.Thread(
        target=stdouterr_reader_wrapper,
        args=(unread_testcases, finished_unread_testcases, read_from_testcases),
    )
    stdouterr_thread.start()

    failed_wrapped_testcases = set()
    stop_run = False

    try:
        while wrapped_testcase_suites or testcase_suites:
            finished_testcase_suites = set()
            for wrapped_testcase_suite in wrapped_testcase_suites:
                # Drain the (test_id, result) messages.
                while wrapped_testcase_suite.result_parent_end.poll():
                    wrapped_testcase_suite.result.process_result(
                        *wrapped_testcase_suite.result_parent_end.recv()
                    )
                    wrapped_testcase_suite.last_heard = time.time()

                # Drain the keep-alive messages describing the current test.
                while wrapped_testcase_suite.keep_alive_parent_end.poll():
                    (
                        wrapped_testcase_suite.last_test,
                        wrapped_testcase_suite.last_test_vpp_binary,
                        wrapped_testcase_suite.last_test_temp_dir,
                        wrapped_testcase_suite.vpp_pid,
                    ) = wrapped_testcase_suite.keep_alive_parent_end.recv()
                    wrapped_testcase_suite.last_heard = time.time()

                # The child reported that it is done.
                if wrapped_testcase_suite.finished_parent_end.poll():
                    wrapped_testcase_suite.finished_parent_end.recv()
                    wrapped_testcase_suite.last_heard = time.time()
                    stop_run = (
                        process_finished_testsuite(
                            wrapped_testcase_suite,
                            finished_testcase_suites,
                            failed_wrapped_testcases,
                            results,
                        )
                        or stop_run
                    )
                    continue

                # Not finished yet: check whether the child has to be killed.
                fail = False
                if wrapped_testcase_suite.last_heard + config.timeout < time.time():
                    fail = True
                    wrapped_testcase_suite.logger.critical(
                        "Child test runner process timed out "
                        "(last test running was `%s' in `%s')!"
                        % (
                            wrapped_testcase_suite.last_test,
                            wrapped_testcase_suite.last_test_temp_dir,
                        )
                    )
                elif not wrapped_testcase_suite.child.is_alive():
                    fail = True
                    wrapped_testcase_suite.logger.critical(
                        "Child test runner process unexpectedly died "
                        "(last test running was `%s' in `%s')!"
                        % (
                            wrapped_testcase_suite.last_test,
                            wrapped_testcase_suite.last_test_temp_dir,
                        )
                    )
                elif (
                    wrapped_testcase_suite.last_test_temp_dir
                    and wrapped_testcase_suite.last_test_vpp_binary
                ):
                    if is_core_present(wrapped_testcase_suite.last_test_temp_dir):
                        wrapped_testcase_suite.add_testclass_with_core()
                        # The first sighting only starts the clock; the child
                        # is failed once core_timeout has passed since then.
                        if wrapped_testcase_suite.core_detected_at is None:
                            wrapped_testcase_suite.core_detected_at = time.time()
                        elif (
                            wrapped_testcase_suite.core_detected_at + core_timeout
                            < time.time()
                        ):
                            wrapped_testcase_suite.logger.critical(
                                "Child test runner process unresponsive and "
                                "core-file exists in test temporary directory "
                                "(last test running was `%s' in `%s')!"
                                % (
                                    wrapped_testcase_suite.last_test,
                                    wrapped_testcase_suite.last_test_temp_dir,
                                )
                            )
                            fail = True

                if fail:
                    wrapped_testcase_suite.child.terminate()
                    try:
                        # terminating the child process tends to leave orphan
                        # VPP process around
                        if wrapped_testcase_suite.vpp_pid:
                            os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
                    except OSError:
                        # already dead
                        pass
                    # Record the last reported test as an error.
                    wrapped_testcase_suite.result.crashed = True
                    wrapped_testcase_suite.result.process_result(
                        wrapped_testcase_suite.last_test_id, TestResultCode.ERROR
                    )
                    stop_run = (
                        process_finished_testsuite(
                            wrapped_testcase_suite,
                            finished_testcase_suites,
                            failed_wrapped_testcases,
                            results,
                        )
                        or stop_run
                    )

            for finished_testcase in finished_testcase_suites:
                # Somewhat surprisingly, the join below may
                # timeout, even if client signaled that
                # it finished - so we note it just in case.
                join_start = time.time()
                finished_testcase.child.join(test_finished_join_timeout)
                join_end = time.time()
                if join_end - join_start >= test_finished_join_timeout:
                    finished_testcase.logger.error(
                        "Timeout joining finished test: %s (pid %d)"
                        % (finished_testcase.last_test, finished_testcase.child.pid)
                    )
                finished_testcase.close_pipes()
                wrapped_testcase_suites.remove(finished_testcase)
                finished_unread_testcases.add(finished_testcase)
                # None ends the reader thread's loop over this queue.
                finished_testcase.stdouterr_queue.put(None)
                on_suite_finish(finished_testcase)
                if stop_run:
                    # Stopping: suites not started yet get an empty result.
                    while testcase_suites:
                        results.append(TestResult(testcase_suites.pop(0)))
                elif testcase_suites:
                    # Set aside leading run-solo suites, then start the next
                    # regular suite if it is allowed to run.
                    a_suite = testcase_suites[0]
                    while a_suite and a_suite.is_tagged_run_solo:
                        testcase_suites.pop(0)
                        solo_testcase_suites.append(a_suite)
                        if testcase_suites:
                            a_suite = testcase_suites[0]
                        else:
                            a_suite = None
                    if a_suite and can_run_suite(a_suite):
                        testcase_suites.pop(0)
                        run_suite(a_suite)
                # A solo suite only starts once nothing else is running.
                if solo_testcase_suites and tests_running == 0:
                    a_suite = solo_testcase_suites.pop(0)
                    run_suite(a_suite)
            time.sleep(0.1)
    except Exception:
        # Stop all children and end the reader's loop over their queues
        # before passing the exception on.
        for wrapped_testcase_suite in wrapped_testcase_suites:
            wrapped_testcase_suite.child.terminate()
            wrapped_testcase_suite.stdouterr_queue.put(None)
        raise
    finally:
        # Let the reader thread finish, then shut the manager down.
        read_from_testcases.clear()
        stdouterr_thread.join(config.timeout)
        manager.shutdown()

    handle_cores(failed_wrapped_testcases)
    return results
584
585
class TestSuiteWrapper(unittest.TestSuite):
    """TestSuite that tracks the CPUs its tests require and were given.

    ``cpus_used`` is the largest ``get_cpus_required()`` value of all tests
    added so far. The CPUs set with ``assign_cpus`` are passed on to each
    test at class set-up time, unless the test's class is flagged as
    ``skipped_due_to_cpu_lack``.
    """

    cpus_used = 0

    def __init__(self):
        super().__init__()

    def addTest(self, test):
        """Add *test* and raise ``cpus_used`` to its requirement if larger."""
        required = test.get_cpus_required()
        self.cpus_used = required if required > self.cpus_used else self.cpus_used
        super().addTest(test)

    def assign_cpus(self, cpus):
        """Store the CPUs this suite is allowed to use."""
        self.cpus = cpus

    def _handleClassSetUp(self, test, result):
        skipped = test.__class__.skipped_due_to_cpu_lack
        if not skipped:
            test.assign_cpus(self.cpus)
        super()._handleClassSetUp(test, result)

    def get_assigned_cpus(self):
        """Return the CPUs stored by ``assign_cpus``."""
        return self.cpus
606
607
class SplitToSuitesCallback:
    """Callback sorting discovered test methods into one suite per class.

    Tests accepted by *filter_callback* end up in ``suites`` under the key
    ``file_name + class name``; all others are collected in ``filtered``.
    A suite is marked ``is_tagged_run_solo`` as soon as one of its tests is
    tagged to run solo.
    """

    def __init__(self, filter_callback):
        self.suites = {}
        self.suite_name = "default"
        self.filter_callback = filter_callback
        self.filtered = TestSuiteWrapper()

    def __call__(self, file_name, cls, method):
        test_method = cls(method)
        if not self.filter_callback(file_name, cls.__name__, method):
            self.filtered.addTest(test_method)
            return

        self.suite_name = file_name + cls.__name__
        suite = self.suites.get(self.suite_name)
        if suite is None:
            # first test of this class: create its suite
            suite = TestSuiteWrapper()
            self.suites[self.suite_name] = suite
            suite.is_tagged_run_solo = False
        suite.addTest(test_method)
        if test_method.is_tagged_run_solo():
            suite.is_tagged_run_solo = True
628
629
def parse_test_filter(test_filter):
    """Split a ``file[.class[.function]]`` filter into its three parts.

    Returns:
        Tuple ``(file_name, class_name, func_name)``; a part is None when it
        is not restricted. The file name gets a ``test_`` prefix if it lacks
        one, and a ``.py`` suffix. In the dotted form, ``*`` and an empty
        component mean "unrestricted"; a filter without dots is always used
        as a file name.

    Raises:
        Exception: if the filter has more than three dot-separated parts.
    """
    if not test_filter:
        return None, None, None

    def wanted(component):
        # "*" and "" leave the corresponding part unrestricted
        return None if component in ("*", "") else component

    pieces = test_filter.split(".")
    if len(pieces) > 3:
        raise Exception(f"Invalid test filter: {test_filter}")

    if len(pieces) == 1:
        file_part, class_part, func_part = pieces[0], None, None
    else:
        file_part = wanted(pieces[0])
        class_part = wanted(pieces[1])
        func_part = wanted(pieces[2]) if len(pieces) == 3 else None

    if file_part is None:
        return None, class_part, func_part

    if not file_part.startswith("test_"):
        file_part = "test_%s" % file_part
    return "%s.py" % file_part, class_part, func_part
658
659
def filter_tests(tests, filter_cb):
    """Return a TestSuiteWrapper with the members of *tests* kept by *filter_cb*.

    Nested suites are filtered recursively and dropped when nothing is left
    in them. A test case is only filtered when its id has exactly three
    dot-separated parts (e.g. test_classifier.TestClassifier.test_acl_ip),
    which are passed to *filter_cb*. Anything else is kept as is.
    """
    kept = TestSuiteWrapper()
    for item in tests:
        if isinstance(item, unittest.suite.TestSuite):
            # a bunch of tests: filter them recursively
            nested = filter_tests(item, filter_cb)
            if nested.countTestCases() > 0:
                kept.addTest(nested)
            continue

        if isinstance(item, unittest.TestCase):
            id_parts = item.id().split(".")
            # only ids of the common module.Class.method form are filtered
            if len(id_parts) == 3 and not filter_cb(*id_parts):
                continue

        # accepted test case, or an unexpected object that is left alone
        kept.addTest(item)
    return kept
682
683
class FilterByTestOption:
    """Filter accepting a test if it satisfies at least one of the filters.

    *filters* is an iterable of ``(file_name, class_name, func_name)``
    tuples. A falsy component does not restrict anything; the file name is
    compared with ``fnmatch``, class and function names must be equal.
    """

    def __init__(self, filters):
        self.filters = filters

    @staticmethod
    def _satisfies(single_filter, file_name, class_name, func_name):
        """Return True if the test satisfies every part of *single_filter*."""
        wanted_file, wanted_class, wanted_func = single_filter
        return (
            (not wanted_file or fnmatch.fnmatch(file_name, wanted_file))
            and (not wanted_class or class_name == wanted_class)
            and (not wanted_func or func_name == wanted_func)
        )

    def __call__(self, file_name, class_name, func_name):
        return any(
            self._satisfies(single_filter, file_name, class_name, func_name)
            for single_filter in self.filters
        )
719
720
class FilterByClassList:
    """Filter accepting tests whose ``file_name.class_name`` is in a collection."""

    def __init__(self, classes_with_filenames):
        self.classes_with_filenames = classes_with_filenames

    def __call__(self, file_name, class_name, func_name):
        qualified_class = ".".join((file_name, class_name))
        return qualified_class in self.classes_with_filenames
727
728
def suite_from_failed(suite, failed):
    """Return the tests of *suite* whose class had one of the *failed* ids.

    The last dot-separated component is stripped from every id in *failed*;
    *suite* is then filtered by the resulting names.
    """
    failed_classes = set()
    for test_id in failed:
        failed_classes.add(test_id.rsplit(".", 1)[0])
    return filter_tests(suite, FilterByClassList(failed_classes))
734
735
736 class AllResults(dict):
737     def __init__(self):
738         super(AllResults, self).__init__()
739         self.all_testcases = 0
740         self.results_per_suite = []
741         for trc in list(TestResultCode):
742             self[trc] = 0
743         self.rerun = []
744         self.testsuites_no_tests_run = []
745
746     def add_results(self, result):
747         self.results_per_suite.append(result)
748         for trc in list(TestResultCode):
749             self[trc] += len(result[trc])
750
751     def add_result(self, result):
752         retval = 0
753         self.all_testcases += result.testcase_suite.countTestCases()
754         self.add_results(result)
755
756         if result.no_tests_run():
757             self.testsuites_no_tests_run.append(result.testcase_suite)
758             if result.crashed:
759                 retval = -1
760             else:
761                 retval = 1
762         elif not result.was_successful():
763             retval = 1
764
765         if retval != 0:
766             self.rerun.append(result.testcase_suite)
767
768         return retval
769
770     def print_results(self):
771         print("")
772         print(double_line_delim)
773         print("TEST RESULTS:")
774
775         def indent_results(lines):
776             lines = list(filter(None, lines))
777             maximum = max(lines, key=lambda x: x.index(":"))
778             maximum = 4 + maximum.index(":")
779             for l in lines:
780                 padding = " " * (maximum - l.index(":"))
781                 print(f"{padding}{l}")
782
783         indent_results(
784             [
785                 f"Scheduled tests: {self.all_testcases}",
786                 f"Executed tests: {self[TestResultCode.TEST_RUN]}",
787                 f"Passed tests: {colorize(self[TestResultCode.PASS], GREEN)}",
788                 f"Expected failures: {colorize(self[TestResultCode.EXPECTED_FAIL], GREEN)}"
789                 if self[TestResultCode.EXPECTED_FAIL]
790                 else None,
791                 f"Skipped tests: {colorize(self[TestResultCode.SKIP], YELLOW)}"
792                 if self[TestResultCode.SKIP]
793                 else None,
794                 f"Not Executed tests: {colorize(self.not_executed, RED)}"
795                 if self.not_executed
796                 else None,
797                 f"Failures: {colorize(self[TestResultCode.FAIL], RED)}"
798                 if self[TestResultCode.FAIL]
799                 else None,
800                 f"Unexpected passes: {colorize(self[TestResultCode.UNEXPECTED_PASS], RED)}"
801                 if self[TestResultCode.UNEXPECTED_PASS]
802                 else None,
803                 f"Errors: {colorize(self[TestResultCode.ERROR], RED)}"
804                 if self[TestResultCode.ERROR]
805                 else None,
806                 "Tests skipped due to lack of CPUS: "
807                 f"{colorize(self[TestResultCode.SKIP_CPU_SHORTAGE], YELLOW)}"
808                 if self[TestResultCode.SKIP_CPU_SHORTAGE]
809                 else None,
810             ]
811         )
812
813         if self.all_failed > 0:
814             print("FAILURES AND ERRORS IN TESTS:")
815             for result in self.results_per_suite:
816                 old_testcase_name = None
817                 for tr_code, headline in (
818                     (TestResultCode.FAIL, "FAILURE"),
819                     (TestResultCode.ERROR, "ERROR"),
820                     (TestResultCode.UNEXPECTED_PASS, "UNEXPECTED PASS"),
821                 ):
822                     if not result[tr_code]:
823                         continue
824
825                     for failed_test_id in result[tr_code]:
826                         new_testcase_name, test_name = result.get_testcase_names(
827                             failed_test_id
828                         )
829                         if new_testcase_name != old_testcase_name:
830                             print(
831                                 f"  Testcase name: {colorize(new_testcase_name, RED)}"
832                             )
833                             old_testcase_name = new_testcase_name
834                         print(
835                             f"    {headline}: {colorize(test_name, RED)} [{failed_test_id}]"
836                         )
837
838         if self.testsuites_no_tests_run:
839             print("TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:")
840             tc_classes = set()
841             for testsuite in self.testsuites_no_tests_run:
842                 for testcase in testsuite:
843                     tc_classes.add(get_testcase_doc_name(testcase))
844             for tc_class in tc_classes:
845                 print("  {}".format(colorize(tc_class, RED)))
846
847         if self[TestResultCode.SKIP_CPU_SHORTAGE]:
848             print()
849             print(
850                 colorize(
851                     "     SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
852                     " ENOUGH CPUS AVAILABLE",
853                     YELLOW,
854                 )
855             )
856         print(double_line_delim)
857         print("")
858
859     @property
860     def not_executed(self):
861         return self.all_testcases - self[TestResultCode.TEST_RUN]
862
863     @property
864     def all_failed(self):
865         return (
866             self[TestResultCode.FAIL]
867             + self[TestResultCode.ERROR]
868             + self[TestResultCode.UNEXPECTED_PASS]
869         )
870
871
def parse_results(results):
    """
    Aggregate the results of all suites and print the overall summary.

    :param results: iterable of per-suite result objects
    :return: tuple (return_code, rerun) where return_code is -1 if any suite
             crashed without running a test, 1 if any suite failed and 0
             otherwise, and rerun is the list of suites collected for
             another attempt
    """

    summary = AllResults()
    # add_result() reports 0 (ok), 1 (failed) or -1 (crashed) per suite
    seen_codes = {summary.add_result(suite_result) for suite_result in results}

    summary.print_results()

    # a crash outranks a plain failure
    if -1 in seen_codes:
        return_code = -1
    elif 1 in seen_codes:
        return_code = 1
    else:
        return_code = 0
    return return_code, summary.rerun
902
903
if __name__ == "__main__":
    # Script entry point: optionally run the sanity check, work out how many
    # test processes may run concurrently, discover and filter the tests and
    # then run them either in the foreground (interactive debugging) or in
    # forked background processes with retries.
    #
    # NOTE(review): the names assigned below are module-level globals and may
    # be read by functions defined earlier in this file - confirm before
    # renaming or removing any of them.
    print(f"Config is: {config}")

    if config.sanity:
        print("Running sanity test case.")
        try:
            rc = sanity_run_vpp.main()
            if rc != 0:
                sys.exit(rc)
        except Exception:
            # SystemExit raised by sys.exit() above is not an Exception
            # subclass, so it is not swallowed here
            print(traceback.format_exc())
            print("Couldn't run sanity test case.")
            sys.exit(-1)

    test_finished_join_timeout = 15

    debug_gdb = config.debug in ["gdb", "gdbserver", "attach"]
    debug_core = config.debug == "core"

    run_interactive = debug_gdb or config.step or config.force_foreground

    max_concurrent_tests = 0
    print(f"OS reports {num_cpus} available cpu(s).")

    test_jobs = config.jobs
    if test_jobs == "auto":
        if run_interactive:
            max_concurrent_tests = 1
            print("Interactive mode required, running tests consecutively.")
        else:
            max_concurrent_tests = num_cpus
            print(
                f"Running at most {max_concurrent_tests} python test "
                "processes concurrently."
            )
    else:
        max_concurrent_tests = test_jobs
        print(
            f"Running at most {max_concurrent_tests} python test processes "
            "concurrently as set by 'TEST_JOBS'."
        )

    print(f"Using at most {max_vpp_cpus} cpus for VPP threads.")

    if run_interactive and max_concurrent_tests > 1:
        raise NotImplementedError(
            "Running tests interactively (DEBUG is gdb[server] or ATTACH or "
            "STEP is set) in parallel (TEST_JOBS is more than 1) is not "
            "supported"
        )

    descriptions = True

    print("Running tests using custom test runner.")
    filters = [(parse_test_filter(f)) for f in config.filter.split(",")]

    print(
        "Selected filters: ",
        "|".join(
            f"file={filter_file}, class={filter_class}, function={filter_func}"
            for filter_file, filter_class, filter_func in filters
        ),
    )

    filter_cb = FilterByTestOption(filters)

    cb = SplitToSuitesCallback(filter_cb)
    for d in config.test_src_dir:
        print(f"Adding tests from directory tree {d}")
        discover_tests(d, cb)

    # suites are not hashable, need to use list
    suites = []
    tests_amount = 0
    for testcase_suite in cb.suites.values():
        tests_amount += testcase_suite.countTestCases()
        if testcase_suite.cpus_used > max_vpp_cpus:
            # here we replace test functions with lambdas to just skip them
            # but we also replace setUp/tearDown functions to do nothing
            # so that the test can be "started" and "stopped", so that we can
            # still keep those prints (test description - SKIP), which are done
            # in stopTest() (for that to trigger, test function must run)
            for t in testcase_suite:
                for m in dir(t):
                    if m.startswith("test_"):
                        # bind the current test as a default argument, a
                        # plain closure would look up `t` only when called,
                        # i.e. after this loop has moved on
                        setattr(t, m, lambda t=t: t.skipTest("not enough cpus"))
                setattr(t.__class__, "setUpClass", lambda: None)
                setattr(t.__class__, "tearDownClass", lambda: None)
                setattr(t, "setUp", lambda: None)
                setattr(t, "tearDown", lambda: None)
                t.__class__.skipped_due_to_cpu_lack = True
        suites.append(testcase_suite)

    print(
        f"{tests_amount} out of "
        f"{tests_amount + cb.filtered.countTestCases()}"
        " tests match specified filters"
    )

    if not config.extended:
        print("Not running extended tests (some tests will be skipped)")

    attempts = config.retries + 1
    if attempts > 1:
        print(f"Perform {attempts} attempts to pass the suite...")

    if run_interactive and suites:
        # don't fork if requiring interactive terminal
        print("Running tests in foreground in the current process")
        full_suite = unittest.TestSuite()
        free_cpus = list(available_cpus)
        cpu_shortage = False
        for suite in suites:
            if suite.cpus_used <= max_vpp_cpus:
                suite.assign_cpus(free_cpus[: suite.cpus_used])
            else:
                suite.assign_cpus([])
                cpu_shortage = True
        full_suite.addTests(suites)
        result = VppTestRunner(
            verbosity=config.verbose, failfast=config.failfast, print_summary=True
        ).run(full_suite)
        was_successful = result.wasSuccessful()
        if not was_successful:
            for test_case_info in result.failed_test_cases_info:
                handle_failed_suite(
                    test_case_info.logger,
                    test_case_info.tempdir,
                    test_case_info.vpp_pid,
                    config.vpp,
                )
                if test_case_info in result.core_crash_test_cases_info:
                    check_and_handle_core(
                        test_case_info.vpp_bin_path,
                        test_case_info.tempdir,
                        test_case_info.core_crash_test,
                    )

        if cpu_shortage:
            print()
            print(
                colorize(
                    "SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
                    " ENOUGH CPUS AVAILABLE",
                    YELLOW,
                )
            )
            print()
        # the bool is used as exit status: 0 on success, 1 otherwise
        sys.exit(not was_successful)
    else:
        print(
            "Running each VPPTestCase in a separate background process"
            f" with at most {max_concurrent_tests} parallel python test "
            "process(es)"
        )
        exit_code = 0
        # parse_results() hands back the suites to run again, so every
        # further attempt only repeats what did not pass
        while suites and attempts > 0:
            results = run_forked(suites)
            exit_code, suites = parse_results(results)
            attempts -= 1
            if exit_code == 0:
                print("Test run was successful")
            else:
                print(f"{attempts} attempt(s) left.")
        sys.exit(exit_code)