tests: fix default failed dir setting
[vpp.git] / test / run_tests.py
1 #!/usr/bin/env python3
2
3 import sys
4 import shutil
5 import os
6 import fnmatch
7 import unittest
8 import time
9 import threading
10 import traceback
11 import signal
12 import re
13 from multiprocessing import Process, Pipe, get_context
14 from multiprocessing.queues import Queue
15 from multiprocessing.managers import BaseManager
16 from config import config, num_cpus, available_cpus, max_vpp_cpus
17 from framework import (
18     VppTestRunner,
19     VppTestCase,
20     get_testcase_doc_name,
21     get_test_description,
22     PASS,
23     FAIL,
24     ERROR,
25     SKIP,
26     TEST_RUN,
27     SKIP_CPU_SHORTAGE,
28 )
29 from debug import spawn_gdb
30 from log import (
31     get_parallel_logger,
32     double_line_delim,
33     RED,
34     YELLOW,
35     GREEN,
36     colorize,
37     single_line_delim,
38 )
39 from discover_tests import discover_tests
40 import sanity_run_vpp
41 from subprocess import check_output, CalledProcessError
42 from util import check_core_path, get_core_path, is_core_present
43
44 # timeout which controls how long the child has to finish after seeing
45 # a core dump in test temporary directory. If this is exceeded, parent assumes
46 # that child process is stuck (e.g. waiting for event from vpp) and kill
47 # the child
48 core_timeout = 3
49
50
51 class StreamQueue(Queue):
52     def write(self, msg):
53         self.put(msg)
54
55     def flush(self):
56         sys.__stdout__.flush()
57         sys.__stderr__.flush()
58
59     def fileno(self):
60         return self._writer.fileno()
61
62
63 class StreamQueueManager(BaseManager):
64     pass
65
66
67 StreamQueueManager.register("StreamQueue", StreamQueue)
68
69
70 class TestResult(dict):
71     def __init__(self, testcase_suite, testcases_by_id=None):
72         super(TestResult, self).__init__()
73         self[PASS] = []
74         self[FAIL] = []
75         self[ERROR] = []
76         self[SKIP] = []
77         self[SKIP_CPU_SHORTAGE] = []
78         self[TEST_RUN] = []
79         self.crashed = False
80         self.testcase_suite = testcase_suite
81         self.testcases = [testcase for testcase in testcase_suite]
82         self.testcases_by_id = testcases_by_id
83
84     def was_successful(self):
85         return (
86             0 == len(self[FAIL]) == len(self[ERROR])
87             and len(self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE])
88             == self.testcase_suite.countTestCases()
89         )
90
91     def no_tests_run(self):
92         return 0 == len(self[TEST_RUN])
93
94     def process_result(self, test_id, result):
95         self[result].append(test_id)
96
97     def suite_from_failed(self):
98         rerun_ids = set([])
99         for testcase in self.testcase_suite:
100             tc_id = testcase.id()
101             if tc_id not in self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE]:
102                 rerun_ids.add(tc_id)
103         if rerun_ids:
104             return suite_from_failed(self.testcase_suite, rerun_ids)
105
106     def get_testcase_names(self, test_id):
107         # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
108         setup_teardown_match = re.match(
109             r"((tearDownClass)|(setUpClass)) \((.+\..+)\)", test_id
110         )
111         if setup_teardown_match:
112             test_name, _, _, testcase_name = setup_teardown_match.groups()
113             if len(testcase_name.split(".")) == 2:
114                 for key in self.testcases_by_id.keys():
115                     if key.startswith(testcase_name):
116                         testcase_name = key
117                         break
118             testcase_name = self._get_testcase_doc_name(testcase_name)
119         else:
120             test_name = self._get_test_description(test_id)
121             testcase_name = self._get_testcase_doc_name(test_id)
122
123         return testcase_name, test_name
124
125     def _get_test_description(self, test_id):
126         if test_id in self.testcases_by_id:
127             desc = get_test_description(descriptions, self.testcases_by_id[test_id])
128         else:
129             desc = test_id
130         return desc
131
132     def _get_testcase_doc_name(self, test_id):
133         if test_id in self.testcases_by_id:
134             doc_name = get_testcase_doc_name(self.testcases_by_id[test_id])
135         else:
136             doc_name = test_id
137         return doc_name
138
139
140 def test_runner_wrapper(
141     suite, keep_alive_pipe, stdouterr_queue, finished_pipe, result_pipe, logger
142 ):
143     sys.stdout = stdouterr_queue
144     sys.stderr = stdouterr_queue
145     VppTestCase.parallel_handler = logger.handlers[0]
146     result = VppTestRunner(
147         keep_alive_pipe=keep_alive_pipe,
148         descriptions=descriptions,
149         verbosity=config.verbose,
150         result_pipe=result_pipe,
151         failfast=config.failfast,
152         print_summary=False,
153     ).run(suite)
154     finished_pipe.send(result.wasSuccessful())
155     finished_pipe.close()
156     keep_alive_pipe.close()
157
158
159 class TestCaseWrapper(object):
160     def __init__(self, testcase_suite, manager):
161         self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(duplex=False)
162         self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
163         self.result_parent_end, self.result_child_end = Pipe(duplex=False)
164         self.testcase_suite = testcase_suite
165         self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
166         self.logger = get_parallel_logger(self.stdouterr_queue)
167         self.child = Process(
168             target=test_runner_wrapper,
169             args=(
170                 testcase_suite,
171                 self.keep_alive_child_end,
172                 self.stdouterr_queue,
173                 self.finished_child_end,
174                 self.result_child_end,
175                 self.logger,
176             ),
177         )
178         self.child.start()
179         self.last_test_temp_dir = None
180         self.last_test_vpp_binary = None
181         self._last_test = None
182         self.last_test_id = None
183         self.vpp_pid = None
184         self.last_heard = time.time()
185         self.core_detected_at = None
186         self.testcases_by_id = {}
187         self.testclasess_with_core = {}
188         for testcase in self.testcase_suite:
189             self.testcases_by_id[testcase.id()] = testcase
190         self.result = TestResult(testcase_suite, self.testcases_by_id)
191
192     @property
193     def last_test(self):
194         return self._last_test
195
196     @last_test.setter
197     def last_test(self, test_id):
198         self.last_test_id = test_id
199         if test_id in self.testcases_by_id:
200             testcase = self.testcases_by_id[test_id]
201             self._last_test = testcase.shortDescription()
202             if not self._last_test:
203                 self._last_test = str(testcase)
204         else:
205             self._last_test = test_id
206
207     def add_testclass_with_core(self):
208         if self.last_test_id in self.testcases_by_id:
209             test = self.testcases_by_id[self.last_test_id]
210             class_name = unittest.util.strclass(test.__class__)
211             test_name = "'{}' ({})".format(
212                 get_test_description(descriptions, test), self.last_test_id
213             )
214         else:
215             test_name = self.last_test_id
216             class_name = re.match(
217                 r"((tearDownClass)|(setUpClass)) " r"\((.+\..+)\)", test_name
218             ).groups()[3]
219         if class_name not in self.testclasess_with_core:
220             self.testclasess_with_core[class_name] = (
221                 test_name,
222                 self.last_test_vpp_binary,
223                 self.last_test_temp_dir,
224             )
225
226     def close_pipes(self):
227         self.keep_alive_child_end.close()
228         self.finished_child_end.close()
229         self.result_child_end.close()
230         self.keep_alive_parent_end.close()
231         self.finished_parent_end.close()
232         self.result_parent_end.close()
233
234     def was_successful(self):
235         return self.result.was_successful()
236
237     @property
238     def cpus_used(self):
239         return self.testcase_suite.cpus_used
240
241     def get_assigned_cpus(self):
242         return self.testcase_suite.get_assigned_cpus()
243
244
245 def stdouterr_reader_wrapper(
246     unread_testcases, finished_unread_testcases, read_testcases
247 ):
248     read_testcase = None
249     while read_testcases.is_set() or unread_testcases:
250         if finished_unread_testcases:
251             read_testcase = finished_unread_testcases.pop()
252             unread_testcases.remove(read_testcase)
253         elif unread_testcases:
254             read_testcase = unread_testcases.pop()
255         if read_testcase:
256             data = ""
257             while data is not None:
258                 sys.stdout.write(data)
259                 data = read_testcase.stdouterr_queue.get()
260
261             read_testcase.stdouterr_queue.close()
262             finished_unread_testcases.discard(read_testcase)
263             read_testcase = None
264
265
266 def handle_failed_suite(logger, last_test_temp_dir, vpp_pid, vpp_binary):
267     if last_test_temp_dir:
268         # Need to create link in case of a timeout or core dump without failure
269         lttd = os.path.basename(last_test_temp_dir)
270         link_path = os.path.join(config.failed_dir, f"{lttd}-FAILED")
271         if not os.path.exists(link_path):
272             os.symlink(last_test_temp_dir, link_path)
273         logger.error(
274             "Symlink to failed testcase directory: %s -> %s" % (link_path, lttd)
275         )
276
277         # Report core existence
278         core_path = get_core_path(last_test_temp_dir)
279         if os.path.exists(core_path):
280             logger.error(
281                 "Core-file exists in test temporary directory: %s!" % core_path
282             )
283             check_core_path(logger, core_path)
284             logger.debug("Running 'file %s':" % core_path)
285             try:
286                 info = check_output(["file", core_path])
287                 logger.debug(info)
288             except CalledProcessError as e:
289                 logger.error(
290                     "Subprocess returned with return code "
291                     "while running `file' utility on core-file "
292                     "returned: "
293                     "rc=%s",
294                     e.returncode,
295                 )
296             except OSError as e:
297                 logger.error(
298                     "Subprocess returned with OS error while "
299                     "running 'file' utility "
300                     "on core-file: "
301                     "(%s) %s",
302                     e.errno,
303                     e.strerror,
304                 )
305             except Exception as e:
306                 logger.exception("Unexpected error running `file' utility on core-file")
307             logger.error(f"gdb {vpp_binary} {core_path}")
308
309     if vpp_pid:
310         # Copy api post mortem
311         api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
312         if os.path.isfile(api_post_mortem_path):
313             logger.error(
314                 "Copying api_post_mortem.%d to %s" % (vpp_pid, last_test_temp_dir)
315             )
316             shutil.copy2(api_post_mortem_path, last_test_temp_dir)
317
318
319 def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
320     if is_core_present(tempdir):
321         if debug_core:
322             print(
323                 "VPP core detected in %s. Last test running was %s"
324                 % (tempdir, core_crash_test)
325             )
326             print(single_line_delim)
327             spawn_gdb(vpp_binary, get_core_path(tempdir))
328             print(single_line_delim)
329         elif config.compress_core:
330             print("Compressing core-file in test directory `%s'" % tempdir)
331             os.system("gzip %s" % get_core_path(tempdir))
332
333
334 def handle_cores(failed_testcases):
335     for failed_testcase in failed_testcases:
336         tcs_with_core = failed_testcase.testclasess_with_core
337         if tcs_with_core:
338             for test, vpp_binary, tempdir in tcs_with_core.values():
339                 check_and_handle_core(vpp_binary, tempdir, test)
340
341
342 def process_finished_testsuite(
343     wrapped_testcase_suite, finished_testcase_suites, failed_wrapped_testcases, results
344 ):
345     results.append(wrapped_testcase_suite.result)
346     finished_testcase_suites.add(wrapped_testcase_suite)
347     stop_run = False
348     if config.failfast and not wrapped_testcase_suite.was_successful():
349         stop_run = True
350
351     if not wrapped_testcase_suite.was_successful():
352         failed_wrapped_testcases.add(wrapped_testcase_suite)
353         handle_failed_suite(
354             wrapped_testcase_suite.logger,
355             wrapped_testcase_suite.last_test_temp_dir,
356             wrapped_testcase_suite.vpp_pid,
357             wrapped_testcase_suite.last_test_vpp_binary,
358         )
359
360     return stop_run
361
362
363 def run_forked(testcase_suites):
364     wrapped_testcase_suites = set()
365     solo_testcase_suites = []
366
367     # suites are unhashable, need to use list
368     results = []
369     unread_testcases = set()
370     finished_unread_testcases = set()
371     manager = StreamQueueManager()
372     manager.start()
373     tests_running = 0
374     free_cpus = list(available_cpus)
375
376     def on_suite_start(tc):
377         nonlocal tests_running
378         nonlocal free_cpus
379         tests_running = tests_running + 1
380
381     def on_suite_finish(tc):
382         nonlocal tests_running
383         nonlocal free_cpus
384         tests_running = tests_running - 1
385         assert tests_running >= 0
386         free_cpus.extend(tc.get_assigned_cpus())
387
388     def run_suite(suite):
389         nonlocal manager
390         nonlocal wrapped_testcase_suites
391         nonlocal unread_testcases
392         nonlocal free_cpus
393         suite.assign_cpus(free_cpus[: suite.cpus_used])
394         free_cpus = free_cpus[suite.cpus_used :]
395         wrapper = TestCaseWrapper(suite, manager)
396         wrapped_testcase_suites.add(wrapper)
397         unread_testcases.add(wrapper)
398         on_suite_start(suite)
399
400     def can_run_suite(suite):
401         return tests_running < max_concurrent_tests and (
402             suite.cpus_used <= len(free_cpus) or suite.cpus_used > max_vpp_cpus
403         )
404
405     while free_cpus and testcase_suites:
406         a_suite = testcase_suites[0]
407         if a_suite.is_tagged_run_solo:
408             a_suite = testcase_suites.pop(0)
409             solo_testcase_suites.append(a_suite)
410             continue
411         if can_run_suite(a_suite):
412             a_suite = testcase_suites.pop(0)
413             run_suite(a_suite)
414         else:
415             break
416
417     if tests_running == 0 and solo_testcase_suites:
418         a_suite = solo_testcase_suites.pop(0)
419         run_suite(a_suite)
420
421     read_from_testcases = threading.Event()
422     read_from_testcases.set()
423     stdouterr_thread = threading.Thread(
424         target=stdouterr_reader_wrapper,
425         args=(unread_testcases, finished_unread_testcases, read_from_testcases),
426     )
427     stdouterr_thread.start()
428
429     failed_wrapped_testcases = set()
430     stop_run = False
431
432     try:
433         while wrapped_testcase_suites:
434             finished_testcase_suites = set()
435             for wrapped_testcase_suite in wrapped_testcase_suites:
436                 while wrapped_testcase_suite.result_parent_end.poll():
437                     wrapped_testcase_suite.result.process_result(
438                         *wrapped_testcase_suite.result_parent_end.recv()
439                     )
440                     wrapped_testcase_suite.last_heard = time.time()
441
442                 while wrapped_testcase_suite.keep_alive_parent_end.poll():
443                     (
444                         wrapped_testcase_suite.last_test,
445                         wrapped_testcase_suite.last_test_vpp_binary,
446                         wrapped_testcase_suite.last_test_temp_dir,
447                         wrapped_testcase_suite.vpp_pid,
448                     ) = wrapped_testcase_suite.keep_alive_parent_end.recv()
449                     wrapped_testcase_suite.last_heard = time.time()
450
451                 if wrapped_testcase_suite.finished_parent_end.poll():
452                     wrapped_testcase_suite.finished_parent_end.recv()
453                     wrapped_testcase_suite.last_heard = time.time()
454                     stop_run = (
455                         process_finished_testsuite(
456                             wrapped_testcase_suite,
457                             finished_testcase_suites,
458                             failed_wrapped_testcases,
459                             results,
460                         )
461                         or stop_run
462                     )
463                     continue
464
465                 fail = False
466                 if wrapped_testcase_suite.last_heard + config.timeout < time.time():
467                     fail = True
468                     wrapped_testcase_suite.logger.critical(
469                         "Child test runner process timed out "
470                         "(last test running was `%s' in `%s')!"
471                         % (
472                             wrapped_testcase_suite.last_test,
473                             wrapped_testcase_suite.last_test_temp_dir,
474                         )
475                     )
476                 elif not wrapped_testcase_suite.child.is_alive():
477                     fail = True
478                     wrapped_testcase_suite.logger.critical(
479                         "Child test runner process unexpectedly died "
480                         "(last test running was `%s' in `%s')!"
481                         % (
482                             wrapped_testcase_suite.last_test,
483                             wrapped_testcase_suite.last_test_temp_dir,
484                         )
485                     )
486                 elif (
487                     wrapped_testcase_suite.last_test_temp_dir
488                     and wrapped_testcase_suite.last_test_vpp_binary
489                 ):
490                     if is_core_present(wrapped_testcase_suite.last_test_temp_dir):
491                         wrapped_testcase_suite.add_testclass_with_core()
492                         if wrapped_testcase_suite.core_detected_at is None:
493                             wrapped_testcase_suite.core_detected_at = time.time()
494                         elif (
495                             wrapped_testcase_suite.core_detected_at + core_timeout
496                             < time.time()
497                         ):
498                             wrapped_testcase_suite.logger.critical(
499                                 "Child test runner process unresponsive and "
500                                 "core-file exists in test temporary directory "
501                                 "(last test running was `%s' in `%s')!"
502                                 % (
503                                     wrapped_testcase_suite.last_test,
504                                     wrapped_testcase_suite.last_test_temp_dir,
505                                 )
506                             )
507                             fail = True
508
509                 if fail:
510                     wrapped_testcase_suite.child.terminate()
511                     try:
512                         # terminating the child process tends to leave orphan
513                         # VPP process around
514                         if wrapped_testcase_suite.vpp_pid:
515                             os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
516                     except OSError:
517                         # already dead
518                         pass
519                     wrapped_testcase_suite.result.crashed = True
520                     wrapped_testcase_suite.result.process_result(
521                         wrapped_testcase_suite.last_test_id, ERROR
522                     )
523                     stop_run = (
524                         process_finished_testsuite(
525                             wrapped_testcase_suite,
526                             finished_testcase_suites,
527                             failed_wrapped_testcases,
528                             results,
529                         )
530                         or stop_run
531                     )
532
533             for finished_testcase in finished_testcase_suites:
534                 # Somewhat surprisingly, the join below may
535                 # timeout, even if client signaled that
536                 # it finished - so we note it just in case.
537                 join_start = time.time()
538                 finished_testcase.child.join(test_finished_join_timeout)
539                 join_end = time.time()
540                 if join_end - join_start >= test_finished_join_timeout:
541                     finished_testcase.logger.error(
542                         "Timeout joining finished test: %s (pid %d)"
543                         % (finished_testcase.last_test, finished_testcase.child.pid)
544                     )
545                 finished_testcase.close_pipes()
546                 wrapped_testcase_suites.remove(finished_testcase)
547                 finished_unread_testcases.add(finished_testcase)
548                 finished_testcase.stdouterr_queue.put(None)
549                 on_suite_finish(finished_testcase)
550                 if stop_run:
551                     while testcase_suites:
552                         results.append(TestResult(testcase_suites.pop(0)))
553                 elif testcase_suites:
554                     a_suite = testcase_suites.pop(0)
555                     while a_suite and a_suite.is_tagged_run_solo:
556                         solo_testcase_suites.append(a_suite)
557                         if testcase_suites:
558                             a_suite = testcase_suites.pop(0)
559                         else:
560                             a_suite = None
561                     if a_suite and can_run_suite(a_suite):
562                         run_suite(a_suite)
563                 if solo_testcase_suites and tests_running == 0:
564                     a_suite = solo_testcase_suites.pop(0)
565                     run_suite(a_suite)
566             time.sleep(0.1)
567     except Exception:
568         for wrapped_testcase_suite in wrapped_testcase_suites:
569             wrapped_testcase_suite.child.terminate()
570             wrapped_testcase_suite.stdouterr_queue.put(None)
571         raise
572     finally:
573         read_from_testcases.clear()
574         stdouterr_thread.join(config.timeout)
575         manager.shutdown()
576
577     handle_cores(failed_wrapped_testcases)
578     return results
579
580
581 class TestSuiteWrapper(unittest.TestSuite):
582     cpus_used = 0
583
584     def __init__(self):
585         return super().__init__()
586
587     def addTest(self, test):
588         self.cpus_used = max(self.cpus_used, test.get_cpus_required())
589         super().addTest(test)
590
591     def assign_cpus(self, cpus):
592         self.cpus = cpus
593
594     def _handleClassSetUp(self, test, result):
595         if not test.__class__.skipped_due_to_cpu_lack:
596             test.assign_cpus(self.cpus)
597         super()._handleClassSetUp(test, result)
598
599     def get_assigned_cpus(self):
600         return self.cpus
601
602
603 class SplitToSuitesCallback:
604     def __init__(self, filter_callback):
605         self.suites = {}
606         self.suite_name = "default"
607         self.filter_callback = filter_callback
608         self.filtered = TestSuiteWrapper()
609
610     def __call__(self, file_name, cls, method):
611         test_method = cls(method)
612         if self.filter_callback(file_name, cls.__name__, method):
613             self.suite_name = file_name + cls.__name__
614             if self.suite_name not in self.suites:
615                 self.suites[self.suite_name] = TestSuiteWrapper()
616                 self.suites[self.suite_name].is_tagged_run_solo = False
617             self.suites[self.suite_name].addTest(test_method)
618             if test_method.is_tagged_run_solo():
619                 self.suites[self.suite_name].is_tagged_run_solo = True
620
621         else:
622             self.filtered.addTest(test_method)
623
624
625 def parse_test_filter(test_filter):
626     f = test_filter
627     filter_file_name = None
628     filter_class_name = None
629     filter_func_name = None
630     if f:
631         if "." in f:
632             parts = f.split(".")
633             if len(parts) > 3:
634                 raise Exception("Unrecognized %s option: %s" % (test_option, f))
635             if len(parts) > 2:
636                 if parts[2] not in ("*", ""):
637                     filter_func_name = parts[2]
638             if parts[1] not in ("*", ""):
639                 filter_class_name = parts[1]
640             if parts[0] not in ("*", ""):
641                 if parts[0].startswith("test_"):
642                     filter_file_name = parts[0]
643                 else:
644                     filter_file_name = "test_%s" % parts[0]
645         else:
646             if f.startswith("test_"):
647                 filter_file_name = f
648             else:
649                 filter_file_name = "test_%s" % f
650     if filter_file_name:
651         filter_file_name = "%s.py" % filter_file_name
652     return filter_file_name, filter_class_name, filter_func_name
653
654
655 def filter_tests(tests, filter_cb):
656     result = TestSuiteWrapper()
657     for t in tests:
658         if isinstance(t, unittest.suite.TestSuite):
659             # this is a bunch of tests, recursively filter...
660             x = filter_tests(t, filter_cb)
661             if x.countTestCases() > 0:
662                 result.addTest(x)
663         elif isinstance(t, unittest.TestCase):
664             # this is a single test
665             parts = t.id().split(".")
666             # t.id() for common cases like this:
667             # test_classifier.TestClassifier.test_acl_ip
668             # apply filtering only if it is so
669             if len(parts) == 3:
670                 if not filter_cb(parts[0], parts[1], parts[2]):
671                     continue
672             result.addTest(t)
673         else:
674             # unexpected object, don't touch it
675             result.addTest(t)
676     return result
677
678
679 class FilterByTestOption:
680     def __init__(self, filter_file_name, filter_class_name, filter_func_name):
681         self.filter_file_name = filter_file_name
682         self.filter_class_name = filter_class_name
683         self.filter_func_name = filter_func_name
684
685     def __call__(self, file_name, class_name, func_name):
686         if self.filter_file_name:
687             fn_match = fnmatch.fnmatch(file_name, self.filter_file_name)
688             if not fn_match:
689                 return False
690         if self.filter_class_name and class_name != self.filter_class_name:
691             return False
692         if self.filter_func_name and func_name != self.filter_func_name:
693             return False
694         return True
695
696
697 class FilterByClassList:
698     def __init__(self, classes_with_filenames):
699         self.classes_with_filenames = classes_with_filenames
700
701     def __call__(self, file_name, class_name, func_name):
702         return ".".join([file_name, class_name]) in self.classes_with_filenames
703
704
705 def suite_from_failed(suite, failed):
706     failed = {x.rsplit(".", 1)[0] for x in failed}
707     filter_cb = FilterByClassList(failed)
708     suite = filter_tests(suite, filter_cb)
709     return suite
710
711
712 class AllResults(dict):
713     def __init__(self):
714         super(AllResults, self).__init__()
715         self.all_testcases = 0
716         self.results_per_suite = []
717         self[PASS] = 0
718         self[FAIL] = 0
719         self[ERROR] = 0
720         self[SKIP] = 0
721         self[SKIP_CPU_SHORTAGE] = 0
722         self[TEST_RUN] = 0
723         self.rerun = []
724         self.testsuites_no_tests_run = []
725
726     def add_results(self, result):
727         self.results_per_suite.append(result)
728         result_types = [PASS, FAIL, ERROR, SKIP, TEST_RUN, SKIP_CPU_SHORTAGE]
729         for result_type in result_types:
730             self[result_type] += len(result[result_type])
731
732     def add_result(self, result):
733         retval = 0
734         self.all_testcases += result.testcase_suite.countTestCases()
735         self.add_results(result)
736
737         if result.no_tests_run():
738             self.testsuites_no_tests_run.append(result.testcase_suite)
739             if result.crashed:
740                 retval = -1
741             else:
742                 retval = 1
743         elif not result.was_successful():
744             retval = 1
745
746         if retval != 0:
747             self.rerun.append(result.testcase_suite)
748
749         return retval
750
751     def print_results(self):
752         print("")
753         print(double_line_delim)
754         print("TEST RESULTS:")
755
756         def indent_results(lines):
757             lines = list(filter(None, lines))
758             maximum = max(lines, key=lambda x: x.index(":"))
759             maximum = 4 + maximum.index(":")
760             for l in lines:
761                 padding = " " * (maximum - l.index(":"))
762                 print(f"{padding}{l}")
763
764         indent_results(
765             [
766                 f"Scheduled tests: {self.all_testcases}",
767                 f"Executed tests: {self[TEST_RUN]}",
768                 f"Passed tests: {colorize(self[PASS], GREEN)}",
769                 f"Skipped tests: {colorize(self[SKIP], YELLOW)}"
770                 if self[SKIP]
771                 else None,
772                 f"Not Executed tests: {colorize(self.not_executed, RED)}"
773                 if self.not_executed
774                 else None,
775                 f"Failures: {colorize(self[FAIL], RED)}" if self[FAIL] else None,
776                 f"Errors: {colorize(self[ERROR], RED)}" if self[ERROR] else None,
777                 "Tests skipped due to lack of CPUS: "
778                 f"{colorize(self[SKIP_CPU_SHORTAGE], YELLOW)}"
779                 if self[SKIP_CPU_SHORTAGE]
780                 else None,
781             ]
782         )
783
784         if self.all_failed > 0:
785             print("FAILURES AND ERRORS IN TESTS:")
786             for result in self.results_per_suite:
787                 failed_testcase_ids = result[FAIL]
788                 errored_testcase_ids = result[ERROR]
789                 old_testcase_name = None
790                 if failed_testcase_ids:
791                     for failed_test_id in failed_testcase_ids:
792                         new_testcase_name, test_name = result.get_testcase_names(
793                             failed_test_id
794                         )
795                         if new_testcase_name != old_testcase_name:
796                             print(
797                                 "  Testcase name: {}".format(
798                                     colorize(new_testcase_name, RED)
799                                 )
800                             )
801                             old_testcase_name = new_testcase_name
802                         print(
803                             "    FAILURE: {} [{}]".format(
804                                 colorize(test_name, RED), failed_test_id
805                             )
806                         )
807                 if errored_testcase_ids:
808                     for errored_test_id in errored_testcase_ids:
809                         new_testcase_name, test_name = result.get_testcase_names(
810                             errored_test_id
811                         )
812                         if new_testcase_name != old_testcase_name:
813                             print(
814                                 "  Testcase name: {}".format(
815                                     colorize(new_testcase_name, RED)
816                                 )
817                             )
818                             old_testcase_name = new_testcase_name
819                         print(
820                             "      ERROR: {} [{}]".format(
821                                 colorize(test_name, RED), errored_test_id
822                             )
823                         )
824         if self.testsuites_no_tests_run:
825             print("TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:")
826             tc_classes = set()
827             for testsuite in self.testsuites_no_tests_run:
828                 for testcase in testsuite:
829                     tc_classes.add(get_testcase_doc_name(testcase))
830             for tc_class in tc_classes:
831                 print("  {}".format(colorize(tc_class, RED)))
832
833         if self[SKIP_CPU_SHORTAGE]:
834             print()
835             print(
836                 colorize(
837                     "     SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
838                     " ENOUGH CPUS AVAILABLE",
839                     YELLOW,
840                 )
841             )
842         print(double_line_delim)
843         print("")
844
845     @property
846     def not_executed(self):
847         return self.all_testcases - self[TEST_RUN]
848
849     @property
850     def all_failed(self):
851         return self[FAIL] + self[ERROR]
852
853
854 def parse_results(results):
855     """
856     Prints the number of scheduled, executed, not executed, passed, failed,
857     errored and skipped tests and details about failed and errored tests.
858
859     Also returns all suites where any test failed.
860
861     :param results:
862     :return:
863     """
864
865     results_per_suite = AllResults()
866     crashed = False
867     failed = False
868     for result in results:
869         result_code = results_per_suite.add_result(result)
870         if result_code == 1:
871             failed = True
872         elif result_code == -1:
873             crashed = True
874
875     results_per_suite.print_results()
876
877     if crashed:
878         return_code = -1
879     elif failed:
880         return_code = 1
881     else:
882         return_code = 0
883     return return_code, results_per_suite.rerun
884
885
886 if __name__ == "__main__":
887
888     print(f"Config is: {config}")
889
890     if config.sanity:
891         print("Running sanity test case.")
892         try:
893             rc = sanity_run_vpp.main()
894             if rc != 0:
895                 sys.exit(rc)
896         except Exception as e:
897             print(traceback.format_exc())
898             print("Couldn't run sanity test case.")
899             sys.exit(-1)
900
901     test_finished_join_timeout = 15
902
903     debug_gdb = config.debug in ["gdb", "gdbserver", "attach"]
904     debug_core = config.debug == "core"
905
906     run_interactive = debug_gdb or config.step or config.force_foreground
907
908     max_concurrent_tests = 0
909     print(f"OS reports {num_cpus} available cpu(s).")
910
911     test_jobs = config.jobs
912     if test_jobs == "auto":
913         if run_interactive:
914             max_concurrent_tests = 1
915             print("Interactive mode required, running tests consecutively.")
916         else:
917             max_concurrent_tests = num_cpus
918             print(
919                 f"Running at most {max_concurrent_tests} python test "
920                 "processes concurrently."
921             )
922     else:
923         max_concurrent_tests = test_jobs
924         print(
925             f"Running at most {max_concurrent_tests} python test processes "
926             "concurrently as set by 'TEST_JOBS'."
927         )
928
929     print(f"Using at most {max_vpp_cpus} cpus for VPP threads.")
930
931     if run_interactive and max_concurrent_tests > 1:
932         raise NotImplementedError(
933             "Running tests interactively (DEBUG is gdb[server] or ATTACH or "
934             "STEP is set) in parallel (TEST_JOBS is more than 1) is not "
935             "supported"
936         )
937
938     descriptions = True
939
940     print("Running tests using custom test runner.")
941     filter_file, filter_class, filter_func = parse_test_filter(config.filter)
942
943     print(
944         "Selected filters: file=%s, class=%s, function=%s"
945         % (filter_file, filter_class, filter_func)
946     )
947
948     filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)
949
950     ignore_path = config.venv_dir
951     cb = SplitToSuitesCallback(filter_cb)
952     for d in config.test_src_dir:
953         print("Adding tests from directory tree %s" % d)
954         discover_tests(d, cb, ignore_path)
955
956     # suites are not hashable, need to use list
957     suites = []
958     tests_amount = 0
959     for testcase_suite in cb.suites.values():
960         tests_amount += testcase_suite.countTestCases()
961         if testcase_suite.cpus_used > max_vpp_cpus:
962             # here we replace test functions with lambdas to just skip them
963             # but we also replace setUp/tearDown functions to do nothing
964             # so that the test can be "started" and "stopped", so that we can
965             # still keep those prints (test description - SKIP), which are done
966             # in stopTest() (for that to trigger, test function must run)
967             for t in testcase_suite:
968                 for m in dir(t):
969                     if m.startswith("test_"):
970                         setattr(t, m, lambda: t.skipTest("not enough cpus"))
971                 setattr(t.__class__, "setUpClass", lambda: None)
972                 setattr(t.__class__, "tearDownClass", lambda: None)
973                 setattr(t, "setUp", lambda: None)
974                 setattr(t, "tearDown", lambda: None)
975                 t.__class__.skipped_due_to_cpu_lack = True
976         suites.append(testcase_suite)
977
978     print(
979         "%s out of %s tests match specified filters"
980         % (tests_amount, tests_amount + cb.filtered.countTestCases())
981     )
982
983     if not config.extended:
984         print("Not running extended tests (some tests will be skipped)")
985
986     attempts = config.retries + 1
987     if attempts > 1:
988         print("Perform %s attempts to pass the suite..." % attempts)
989
990     if run_interactive and suites:
991         # don't fork if requiring interactive terminal
992         print("Running tests in foreground in the current process")
993         full_suite = unittest.TestSuite()
994         free_cpus = list(available_cpus)
995         cpu_shortage = False
996         for suite in suites:
997             if suite.cpus_used <= max_vpp_cpus:
998                 suite.assign_cpus(free_cpus[: suite.cpus_used])
999             else:
1000                 suite.assign_cpus([])
1001                 cpu_shortage = True
1002         full_suite.addTests(suites)
1003         result = VppTestRunner(
1004             verbosity=config.verbose, failfast=config.failfast, print_summary=True
1005         ).run(full_suite)
1006         was_successful = result.wasSuccessful()
1007         if not was_successful:
1008             for test_case_info in result.failed_test_cases_info:
1009                 handle_failed_suite(
1010                     test_case_info.logger,
1011                     test_case_info.tempdir,
1012                     test_case_info.vpp_pid,
1013                     config.vpp,
1014                 )
1015                 if test_case_info in result.core_crash_test_cases_info:
1016                     check_and_handle_core(
1017                         test_case_info.vpp_bin_path,
1018                         test_case_info.tempdir,
1019                         test_case_info.core_crash_test,
1020                     )
1021
1022         if cpu_shortage:
1023             print()
1024             print(
1025                 colorize(
1026                     "SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
1027                     " ENOUGH CPUS AVAILABLE",
1028                     YELLOW,
1029                 )
1030             )
1031             print()
1032         sys.exit(not was_successful)
1033     else:
1034         print(
1035             "Running each VPPTestCase in a separate background process"
1036             f" with at most {max_concurrent_tests} parallel python test "
1037             "process(es)"
1038         )
1039         exit_code = 0
1040         while suites and attempts > 0:
1041             results = run_forked(suites)
1042             exit_code, suites = parse_results(results)
1043             attempts -= 1
1044             if exit_code == 0:
1045                 print("Test run was successful")
1046             else:
1047                 print("%s attempt(s) left." % attempts)
1048         sys.exit(exit_code)