tests: Use errno value rather than a specific int
[vpp.git] / test / run_tests.py
1 #!/usr/bin/env python3
2
3 import sys
4 import shutil
5 import os
6 import fnmatch
7 import unittest
8 import time
9 import threading
10 import traceback
11 import signal
12 import re
13 from multiprocessing import Process, Pipe, get_context
14 from multiprocessing.queues import Queue
15 from multiprocessing.managers import BaseManager
16 from config import config, num_cpus, available_cpus, max_vpp_cpus
17 from vpp_papi import VPPApiJSONFiles
18 from asfframework import (
19     VppTestRunner,
20     get_testcase_doc_name,
21     get_test_description,
22     get_failed_testcase_linkname,
23     get_testcase_dirname,
24 )
25 from framework import VppTestCase
26 from test_result_code import TestResultCode
27 from debug import spawn_gdb
28 from log import (
29     get_parallel_logger,
30     double_line_delim,
31     RED,
32     YELLOW,
33     GREEN,
34     colorize,
35     single_line_delim,
36 )
37 from discover_tests import discover_tests
38 import sanity_run_vpp
39 from subprocess import check_output, CalledProcessError
40 from util import check_core_path, get_core_path, is_core_present
41
42 # timeout which controls how long the child has to finish after seeing
43 # a core dump in test temporary directory. If this is exceeded, parent assumes
44 # that child process is stuck (e.g. waiting for event from vpp) and kill
45 # the child
46 core_timeout = 3
47
48
49 class StreamQueue(Queue):
50     def write(self, msg):
51         self.put(msg)
52
53     def flush(self):
54         sys.__stdout__.flush()
55         sys.__stderr__.flush()
56
57     def fileno(self):
58         return self._writer.fileno()
59
60
61 class StreamQueueManager(BaseManager):
62     pass
63
64
65 StreamQueueManager.register("StreamQueue", StreamQueue)
66
67
68 class TestResult(dict):
69     def __init__(self, testcase_suite, testcases_by_id=None):
70         super(TestResult, self).__init__()
71         for trc in list(TestResultCode):
72             self[trc] = []
73         self.crashed = False
74         self.testcase_suite = testcase_suite
75         self.testcases = [testcase for testcase in testcase_suite]
76         self.testcases_by_id = testcases_by_id
77
78     def was_successful(self):
79         return (
80             0
81             == len(self[TestResultCode.FAIL])
82             == len(self[TestResultCode.ERROR])
83             == len(self[TestResultCode.UNEXPECTED_PASS])
84             and len(self[TestResultCode.PASS])
85             + len(self[TestResultCode.SKIP])
86             + len(self[TestResultCode.SKIP_CPU_SHORTAGE])
87             + len(self[TestResultCode.EXPECTED_FAIL])
88             == self.testcase_suite.countTestCases()
89         )
90
91     def no_tests_run(self):
92         return 0 == len(self[TestResultCode.TEST_RUN])
93
94     def process_result(self, test_id, result):
95         self[result].append(test_id)
96
97     def suite_from_failed(self):
98         rerun_ids = set([])
99         for testcase in self.testcase_suite:
100             tc_id = testcase.id()
101             if (
102                 tc_id
103                 not in self[TestResultCode.PASS]
104                 + self[TestResultCode.SKIP]
105                 + self[TestResultCode.SKIP_CPU_SHORTAGE]
106                 + self[TestResultCode.EXPECTED_FAIL]
107             ):
108                 rerun_ids.add(tc_id)
109         if rerun_ids:
110             return suite_from_failed(self.testcase_suite, rerun_ids)
111
112     def get_testcase_names(self, test_id):
113         # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
114         setup_teardown_match = re.match(
115             r"((tearDownClass)|(setUpClass)) \((.+\..+)\)", test_id
116         )
117         if setup_teardown_match:
118             test_name, _, _, testcase_name = setup_teardown_match.groups()
119             if len(testcase_name.split(".")) == 2:
120                 for key in self.testcases_by_id.keys():
121                     if key.startswith(testcase_name):
122                         testcase_name = key
123                         break
124             testcase_name = self._get_testcase_doc_name(testcase_name)
125         else:
126             test_name = self._get_test_description(test_id)
127             testcase_name = self._get_testcase_doc_name(test_id)
128
129         return testcase_name, test_name
130
131     def _get_test_description(self, test_id):
132         if test_id in self.testcases_by_id:
133             desc = get_test_description(descriptions, self.testcases_by_id[test_id])
134         else:
135             desc = test_id
136         return desc
137
138     def _get_testcase_doc_name(self, test_id):
139         if test_id in self.testcases_by_id:
140             doc_name = get_testcase_doc_name(self.testcases_by_id[test_id])
141         else:
142             doc_name = test_id
143         return doc_name
144
145
146 def test_runner_wrapper(
147     suite, keep_alive_pipe, stdouterr_queue, finished_pipe, result_pipe, logger
148 ):
149     sys.stdout = stdouterr_queue
150     sys.stderr = stdouterr_queue
151     VppTestCase.parallel_handler = logger.handlers[0]
152     result = VppTestRunner(
153         keep_alive_pipe=keep_alive_pipe,
154         descriptions=descriptions,
155         verbosity=config.verbose,
156         result_pipe=result_pipe,
157         failfast=config.failfast,
158         print_summary=False,
159     ).run(suite)
160     finished_pipe.send(result.wasSuccessful())
161     finished_pipe.close()
162     keep_alive_pipe.close()
163
164
165 class TestCaseWrapper(object):
166     def __init__(self, testcase_suite, manager):
167         self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(duplex=False)
168         self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
169         self.result_parent_end, self.result_child_end = Pipe(duplex=False)
170         self.testcase_suite = testcase_suite
171         self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
172         self.logger = get_parallel_logger(self.stdouterr_queue)
173         self.child = Process(
174             target=test_runner_wrapper,
175             args=(
176                 testcase_suite,
177                 self.keep_alive_child_end,
178                 self.stdouterr_queue,
179                 self.finished_child_end,
180                 self.result_child_end,
181                 self.logger,
182             ),
183         )
184         self.child.start()
185         self.last_test_temp_dir = None
186         self.last_test_vpp_binary = None
187         self._last_test = None
188         self.last_test_id = None
189         self.vpp_pid = None
190         self.last_heard = time.time()
191         self.core_detected_at = None
192         self.testcases_by_id = {}
193         self.testclasess_with_core = {}
194         for testcase in self.testcase_suite:
195             self.testcases_by_id[testcase.id()] = testcase
196         self.result = TestResult(testcase_suite, self.testcases_by_id)
197
198     @property
199     def last_test(self):
200         return self._last_test
201
202     @last_test.setter
203     def last_test(self, test_id):
204         self.last_test_id = test_id
205         if test_id in self.testcases_by_id:
206             testcase = self.testcases_by_id[test_id]
207             self._last_test = testcase.shortDescription()
208             if not self._last_test:
209                 self._last_test = str(testcase)
210         else:
211             self._last_test = test_id
212
213     def add_testclass_with_core(self):
214         if self.last_test_id in self.testcases_by_id:
215             test = self.testcases_by_id[self.last_test_id]
216             class_name = unittest.util.strclass(test.__class__)
217             test_name = "'{}' ({})".format(
218                 get_test_description(descriptions, test), self.last_test_id
219             )
220         else:
221             test_name = self.last_test_id
222             class_name = re.match(
223                 r"((tearDownClass)|(setUpClass)) " r"\((.+\..+)\)", test_name
224             ).groups()[3]
225         if class_name not in self.testclasess_with_core:
226             self.testclasess_with_core[class_name] = (
227                 test_name,
228                 self.last_test_vpp_binary,
229                 self.last_test_temp_dir,
230             )
231
232     def close_pipes(self):
233         self.keep_alive_child_end.close()
234         self.finished_child_end.close()
235         self.result_child_end.close()
236         self.keep_alive_parent_end.close()
237         self.finished_parent_end.close()
238         self.result_parent_end.close()
239
240     def was_successful(self):
241         return self.result.was_successful()
242
243     @property
244     def cpus_used(self):
245         return self.testcase_suite.cpus_used
246
247     def get_assigned_cpus(self):
248         return self.testcase_suite.get_assigned_cpus()
249
250
251 def stdouterr_reader_wrapper(
252     unread_testcases, finished_unread_testcases, read_testcases
253 ):
254     read_testcase = None
255     while read_testcases.is_set() or unread_testcases:
256         if finished_unread_testcases:
257             read_testcase = finished_unread_testcases.pop()
258             unread_testcases.remove(read_testcase)
259         elif unread_testcases:
260             read_testcase = unread_testcases.pop()
261         if read_testcase:
262             data = ""
263             while data is not None:
264                 sys.stdout.write(data)
265                 data = read_testcase.stdouterr_queue.get()
266
267             read_testcase.stdouterr_queue.close()
268             finished_unread_testcases.discard(read_testcase)
269             read_testcase = None
270
271
272 def handle_failed_suite(logger, last_test_temp_dir, vpp_pid, vpp_binary):
273     if last_test_temp_dir:
274         # Need to create link in case of a timeout or core dump without failure
275         lttd = os.path.basename(last_test_temp_dir)
276         link_path = os.path.join(config.failed_dir, f"{lttd}-FAILED")
277         if not os.path.exists(link_path):
278             os.symlink(last_test_temp_dir, link_path)
279         logger.error(
280             "Symlink to failed testcase directory: %s -> %s" % (link_path, lttd)
281         )
282
283         # Report core existence
284         core_path = get_core_path(last_test_temp_dir)
285         if os.path.exists(core_path):
286             logger.error(
287                 "Core-file exists in test temporary directory: %s!" % core_path
288             )
289             check_core_path(logger, core_path)
290             logger.debug("Running 'file %s':" % core_path)
291             try:
292                 info = check_output(["file", core_path])
293                 logger.debug(info)
294             except CalledProcessError as e:
295                 logger.error(
296                     "Subprocess returned with return code "
297                     "while running `file' utility on core-file "
298                     "returned: "
299                     "rc=%s",
300                     e.returncode,
301                 )
302             except OSError as e:
303                 logger.error(
304                     "Subprocess returned with OS error while "
305                     "running 'file' utility "
306                     "on core-file: "
307                     "(%s) %s",
308                     e.errno,
309                     e.strerror,
310                 )
311             except Exception as e:
312                 logger.exception("Unexpected error running `file' utility on core-file")
313             logger.error(f"gdb {vpp_binary} {core_path}")
314
315     if vpp_pid:
316         # Copy api post mortem
317         api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
318         if os.path.isfile(api_post_mortem_path):
319             logger.error(
320                 "Copying api_post_mortem.%d to %s" % (vpp_pid, last_test_temp_dir)
321             )
322             shutil.copy2(api_post_mortem_path, last_test_temp_dir)
323
324
325 def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
326     if is_core_present(tempdir):
327         if debug_core:
328             print(
329                 "VPP core detected in %s. Last test running was %s"
330                 % (tempdir, core_crash_test)
331             )
332             print(single_line_delim)
333             spawn_gdb(vpp_binary, get_core_path(tempdir))
334             print(single_line_delim)
335         elif config.compress_core:
336             print("Compressing core-file in test directory `%s'" % tempdir)
337             os.system("gzip %s" % get_core_path(tempdir))
338
339
340 def handle_cores(failed_testcases):
341     for failed_testcase in failed_testcases:
342         tcs_with_core = failed_testcase.testclasess_with_core
343         if tcs_with_core:
344             for test, vpp_binary, tempdir in tcs_with_core.values():
345                 check_and_handle_core(vpp_binary, tempdir, test)
346
347
348 def process_finished_testsuite(
349     wrapped_testcase_suite, finished_testcase_suites, failed_wrapped_testcases, results
350 ):
351     results.append(wrapped_testcase_suite.result)
352     finished_testcase_suites.add(wrapped_testcase_suite)
353     stop_run = False
354     if config.failfast and not wrapped_testcase_suite.was_successful():
355         stop_run = True
356
357     if not wrapped_testcase_suite.was_successful():
358         failed_wrapped_testcases.add(wrapped_testcase_suite)
359         handle_failed_suite(
360             wrapped_testcase_suite.logger,
361             wrapped_testcase_suite.last_test_temp_dir,
362             wrapped_testcase_suite.vpp_pid,
363             wrapped_testcase_suite.last_test_vpp_binary,
364         )
365
366     return stop_run
367
368
369 def run_forked(testcase_suites):
370     wrapped_testcase_suites = set()
371     solo_testcase_suites = []
372
373     # suites are unhashable, need to use list
374     results = []
375     unread_testcases = set()
376     finished_unread_testcases = set()
377     manager = StreamQueueManager()
378     manager.start()
379     tests_running = 0
380     free_cpus = list(available_cpus)
381
382     def on_suite_start(tc):
383         nonlocal tests_running
384         nonlocal free_cpus
385         tests_running = tests_running + 1
386
387     def on_suite_finish(tc):
388         nonlocal tests_running
389         nonlocal free_cpus
390         tests_running = tests_running - 1
391         assert tests_running >= 0
392         free_cpus.extend(tc.get_assigned_cpus())
393
394     def run_suite(suite):
395         nonlocal manager
396         nonlocal wrapped_testcase_suites
397         nonlocal unread_testcases
398         nonlocal free_cpus
399         suite.assign_cpus(free_cpus[: suite.cpus_used])
400         free_cpus = free_cpus[suite.cpus_used :]
401         wrapper = TestCaseWrapper(suite, manager)
402         wrapped_testcase_suites.add(wrapper)
403         unread_testcases.add(wrapper)
404         on_suite_start(suite)
405
406     def can_run_suite(suite):
407         return tests_running < max_concurrent_tests and (
408             suite.cpus_used <= len(free_cpus) or suite.cpus_used > max_vpp_cpus
409         )
410
411     while free_cpus and testcase_suites:
412         a_suite = testcase_suites[0]
413         if a_suite.is_tagged_run_solo:
414             a_suite = testcase_suites.pop(0)
415             solo_testcase_suites.append(a_suite)
416             continue
417         if can_run_suite(a_suite):
418             a_suite = testcase_suites.pop(0)
419             run_suite(a_suite)
420         else:
421             break
422
423     if tests_running == 0 and solo_testcase_suites:
424         a_suite = solo_testcase_suites.pop(0)
425         run_suite(a_suite)
426
427     read_from_testcases = threading.Event()
428     read_from_testcases.set()
429     stdouterr_thread = threading.Thread(
430         target=stdouterr_reader_wrapper,
431         args=(unread_testcases, finished_unread_testcases, read_from_testcases),
432     )
433     stdouterr_thread.start()
434
435     failed_wrapped_testcases = set()
436     stop_run = False
437
438     try:
439         while wrapped_testcase_suites or testcase_suites:
440             finished_testcase_suites = set()
441             for wrapped_testcase_suite in wrapped_testcase_suites:
442                 while wrapped_testcase_suite.result_parent_end.poll():
443                     wrapped_testcase_suite.result.process_result(
444                         *wrapped_testcase_suite.result_parent_end.recv()
445                     )
446                     wrapped_testcase_suite.last_heard = time.time()
447
448                 while wrapped_testcase_suite.keep_alive_parent_end.poll():
449                     (
450                         wrapped_testcase_suite.last_test,
451                         wrapped_testcase_suite.last_test_vpp_binary,
452                         wrapped_testcase_suite.last_test_temp_dir,
453                         wrapped_testcase_suite.vpp_pid,
454                     ) = wrapped_testcase_suite.keep_alive_parent_end.recv()
455                     wrapped_testcase_suite.last_heard = time.time()
456
457                 if wrapped_testcase_suite.finished_parent_end.poll():
458                     wrapped_testcase_suite.finished_parent_end.recv()
459                     wrapped_testcase_suite.last_heard = time.time()
460                     stop_run = (
461                         process_finished_testsuite(
462                             wrapped_testcase_suite,
463                             finished_testcase_suites,
464                             failed_wrapped_testcases,
465                             results,
466                         )
467                         or stop_run
468                     )
469                     continue
470
471                 fail = False
472                 if wrapped_testcase_suite.last_heard + config.timeout < time.time():
473                     fail = True
474                     wrapped_testcase_suite.logger.critical(
475                         "Child test runner process timed out "
476                         "(last test running was `%s' in `%s')!"
477                         % (
478                             wrapped_testcase_suite.last_test,
479                             wrapped_testcase_suite.last_test_temp_dir,
480                         )
481                     )
482                 elif not wrapped_testcase_suite.child.is_alive():
483                     fail = True
484                     wrapped_testcase_suite.logger.critical(
485                         "Child test runner process unexpectedly died "
486                         "(last test running was `%s' in `%s')!"
487                         % (
488                             wrapped_testcase_suite.last_test,
489                             wrapped_testcase_suite.last_test_temp_dir,
490                         )
491                     )
492                 elif (
493                     wrapped_testcase_suite.last_test_temp_dir
494                     and wrapped_testcase_suite.last_test_vpp_binary
495                 ):
496                     if is_core_present(wrapped_testcase_suite.last_test_temp_dir):
497                         wrapped_testcase_suite.add_testclass_with_core()
498                         if wrapped_testcase_suite.core_detected_at is None:
499                             wrapped_testcase_suite.core_detected_at = time.time()
500                         elif (
501                             wrapped_testcase_suite.core_detected_at + core_timeout
502                             < time.time()
503                         ):
504                             wrapped_testcase_suite.logger.critical(
505                                 "Child test runner process unresponsive and "
506                                 "core-file exists in test temporary directory "
507                                 "(last test running was `%s' in `%s')!"
508                                 % (
509                                     wrapped_testcase_suite.last_test,
510                                     wrapped_testcase_suite.last_test_temp_dir,
511                                 )
512                             )
513                             fail = True
514
515                 if fail:
516                     wrapped_testcase_suite.child.terminate()
517                     try:
518                         # terminating the child process tends to leave orphan
519                         # VPP process around
520                         if wrapped_testcase_suite.vpp_pid:
521                             os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
522                     except OSError:
523                         # already dead
524                         pass
525                     wrapped_testcase_suite.result.crashed = True
526                     wrapped_testcase_suite.result.process_result(
527                         wrapped_testcase_suite.last_test_id, TestResultCode.ERROR
528                     )
529                     stop_run = (
530                         process_finished_testsuite(
531                             wrapped_testcase_suite,
532                             finished_testcase_suites,
533                             failed_wrapped_testcases,
534                             results,
535                         )
536                         or stop_run
537                     )
538
539             for finished_testcase in finished_testcase_suites:
540                 # Somewhat surprisingly, the join below may
541                 # timeout, even if client signaled that
542                 # it finished - so we note it just in case.
543                 join_start = time.time()
544                 finished_testcase.child.join(test_finished_join_timeout)
545                 join_end = time.time()
546                 if join_end - join_start >= test_finished_join_timeout:
547                     finished_testcase.logger.error(
548                         "Timeout joining finished test: %s (pid %d)"
549                         % (finished_testcase.last_test, finished_testcase.child.pid)
550                     )
551                 finished_testcase.close_pipes()
552                 wrapped_testcase_suites.remove(finished_testcase)
553                 finished_unread_testcases.add(finished_testcase)
554                 finished_testcase.stdouterr_queue.put(None)
555                 on_suite_finish(finished_testcase)
556                 if stop_run:
557                     while testcase_suites:
558                         results.append(TestResult(testcase_suites.pop(0)))
559                 elif testcase_suites:
560                     a_suite = testcase_suites[0]
561                     while a_suite and a_suite.is_tagged_run_solo:
562                         testcase_suites.pop(0)
563                         solo_testcase_suites.append(a_suite)
564                         if testcase_suites:
565                             a_suite = testcase_suites[0]
566                         else:
567                             a_suite = None
568                     if a_suite and can_run_suite(a_suite):
569                         testcase_suites.pop(0)
570                         run_suite(a_suite)
571                 if solo_testcase_suites and tests_running == 0:
572                     a_suite = solo_testcase_suites.pop(0)
573                     run_suite(a_suite)
574             time.sleep(0.1)
575     except Exception:
576         for wrapped_testcase_suite in wrapped_testcase_suites:
577             wrapped_testcase_suite.child.terminate()
578             wrapped_testcase_suite.stdouterr_queue.put(None)
579         raise
580     finally:
581         read_from_testcases.clear()
582         stdouterr_thread.join(config.timeout)
583         manager.shutdown()
584
585     handle_cores(failed_wrapped_testcases)
586     return results
587
588
589 class TestSuiteWrapper(unittest.TestSuite):
590     cpus_used = 0
591
592     def __init__(self):
593         return super().__init__()
594
595     def addTest(self, test):
596         self.cpus_used = max(self.cpus_used, test.get_cpus_required())
597         super().addTest(test)
598
599     def assign_cpus(self, cpus):
600         self.cpus = cpus
601
602     def _handleClassSetUp(self, test, result):
603         if not test.__class__.skipped_due_to_cpu_lack:
604             test.assign_cpus(self.cpus)
605         super()._handleClassSetUp(test, result)
606
607     def get_assigned_cpus(self):
608         return self.cpus
609
610
611 class SplitToSuitesCallback:
612     def __init__(self, filter_callback):
613         self.suites = {}
614         self.suite_name = "default"
615         self.filter_callback = filter_callback
616         self.filtered = TestSuiteWrapper()
617
618     def __call__(self, file_name, cls, method):
619         test_method = cls(method)
620         if self.filter_callback(file_name, cls.__name__, method):
621             self.suite_name = file_name + cls.__name__
622             if self.suite_name not in self.suites:
623                 self.suites[self.suite_name] = TestSuiteWrapper()
624                 self.suites[self.suite_name].is_tagged_run_solo = False
625             self.suites[self.suite_name].addTest(test_method)
626             if test_method.is_tagged_run_solo():
627                 self.suites[self.suite_name].is_tagged_run_solo = True
628
629         else:
630             self.filtered.addTest(test_method)
631
632
633 def parse_test_filter(test_filter):
634     f = test_filter
635     filter_file_name = None
636     filter_class_name = None
637     filter_func_name = None
638     if f:
639         if "." in f:
640             parts = f.split(".")
641             if len(parts) > 3:
642                 raise Exception(f"Invalid test filter: {test_filter}")
643             if len(parts) > 2:
644                 if parts[2] not in ("*", ""):
645                     filter_func_name = parts[2]
646             if parts[1] not in ("*", ""):
647                 filter_class_name = parts[1]
648             if parts[0] not in ("*", ""):
649                 if parts[0].startswith("test_"):
650                     filter_file_name = parts[0]
651                 else:
652                     filter_file_name = "test_%s" % parts[0]
653         else:
654             if f.startswith("test_"):
655                 filter_file_name = f
656             else:
657                 filter_file_name = "test_%s" % f
658     if filter_file_name:
659         filter_file_name = "%s.py" % filter_file_name
660     return filter_file_name, filter_class_name, filter_func_name
661
662
663 def filter_tests(tests, filter_cb):
664     result = TestSuiteWrapper()
665     for t in tests:
666         if isinstance(t, unittest.suite.TestSuite):
667             # this is a bunch of tests, recursively filter...
668             x = filter_tests(t, filter_cb)
669             if x.countTestCases() > 0:
670                 result.addTest(x)
671         elif isinstance(t, unittest.TestCase):
672             # this is a single test
673             parts = t.id().split(".")
674             # t.id() for common cases like this:
675             # test_classifier.TestClassifier.test_acl_ip
676             # apply filtering only if it is so
677             if len(parts) == 3:
678                 if not filter_cb(parts[0], parts[1], parts[2]):
679                     continue
680             result.addTest(t)
681         else:
682             # unexpected object, don't touch it
683             result.addTest(t)
684     return result
685
686
687 class FilterByTestOption:
688     def __init__(self, filters):
689         self.filters = filters
690
691     def __call__(self, file_name, class_name, func_name):
692         def test_one(
693             filter_file_name,
694             filter_class_name,
695             filter_func_name,
696             file_name,
697             class_name,
698             func_name,
699         ):
700             if filter_file_name:
701                 fn_match = fnmatch.fnmatch(file_name, filter_file_name)
702                 if not fn_match:
703                     return False
704             if filter_class_name and class_name != filter_class_name:
705                 return False
706             if filter_func_name and func_name != filter_func_name:
707                 return False
708             return True
709
710         for filter_file_name, filter_class_name, filter_func_name in self.filters:
711             if test_one(
712                 filter_file_name,
713                 filter_class_name,
714                 filter_func_name,
715                 file_name,
716                 class_name,
717                 func_name,
718             ):
719                 return True
720
721         return False
722
723
724 class FilterByClassList:
725     def __init__(self, classes_with_filenames):
726         self.classes_with_filenames = classes_with_filenames
727
728     def __call__(self, file_name, class_name, func_name):
729         return ".".join([file_name, class_name]) in self.classes_with_filenames
730
731
732 def suite_from_failed(suite, failed):
733     failed = {x.rsplit(".", 1)[0] for x in failed}
734     filter_cb = FilterByClassList(failed)
735     suite = filter_tests(suite, filter_cb)
736     return suite
737
738
739 class AllResults(dict):
740     def __init__(self):
741         super(AllResults, self).__init__()
742         self.all_testcases = 0
743         self.results_per_suite = []
744         for trc in list(TestResultCode):
745             self[trc] = 0
746         self.rerun = []
747         self.testsuites_no_tests_run = []
748
749     def add_results(self, result):
750         self.results_per_suite.append(result)
751         for trc in list(TestResultCode):
752             self[trc] += len(result[trc])
753
754     def add_result(self, result):
755         retval = 0
756         self.all_testcases += result.testcase_suite.countTestCases()
757         self.add_results(result)
758
759         if result.no_tests_run():
760             self.testsuites_no_tests_run.append(result.testcase_suite)
761             if result.crashed:
762                 retval = -1
763             else:
764                 retval = 1
765         elif not result.was_successful():
766             retval = 1
767
768         if retval != 0:
769             self.rerun.append(result.testcase_suite)
770
771         return retval
772
773     def print_results(self):
774         print("")
775         print(double_line_delim)
776         print("TEST RESULTS:")
777
778         def indent_results(lines):
779             lines = list(filter(None, lines))
780             maximum = max(lines, key=lambda x: x.index(":"))
781             maximum = 4 + maximum.index(":")
782             for l in lines:
783                 padding = " " * (maximum - l.index(":"))
784                 print(f"{padding}{l}")
785
786         indent_results(
787             [
788                 f"Scheduled tests: {self.all_testcases}",
789                 f"Executed tests: {self[TestResultCode.TEST_RUN]}",
790                 f"Passed tests: {colorize(self[TestResultCode.PASS], GREEN)}",
791                 f"Expected failures: {colorize(self[TestResultCode.EXPECTED_FAIL], GREEN)}"
792                 if self[TestResultCode.EXPECTED_FAIL]
793                 else None,
794                 f"Skipped tests: {colorize(self[TestResultCode.SKIP], YELLOW)}"
795                 if self[TestResultCode.SKIP]
796                 else None,
797                 f"Not Executed tests: {colorize(self.not_executed, RED)}"
798                 if self.not_executed
799                 else None,
800                 f"Failures: {colorize(self[TestResultCode.FAIL], RED)}"
801                 if self[TestResultCode.FAIL]
802                 else None,
803                 f"Unexpected passes: {colorize(self[TestResultCode.UNEXPECTED_PASS], RED)}"
804                 if self[TestResultCode.UNEXPECTED_PASS]
805                 else None,
806                 f"Errors: {colorize(self[TestResultCode.ERROR], RED)}"
807                 if self[TestResultCode.ERROR]
808                 else None,
809                 "Tests skipped due to lack of CPUS: "
810                 f"{colorize(self[TestResultCode.SKIP_CPU_SHORTAGE], YELLOW)}"
811                 if self[TestResultCode.SKIP_CPU_SHORTAGE]
812                 else None,
813             ]
814         )
815
816         if self.all_failed > 0:
817             print("FAILURES AND ERRORS IN TESTS:")
818             for result in self.results_per_suite:
819                 old_testcase_name = None
820                 for tr_code, headline in (
821                     (TestResultCode.FAIL, "FAILURE"),
822                     (TestResultCode.ERROR, "ERROR"),
823                     (TestResultCode.UNEXPECTED_PASS, "UNEXPECTED PASS"),
824                 ):
825                     if not result[tr_code]:
826                         continue
827
828                     for failed_test_id in result[tr_code]:
829                         new_testcase_name, test_name = result.get_testcase_names(
830                             failed_test_id
831                         )
832                         if new_testcase_name != old_testcase_name:
833                             print(
834                                 f"  Testcase name: {colorize(new_testcase_name, RED)}"
835                             )
836                             old_testcase_name = new_testcase_name
837                         print(
838                             f"    {headline}: {colorize(test_name, RED)} [{failed_test_id}]"
839                         )
840
841         if self.testsuites_no_tests_run:
842             print("TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:")
843             tc_classes = set()
844             for testsuite in self.testsuites_no_tests_run:
845                 for testcase in testsuite:
846                     tc_classes.add(get_testcase_doc_name(testcase))
847             for tc_class in tc_classes:
848                 print("  {}".format(colorize(tc_class, RED)))
849
850         if self[TestResultCode.SKIP_CPU_SHORTAGE]:
851             print()
852             print(
853                 colorize(
854                     "     SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
855                     " ENOUGH CPUS AVAILABLE",
856                     YELLOW,
857                 )
858             )
859         print(double_line_delim)
860         print("")
861
862     @property
863     def not_executed(self):
864         return self.all_testcases - self[TestResultCode.TEST_RUN]
865
866     @property
867     def all_failed(self):
868         return (
869             self[TestResultCode.FAIL]
870             + self[TestResultCode.ERROR]
871             + self[TestResultCode.UNEXPECTED_PASS]
872         )
873
874
875 def parse_results(results):
876     """
877     Prints the number of scheduled, executed, not executed, passed, failed,
878     errored and skipped tests and details about failed and errored tests.
879
880     Also returns all suites where any test failed.
881
882     :param results:
883     :return:
884     """
885
886     results_per_suite = AllResults()
887     crashed = False
888     failed = False
889     for result in results:
890         result_code = results_per_suite.add_result(result)
891         if result_code == 1:
892             failed = True
893         elif result_code == -1:
894             crashed = True
895
896     results_per_suite.print_results()
897
898     if crashed:
899         return_code = -1
900     elif failed:
901         return_code = 1
902     else:
903         return_code = 0
904     return return_code, results_per_suite.rerun
905
906
907 if __name__ == "__main__":
908     print(f"Config is: {config}")
909
910     if config.api_preload:
911         VPPApiJSONFiles.load_api(apidir=config.extern_apidir + [config.vpp_install_dir])
912
913     if config.sanity:
914         print("Running sanity test case.")
915         try:
916             rc = sanity_run_vpp.main()
917             if rc != 0:
918                 sys.exit(rc)
919         except Exception as e:
920             print(traceback.format_exc())
921             print("Couldn't run sanity test case.")
922             sys.exit(-1)
923
924     test_finished_join_timeout = 15
925
926     debug_gdb = config.debug in ["gdb", "gdbserver", "attach"]
927     debug_core = config.debug == "core"
928
929     run_interactive = debug_gdb or config.step or config.force_foreground
930
931     max_concurrent_tests = 0
932     print(f"OS reports {num_cpus} available cpu(s).")
933
934     test_jobs = config.jobs
935     if test_jobs == "auto":
936         if run_interactive:
937             max_concurrent_tests = 1
938             print("Interactive mode required, running tests consecutively.")
939         else:
940             max_concurrent_tests = num_cpus
941             print(
942                 f"Running at most {max_concurrent_tests} python test "
943                 "processes concurrently."
944             )
945     else:
946         max_concurrent_tests = test_jobs
947         print(
948             f"Running at most {max_concurrent_tests} python test processes "
949             "concurrently as set by 'TEST_JOBS'."
950         )
951
952     print(f"Using at most {max_vpp_cpus} cpus for VPP threads.")
953
954     if run_interactive and max_concurrent_tests > 1:
955         raise NotImplementedError(
956             "Running tests interactively (DEBUG is gdb[server] or ATTACH or "
957             "STEP is set) in parallel (TEST_JOBS is more than 1) is not "
958             "supported"
959         )
960
961     descriptions = True
962
963     print("Running tests using custom test runner.")
964     filters = [(parse_test_filter(f)) for f in config.filter.split(",")]
965
966     print(
967         "Selected filters: ",
968         "|".join(
969             f"file={filter_file}, class={filter_class}, function={filter_func}"
970             for filter_file, filter_class, filter_func in filters
971         ),
972     )
973
974     filter_cb = FilterByTestOption(filters)
975
976     cb = SplitToSuitesCallback(filter_cb)
977     for d in config.test_src_dir:
978         print("Adding tests from directory tree %s" % d)
979         discover_tests(d, cb)
980
981     # suites are not hashable, need to use list
982     suites = []
983     tests_amount = 0
984     for testcase_suite in cb.suites.values():
985         tests_amount += testcase_suite.countTestCases()
986         if testcase_suite.cpus_used > max_vpp_cpus:
987             # here we replace test functions with lambdas to just skip them
988             # but we also replace setUp/tearDown functions to do nothing
989             # so that the test can be "started" and "stopped", so that we can
990             # still keep those prints (test description - SKIP), which are done
991             # in stopTest() (for that to trigger, test function must run)
992             for t in testcase_suite:
993                 for m in dir(t):
994                     if m.startswith("test_"):
995                         setattr(t, m, lambda: t.skipTest("not enough cpus"))
996                 setattr(t.__class__, "setUpClass", lambda: None)
997                 setattr(t.__class__, "tearDownClass", lambda: None)
998                 setattr(t, "setUp", lambda: None)
999                 setattr(t, "tearDown", lambda: None)
1000                 t.__class__.skipped_due_to_cpu_lack = True
1001         suites.append(testcase_suite)
1002
1003     print(
1004         "%s out of %s tests match specified filters"
1005         % (tests_amount, tests_amount + cb.filtered.countTestCases())
1006     )
1007
1008     if not config.extended:
1009         print("Not running extended tests (some tests will be skipped)")
1010
1011     attempts = config.retries + 1
1012     if attempts > 1:
1013         print("Perform %s attempts to pass the suite..." % attempts)
1014
1015     if run_interactive and suites:
1016         # don't fork if requiring interactive terminal
1017         print("Running tests in foreground in the current process")
1018         full_suite = unittest.TestSuite()
1019         free_cpus = list(available_cpus)
1020         cpu_shortage = False
1021         for suite in suites:
1022             if suite.cpus_used <= max_vpp_cpus:
1023                 suite.assign_cpus(free_cpus[: suite.cpus_used])
1024             else:
1025                 suite.assign_cpus([])
1026                 cpu_shortage = True
1027         full_suite.addTests(suites)
1028         result = VppTestRunner(
1029             verbosity=config.verbose, failfast=config.failfast, print_summary=True
1030         ).run(full_suite)
1031         was_successful = result.wasSuccessful()
1032         if not was_successful:
1033             for test_case_info in result.failed_test_cases_info:
1034                 handle_failed_suite(
1035                     test_case_info.logger,
1036                     test_case_info.tempdir,
1037                     test_case_info.vpp_pid,
1038                     config.vpp,
1039                 )
1040                 if test_case_info in result.core_crash_test_cases_info:
1041                     check_and_handle_core(
1042                         test_case_info.vpp_bin_path,
1043                         test_case_info.tempdir,
1044                         test_case_info.core_crash_test,
1045                     )
1046
1047         if cpu_shortage:
1048             print()
1049             print(
1050                 colorize(
1051                     "SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
1052                     " ENOUGH CPUS AVAILABLE",
1053                     YELLOW,
1054                 )
1055             )
1056             print()
1057         sys.exit(not was_successful)
1058     else:
1059         print(
1060             "Running each VPPTestCase in a separate background process"
1061             f" with at most {max_concurrent_tests} parallel python test "
1062             "process(es)"
1063         )
1064         exit_code = 0
1065         while suites and attempts > 0:
1066             for suite in suites:
1067                 failed_link = get_failed_testcase_linkname(
1068                     config.failed_dir,
1069                     f"{get_testcase_dirname(suite._tests[0].__class__.__name__)}",
1070                 )
1071                 if os.path.islink(failed_link):
1072                     os.unlink(failed_link)
1073             results = run_forked(suites)
1074             exit_code, suites = parse_results(results)
1075             attempts -= 1
1076             if exit_code == 0:
1077                 print("Test run was successful")
1078             else:
1079                 print("%s attempt(s) left." % attempts)
1080         sys.exit(exit_code)