tests: support multiple filter expressions
[vpp.git] / test / run_tests.py
1 #!/usr/bin/env python3
2
3 import sys
4 import shutil
5 import os
6 import fnmatch
7 import unittest
8 import time
9 import threading
10 import traceback
11 import signal
12 import re
13 from multiprocessing import Process, Pipe, get_context
14 from multiprocessing.queues import Queue
15 from multiprocessing.managers import BaseManager
16 from config import config, num_cpus, available_cpus, max_vpp_cpus
17 from framework import (
18     VppTestRunner,
19     VppTestCase,
20     get_testcase_doc_name,
21     get_test_description,
22     PASS,
23     FAIL,
24     ERROR,
25     SKIP,
26     TEST_RUN,
27     SKIP_CPU_SHORTAGE,
28 )
29 from debug import spawn_gdb
30 from log import (
31     get_parallel_logger,
32     double_line_delim,
33     RED,
34     YELLOW,
35     GREEN,
36     colorize,
37     single_line_delim,
38 )
39 from discover_tests import discover_tests
40 import sanity_run_vpp
41 from subprocess import check_output, CalledProcessError
42 from util import check_core_path, get_core_path, is_core_present
43
44 # timeout which controls how long the child has to finish after seeing
45 # a core dump in test temporary directory. If this is exceeded, parent assumes
46 # that child process is stuck (e.g. waiting for event from vpp) and kill
47 # the child
48 core_timeout = 3
49
50
51 class StreamQueue(Queue):
52     def write(self, msg):
53         self.put(msg)
54
55     def flush(self):
56         sys.__stdout__.flush()
57         sys.__stderr__.flush()
58
59     def fileno(self):
60         return self._writer.fileno()
61
62
63 class StreamQueueManager(BaseManager):
64     pass
65
66
67 StreamQueueManager.register("StreamQueue", StreamQueue)
68
69
70 class TestResult(dict):
71     def __init__(self, testcase_suite, testcases_by_id=None):
72         super(TestResult, self).__init__()
73         self[PASS] = []
74         self[FAIL] = []
75         self[ERROR] = []
76         self[SKIP] = []
77         self[SKIP_CPU_SHORTAGE] = []
78         self[TEST_RUN] = []
79         self.crashed = False
80         self.testcase_suite = testcase_suite
81         self.testcases = [testcase for testcase in testcase_suite]
82         self.testcases_by_id = testcases_by_id
83
84     def was_successful(self):
85         return (
86             0 == len(self[FAIL]) == len(self[ERROR])
87             and len(self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE])
88             == self.testcase_suite.countTestCases()
89         )
90
91     def no_tests_run(self):
92         return 0 == len(self[TEST_RUN])
93
94     def process_result(self, test_id, result):
95         self[result].append(test_id)
96
97     def suite_from_failed(self):
98         rerun_ids = set([])
99         for testcase in self.testcase_suite:
100             tc_id = testcase.id()
101             if tc_id not in self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE]:
102                 rerun_ids.add(tc_id)
103         if rerun_ids:
104             return suite_from_failed(self.testcase_suite, rerun_ids)
105
106     def get_testcase_names(self, test_id):
107         # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
108         setup_teardown_match = re.match(
109             r"((tearDownClass)|(setUpClass)) \((.+\..+)\)", test_id
110         )
111         if setup_teardown_match:
112             test_name, _, _, testcase_name = setup_teardown_match.groups()
113             if len(testcase_name.split(".")) == 2:
114                 for key in self.testcases_by_id.keys():
115                     if key.startswith(testcase_name):
116                         testcase_name = key
117                         break
118             testcase_name = self._get_testcase_doc_name(testcase_name)
119         else:
120             test_name = self._get_test_description(test_id)
121             testcase_name = self._get_testcase_doc_name(test_id)
122
123         return testcase_name, test_name
124
125     def _get_test_description(self, test_id):
126         if test_id in self.testcases_by_id:
127             desc = get_test_description(descriptions, self.testcases_by_id[test_id])
128         else:
129             desc = test_id
130         return desc
131
132     def _get_testcase_doc_name(self, test_id):
133         if test_id in self.testcases_by_id:
134             doc_name = get_testcase_doc_name(self.testcases_by_id[test_id])
135         else:
136             doc_name = test_id
137         return doc_name
138
139
140 def test_runner_wrapper(
141     suite, keep_alive_pipe, stdouterr_queue, finished_pipe, result_pipe, logger
142 ):
143     sys.stdout = stdouterr_queue
144     sys.stderr = stdouterr_queue
145     VppTestCase.parallel_handler = logger.handlers[0]
146     result = VppTestRunner(
147         keep_alive_pipe=keep_alive_pipe,
148         descriptions=descriptions,
149         verbosity=config.verbose,
150         result_pipe=result_pipe,
151         failfast=config.failfast,
152         print_summary=False,
153     ).run(suite)
154     finished_pipe.send(result.wasSuccessful())
155     finished_pipe.close()
156     keep_alive_pipe.close()
157
158
159 class TestCaseWrapper(object):
160     def __init__(self, testcase_suite, manager):
161         self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(duplex=False)
162         self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
163         self.result_parent_end, self.result_child_end = Pipe(duplex=False)
164         self.testcase_suite = testcase_suite
165         self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
166         self.logger = get_parallel_logger(self.stdouterr_queue)
167         self.child = Process(
168             target=test_runner_wrapper,
169             args=(
170                 testcase_suite,
171                 self.keep_alive_child_end,
172                 self.stdouterr_queue,
173                 self.finished_child_end,
174                 self.result_child_end,
175                 self.logger,
176             ),
177         )
178         self.child.start()
179         self.last_test_temp_dir = None
180         self.last_test_vpp_binary = None
181         self._last_test = None
182         self.last_test_id = None
183         self.vpp_pid = None
184         self.last_heard = time.time()
185         self.core_detected_at = None
186         self.testcases_by_id = {}
187         self.testclasess_with_core = {}
188         for testcase in self.testcase_suite:
189             self.testcases_by_id[testcase.id()] = testcase
190         self.result = TestResult(testcase_suite, self.testcases_by_id)
191
192     @property
193     def last_test(self):
194         return self._last_test
195
196     @last_test.setter
197     def last_test(self, test_id):
198         self.last_test_id = test_id
199         if test_id in self.testcases_by_id:
200             testcase = self.testcases_by_id[test_id]
201             self._last_test = testcase.shortDescription()
202             if not self._last_test:
203                 self._last_test = str(testcase)
204         else:
205             self._last_test = test_id
206
207     def add_testclass_with_core(self):
208         if self.last_test_id in self.testcases_by_id:
209             test = self.testcases_by_id[self.last_test_id]
210             class_name = unittest.util.strclass(test.__class__)
211             test_name = "'{}' ({})".format(
212                 get_test_description(descriptions, test), self.last_test_id
213             )
214         else:
215             test_name = self.last_test_id
216             class_name = re.match(
217                 r"((tearDownClass)|(setUpClass)) " r"\((.+\..+)\)", test_name
218             ).groups()[3]
219         if class_name not in self.testclasess_with_core:
220             self.testclasess_with_core[class_name] = (
221                 test_name,
222                 self.last_test_vpp_binary,
223                 self.last_test_temp_dir,
224             )
225
226     def close_pipes(self):
227         self.keep_alive_child_end.close()
228         self.finished_child_end.close()
229         self.result_child_end.close()
230         self.keep_alive_parent_end.close()
231         self.finished_parent_end.close()
232         self.result_parent_end.close()
233
234     def was_successful(self):
235         return self.result.was_successful()
236
237     @property
238     def cpus_used(self):
239         return self.testcase_suite.cpus_used
240
241     def get_assigned_cpus(self):
242         return self.testcase_suite.get_assigned_cpus()
243
244
245 def stdouterr_reader_wrapper(
246     unread_testcases, finished_unread_testcases, read_testcases
247 ):
248     read_testcase = None
249     while read_testcases.is_set() or unread_testcases:
250         if finished_unread_testcases:
251             read_testcase = finished_unread_testcases.pop()
252             unread_testcases.remove(read_testcase)
253         elif unread_testcases:
254             read_testcase = unread_testcases.pop()
255         if read_testcase:
256             data = ""
257             while data is not None:
258                 sys.stdout.write(data)
259                 data = read_testcase.stdouterr_queue.get()
260
261             read_testcase.stdouterr_queue.close()
262             finished_unread_testcases.discard(read_testcase)
263             read_testcase = None
264
265
266 def handle_failed_suite(logger, last_test_temp_dir, vpp_pid, vpp_binary):
267     if last_test_temp_dir:
268         # Need to create link in case of a timeout or core dump without failure
269         lttd = os.path.basename(last_test_temp_dir)
270         link_path = os.path.join(config.failed_dir, f"{lttd}-FAILED")
271         if not os.path.exists(link_path):
272             os.symlink(last_test_temp_dir, link_path)
273         logger.error(
274             "Symlink to failed testcase directory: %s -> %s" % (link_path, lttd)
275         )
276
277         # Report core existence
278         core_path = get_core_path(last_test_temp_dir)
279         if os.path.exists(core_path):
280             logger.error(
281                 "Core-file exists in test temporary directory: %s!" % core_path
282             )
283             check_core_path(logger, core_path)
284             logger.debug("Running 'file %s':" % core_path)
285             try:
286                 info = check_output(["file", core_path])
287                 logger.debug(info)
288             except CalledProcessError as e:
289                 logger.error(
290                     "Subprocess returned with return code "
291                     "while running `file' utility on core-file "
292                     "returned: "
293                     "rc=%s",
294                     e.returncode,
295                 )
296             except OSError as e:
297                 logger.error(
298                     "Subprocess returned with OS error while "
299                     "running 'file' utility "
300                     "on core-file: "
301                     "(%s) %s",
302                     e.errno,
303                     e.strerror,
304                 )
305             except Exception as e:
306                 logger.exception("Unexpected error running `file' utility on core-file")
307             logger.error(f"gdb {vpp_binary} {core_path}")
308
309     if vpp_pid:
310         # Copy api post mortem
311         api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
312         if os.path.isfile(api_post_mortem_path):
313             logger.error(
314                 "Copying api_post_mortem.%d to %s" % (vpp_pid, last_test_temp_dir)
315             )
316             shutil.copy2(api_post_mortem_path, last_test_temp_dir)
317
318
319 def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
320     if is_core_present(tempdir):
321         if debug_core:
322             print(
323                 "VPP core detected in %s. Last test running was %s"
324                 % (tempdir, core_crash_test)
325             )
326             print(single_line_delim)
327             spawn_gdb(vpp_binary, get_core_path(tempdir))
328             print(single_line_delim)
329         elif config.compress_core:
330             print("Compressing core-file in test directory `%s'" % tempdir)
331             os.system("gzip %s" % get_core_path(tempdir))
332
333
334 def handle_cores(failed_testcases):
335     for failed_testcase in failed_testcases:
336         tcs_with_core = failed_testcase.testclasess_with_core
337         if tcs_with_core:
338             for test, vpp_binary, tempdir in tcs_with_core.values():
339                 check_and_handle_core(vpp_binary, tempdir, test)
340
341
342 def process_finished_testsuite(
343     wrapped_testcase_suite, finished_testcase_suites, failed_wrapped_testcases, results
344 ):
345     results.append(wrapped_testcase_suite.result)
346     finished_testcase_suites.add(wrapped_testcase_suite)
347     stop_run = False
348     if config.failfast and not wrapped_testcase_suite.was_successful():
349         stop_run = True
350
351     if not wrapped_testcase_suite.was_successful():
352         failed_wrapped_testcases.add(wrapped_testcase_suite)
353         handle_failed_suite(
354             wrapped_testcase_suite.logger,
355             wrapped_testcase_suite.last_test_temp_dir,
356             wrapped_testcase_suite.vpp_pid,
357             wrapped_testcase_suite.last_test_vpp_binary,
358         )
359
360     return stop_run
361
362
363 def run_forked(testcase_suites):
364     wrapped_testcase_suites = set()
365     solo_testcase_suites = []
366
367     # suites are unhashable, need to use list
368     results = []
369     unread_testcases = set()
370     finished_unread_testcases = set()
371     manager = StreamQueueManager()
372     manager.start()
373     tests_running = 0
374     free_cpus = list(available_cpus)
375
376     def on_suite_start(tc):
377         nonlocal tests_running
378         nonlocal free_cpus
379         tests_running = tests_running + 1
380
381     def on_suite_finish(tc):
382         nonlocal tests_running
383         nonlocal free_cpus
384         tests_running = tests_running - 1
385         assert tests_running >= 0
386         free_cpus.extend(tc.get_assigned_cpus())
387
388     def run_suite(suite):
389         nonlocal manager
390         nonlocal wrapped_testcase_suites
391         nonlocal unread_testcases
392         nonlocal free_cpus
393         suite.assign_cpus(free_cpus[: suite.cpus_used])
394         free_cpus = free_cpus[suite.cpus_used :]
395         wrapper = TestCaseWrapper(suite, manager)
396         wrapped_testcase_suites.add(wrapper)
397         unread_testcases.add(wrapper)
398         on_suite_start(suite)
399
400     def can_run_suite(suite):
401         return tests_running < max_concurrent_tests and (
402             suite.cpus_used <= len(free_cpus) or suite.cpus_used > max_vpp_cpus
403         )
404
405     while free_cpus and testcase_suites:
406         a_suite = testcase_suites[0]
407         if a_suite.is_tagged_run_solo:
408             a_suite = testcase_suites.pop(0)
409             solo_testcase_suites.append(a_suite)
410             continue
411         if can_run_suite(a_suite):
412             a_suite = testcase_suites.pop(0)
413             run_suite(a_suite)
414         else:
415             break
416
417     if tests_running == 0 and solo_testcase_suites:
418         a_suite = solo_testcase_suites.pop(0)
419         run_suite(a_suite)
420
421     read_from_testcases = threading.Event()
422     read_from_testcases.set()
423     stdouterr_thread = threading.Thread(
424         target=stdouterr_reader_wrapper,
425         args=(unread_testcases, finished_unread_testcases, read_from_testcases),
426     )
427     stdouterr_thread.start()
428
429     failed_wrapped_testcases = set()
430     stop_run = False
431
432     try:
433         while wrapped_testcase_suites:
434             finished_testcase_suites = set()
435             for wrapped_testcase_suite in wrapped_testcase_suites:
436                 while wrapped_testcase_suite.result_parent_end.poll():
437                     wrapped_testcase_suite.result.process_result(
438                         *wrapped_testcase_suite.result_parent_end.recv()
439                     )
440                     wrapped_testcase_suite.last_heard = time.time()
441
442                 while wrapped_testcase_suite.keep_alive_parent_end.poll():
443                     (
444                         wrapped_testcase_suite.last_test,
445                         wrapped_testcase_suite.last_test_vpp_binary,
446                         wrapped_testcase_suite.last_test_temp_dir,
447                         wrapped_testcase_suite.vpp_pid,
448                     ) = wrapped_testcase_suite.keep_alive_parent_end.recv()
449                     wrapped_testcase_suite.last_heard = time.time()
450
451                 if wrapped_testcase_suite.finished_parent_end.poll():
452                     wrapped_testcase_suite.finished_parent_end.recv()
453                     wrapped_testcase_suite.last_heard = time.time()
454                     stop_run = (
455                         process_finished_testsuite(
456                             wrapped_testcase_suite,
457                             finished_testcase_suites,
458                             failed_wrapped_testcases,
459                             results,
460                         )
461                         or stop_run
462                     )
463                     continue
464
465                 fail = False
466                 if wrapped_testcase_suite.last_heard + config.timeout < time.time():
467                     fail = True
468                     wrapped_testcase_suite.logger.critical(
469                         "Child test runner process timed out "
470                         "(last test running was `%s' in `%s')!"
471                         % (
472                             wrapped_testcase_suite.last_test,
473                             wrapped_testcase_suite.last_test_temp_dir,
474                         )
475                     )
476                 elif not wrapped_testcase_suite.child.is_alive():
477                     fail = True
478                     wrapped_testcase_suite.logger.critical(
479                         "Child test runner process unexpectedly died "
480                         "(last test running was `%s' in `%s')!"
481                         % (
482                             wrapped_testcase_suite.last_test,
483                             wrapped_testcase_suite.last_test_temp_dir,
484                         )
485                     )
486                 elif (
487                     wrapped_testcase_suite.last_test_temp_dir
488                     and wrapped_testcase_suite.last_test_vpp_binary
489                 ):
490                     if is_core_present(wrapped_testcase_suite.last_test_temp_dir):
491                         wrapped_testcase_suite.add_testclass_with_core()
492                         if wrapped_testcase_suite.core_detected_at is None:
493                             wrapped_testcase_suite.core_detected_at = time.time()
494                         elif (
495                             wrapped_testcase_suite.core_detected_at + core_timeout
496                             < time.time()
497                         ):
498                             wrapped_testcase_suite.logger.critical(
499                                 "Child test runner process unresponsive and "
500                                 "core-file exists in test temporary directory "
501                                 "(last test running was `%s' in `%s')!"
502                                 % (
503                                     wrapped_testcase_suite.last_test,
504                                     wrapped_testcase_suite.last_test_temp_dir,
505                                 )
506                             )
507                             fail = True
508
509                 if fail:
510                     wrapped_testcase_suite.child.terminate()
511                     try:
512                         # terminating the child process tends to leave orphan
513                         # VPP process around
514                         if wrapped_testcase_suite.vpp_pid:
515                             os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
516                     except OSError:
517                         # already dead
518                         pass
519                     wrapped_testcase_suite.result.crashed = True
520                     wrapped_testcase_suite.result.process_result(
521                         wrapped_testcase_suite.last_test_id, ERROR
522                     )
523                     stop_run = (
524                         process_finished_testsuite(
525                             wrapped_testcase_suite,
526                             finished_testcase_suites,
527                             failed_wrapped_testcases,
528                             results,
529                         )
530                         or stop_run
531                     )
532
533             for finished_testcase in finished_testcase_suites:
534                 # Somewhat surprisingly, the join below may
535                 # timeout, even if client signaled that
536                 # it finished - so we note it just in case.
537                 join_start = time.time()
538                 finished_testcase.child.join(test_finished_join_timeout)
539                 join_end = time.time()
540                 if join_end - join_start >= test_finished_join_timeout:
541                     finished_testcase.logger.error(
542                         "Timeout joining finished test: %s (pid %d)"
543                         % (finished_testcase.last_test, finished_testcase.child.pid)
544                     )
545                 finished_testcase.close_pipes()
546                 wrapped_testcase_suites.remove(finished_testcase)
547                 finished_unread_testcases.add(finished_testcase)
548                 finished_testcase.stdouterr_queue.put(None)
549                 on_suite_finish(finished_testcase)
550                 if stop_run:
551                     while testcase_suites:
552                         results.append(TestResult(testcase_suites.pop(0)))
553                 elif testcase_suites:
554                     a_suite = testcase_suites.pop(0)
555                     while a_suite and a_suite.is_tagged_run_solo:
556                         solo_testcase_suites.append(a_suite)
557                         if testcase_suites:
558                             a_suite = testcase_suites.pop(0)
559                         else:
560                             a_suite = None
561                     if a_suite and can_run_suite(a_suite):
562                         run_suite(a_suite)
563                 if solo_testcase_suites and tests_running == 0:
564                     a_suite = solo_testcase_suites.pop(0)
565                     run_suite(a_suite)
566             time.sleep(0.1)
567     except Exception:
568         for wrapped_testcase_suite in wrapped_testcase_suites:
569             wrapped_testcase_suite.child.terminate()
570             wrapped_testcase_suite.stdouterr_queue.put(None)
571         raise
572     finally:
573         read_from_testcases.clear()
574         stdouterr_thread.join(config.timeout)
575         manager.shutdown()
576
577     handle_cores(failed_wrapped_testcases)
578     return results
579
580
581 class TestSuiteWrapper(unittest.TestSuite):
582     cpus_used = 0
583
584     def __init__(self):
585         return super().__init__()
586
587     def addTest(self, test):
588         self.cpus_used = max(self.cpus_used, test.get_cpus_required())
589         super().addTest(test)
590
591     def assign_cpus(self, cpus):
592         self.cpus = cpus
593
594     def _handleClassSetUp(self, test, result):
595         if not test.__class__.skipped_due_to_cpu_lack:
596             test.assign_cpus(self.cpus)
597         super()._handleClassSetUp(test, result)
598
599     def get_assigned_cpus(self):
600         return self.cpus
601
602
603 class SplitToSuitesCallback:
604     def __init__(self, filter_callback):
605         self.suites = {}
606         self.suite_name = "default"
607         self.filter_callback = filter_callback
608         self.filtered = TestSuiteWrapper()
609
610     def __call__(self, file_name, cls, method):
611         test_method = cls(method)
612         if self.filter_callback(file_name, cls.__name__, method):
613             self.suite_name = file_name + cls.__name__
614             if self.suite_name not in self.suites:
615                 self.suites[self.suite_name] = TestSuiteWrapper()
616                 self.suites[self.suite_name].is_tagged_run_solo = False
617             self.suites[self.suite_name].addTest(test_method)
618             if test_method.is_tagged_run_solo():
619                 self.suites[self.suite_name].is_tagged_run_solo = True
620
621         else:
622             self.filtered.addTest(test_method)
623
624
625 def parse_test_filter(test_filter):
626     f = test_filter
627     filter_file_name = None
628     filter_class_name = None
629     filter_func_name = None
630     if f:
631         if "." in f:
632             parts = f.split(".")
633             if len(parts) > 3:
634                 raise Exception(f"Invalid test filter: {test_filter}")
635             if len(parts) > 2:
636                 if parts[2] not in ("*", ""):
637                     filter_func_name = parts[2]
638             if parts[1] not in ("*", ""):
639                 filter_class_name = parts[1]
640             if parts[0] not in ("*", ""):
641                 if parts[0].startswith("test_"):
642                     filter_file_name = parts[0]
643                 else:
644                     filter_file_name = "test_%s" % parts[0]
645         else:
646             if f.startswith("test_"):
647                 filter_file_name = f
648             else:
649                 filter_file_name = "test_%s" % f
650     if filter_file_name:
651         filter_file_name = "%s.py" % filter_file_name
652     return filter_file_name, filter_class_name, filter_func_name
653
654
655 def filter_tests(tests, filter_cb):
656     result = TestSuiteWrapper()
657     for t in tests:
658         if isinstance(t, unittest.suite.TestSuite):
659             # this is a bunch of tests, recursively filter...
660             x = filter_tests(t, filter_cb)
661             if x.countTestCases() > 0:
662                 result.addTest(x)
663         elif isinstance(t, unittest.TestCase):
664             # this is a single test
665             parts = t.id().split(".")
666             # t.id() for common cases like this:
667             # test_classifier.TestClassifier.test_acl_ip
668             # apply filtering only if it is so
669             if len(parts) == 3:
670                 if not filter_cb(parts[0], parts[1], parts[2]):
671                     continue
672             result.addTest(t)
673         else:
674             # unexpected object, don't touch it
675             result.addTest(t)
676     return result
677
678
679 class FilterByTestOption:
680     def __init__(self, filters):
681         self.filters = filters
682
683     def __call__(self, file_name, class_name, func_name):
684         def test_one(
685             filter_file_name,
686             filter_class_name,
687             filter_func_name,
688             file_name,
689             class_name,
690             func_name,
691         ):
692             if filter_file_name:
693                 fn_match = fnmatch.fnmatch(file_name, filter_file_name)
694                 if not fn_match:
695                     return False
696             if filter_class_name and class_name != filter_class_name:
697                 return False
698             if filter_func_name and func_name != filter_func_name:
699                 return False
700             return True
701
702         for filter_file_name, filter_class_name, filter_func_name in self.filters:
703             if test_one(
704                 filter_file_name,
705                 filter_class_name,
706                 filter_func_name,
707                 file_name,
708                 class_name,
709                 func_name,
710             ):
711                 return True
712
713         return False
714
715
716 class FilterByClassList:
717     def __init__(self, classes_with_filenames):
718         self.classes_with_filenames = classes_with_filenames
719
720     def __call__(self, file_name, class_name, func_name):
721         return ".".join([file_name, class_name]) in self.classes_with_filenames
722
723
724 def suite_from_failed(suite, failed):
725     failed = {x.rsplit(".", 1)[0] for x in failed}
726     filter_cb = FilterByClassList(failed)
727     suite = filter_tests(suite, filter_cb)
728     return suite
729
730
731 class AllResults(dict):
732     def __init__(self):
733         super(AllResults, self).__init__()
734         self.all_testcases = 0
735         self.results_per_suite = []
736         self[PASS] = 0
737         self[FAIL] = 0
738         self[ERROR] = 0
739         self[SKIP] = 0
740         self[SKIP_CPU_SHORTAGE] = 0
741         self[TEST_RUN] = 0
742         self.rerun = []
743         self.testsuites_no_tests_run = []
744
745     def add_results(self, result):
746         self.results_per_suite.append(result)
747         result_types = [PASS, FAIL, ERROR, SKIP, TEST_RUN, SKIP_CPU_SHORTAGE]
748         for result_type in result_types:
749             self[result_type] += len(result[result_type])
750
751     def add_result(self, result):
752         retval = 0
753         self.all_testcases += result.testcase_suite.countTestCases()
754         self.add_results(result)
755
756         if result.no_tests_run():
757             self.testsuites_no_tests_run.append(result.testcase_suite)
758             if result.crashed:
759                 retval = -1
760             else:
761                 retval = 1
762         elif not result.was_successful():
763             retval = 1
764
765         if retval != 0:
766             self.rerun.append(result.testcase_suite)
767
768         return retval
769
770     def print_results(self):
771         print("")
772         print(double_line_delim)
773         print("TEST RESULTS:")
774
775         def indent_results(lines):
776             lines = list(filter(None, lines))
777             maximum = max(lines, key=lambda x: x.index(":"))
778             maximum = 4 + maximum.index(":")
779             for l in lines:
780                 padding = " " * (maximum - l.index(":"))
781                 print(f"{padding}{l}")
782
783         indent_results(
784             [
785                 f"Scheduled tests: {self.all_testcases}",
786                 f"Executed tests: {self[TEST_RUN]}",
787                 f"Passed tests: {colorize(self[PASS], GREEN)}",
788                 f"Skipped tests: {colorize(self[SKIP], YELLOW)}"
789                 if self[SKIP]
790                 else None,
791                 f"Not Executed tests: {colorize(self.not_executed, RED)}"
792                 if self.not_executed
793                 else None,
794                 f"Failures: {colorize(self[FAIL], RED)}" if self[FAIL] else None,
795                 f"Errors: {colorize(self[ERROR], RED)}" if self[ERROR] else None,
796                 "Tests skipped due to lack of CPUS: "
797                 f"{colorize(self[SKIP_CPU_SHORTAGE], YELLOW)}"
798                 if self[SKIP_CPU_SHORTAGE]
799                 else None,
800             ]
801         )
802
803         if self.all_failed > 0:
804             print("FAILURES AND ERRORS IN TESTS:")
805             for result in self.results_per_suite:
806                 failed_testcase_ids = result[FAIL]
807                 errored_testcase_ids = result[ERROR]
808                 old_testcase_name = None
809                 if failed_testcase_ids:
810                     for failed_test_id in failed_testcase_ids:
811                         new_testcase_name, test_name = result.get_testcase_names(
812                             failed_test_id
813                         )
814                         if new_testcase_name != old_testcase_name:
815                             print(
816                                 "  Testcase name: {}".format(
817                                     colorize(new_testcase_name, RED)
818                                 )
819                             )
820                             old_testcase_name = new_testcase_name
821                         print(
822                             "    FAILURE: {} [{}]".format(
823                                 colorize(test_name, RED), failed_test_id
824                             )
825                         )
826                 if errored_testcase_ids:
827                     for errored_test_id in errored_testcase_ids:
828                         new_testcase_name, test_name = result.get_testcase_names(
829                             errored_test_id
830                         )
831                         if new_testcase_name != old_testcase_name:
832                             print(
833                                 "  Testcase name: {}".format(
834                                     colorize(new_testcase_name, RED)
835                                 )
836                             )
837                             old_testcase_name = new_testcase_name
838                         print(
839                             "      ERROR: {} [{}]".format(
840                                 colorize(test_name, RED), errored_test_id
841                             )
842                         )
843         if self.testsuites_no_tests_run:
844             print("TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:")
845             tc_classes = set()
846             for testsuite in self.testsuites_no_tests_run:
847                 for testcase in testsuite:
848                     tc_classes.add(get_testcase_doc_name(testcase))
849             for tc_class in tc_classes:
850                 print("  {}".format(colorize(tc_class, RED)))
851
852         if self[SKIP_CPU_SHORTAGE]:
853             print()
854             print(
855                 colorize(
856                     "     SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
857                     " ENOUGH CPUS AVAILABLE",
858                     YELLOW,
859                 )
860             )
861         print(double_line_delim)
862         print("")
863
864     @property
865     def not_executed(self):
866         return self.all_testcases - self[TEST_RUN]
867
868     @property
869     def all_failed(self):
870         return self[FAIL] + self[ERROR]
871
872
873 def parse_results(results):
874     """
875     Prints the number of scheduled, executed, not executed, passed, failed,
876     errored and skipped tests and details about failed and errored tests.
877
878     Also returns all suites where any test failed.
879
880     :param results:
881     :return:
882     """
883
884     results_per_suite = AllResults()
885     crashed = False
886     failed = False
887     for result in results:
888         result_code = results_per_suite.add_result(result)
889         if result_code == 1:
890             failed = True
891         elif result_code == -1:
892             crashed = True
893
894     results_per_suite.print_results()
895
896     if crashed:
897         return_code = -1
898     elif failed:
899         return_code = 1
900     else:
901         return_code = 0
902     return return_code, results_per_suite.rerun
903
904
905 if __name__ == "__main__":
906
907     print(f"Config is: {config}")
908
909     if config.sanity:
910         print("Running sanity test case.")
911         try:
912             rc = sanity_run_vpp.main()
913             if rc != 0:
914                 sys.exit(rc)
915         except Exception as e:
916             print(traceback.format_exc())
917             print("Couldn't run sanity test case.")
918             sys.exit(-1)
919
920     test_finished_join_timeout = 15
921
922     debug_gdb = config.debug in ["gdb", "gdbserver", "attach"]
923     debug_core = config.debug == "core"
924
925     run_interactive = debug_gdb or config.step or config.force_foreground
926
927     max_concurrent_tests = 0
928     print(f"OS reports {num_cpus} available cpu(s).")
929
930     test_jobs = config.jobs
931     if test_jobs == "auto":
932         if run_interactive:
933             max_concurrent_tests = 1
934             print("Interactive mode required, running tests consecutively.")
935         else:
936             max_concurrent_tests = num_cpus
937             print(
938                 f"Running at most {max_concurrent_tests} python test "
939                 "processes concurrently."
940             )
941     else:
942         max_concurrent_tests = test_jobs
943         print(
944             f"Running at most {max_concurrent_tests} python test processes "
945             "concurrently as set by 'TEST_JOBS'."
946         )
947
948     print(f"Using at most {max_vpp_cpus} cpus for VPP threads.")
949
950     if run_interactive and max_concurrent_tests > 1:
951         raise NotImplementedError(
952             "Running tests interactively (DEBUG is gdb[server] or ATTACH or "
953             "STEP is set) in parallel (TEST_JOBS is more than 1) is not "
954             "supported"
955         )
956
957     descriptions = True
958
959     print("Running tests using custom test runner.")
960     filters = [(parse_test_filter(f)) for f in config.filter.split(",")]
961
962     print(
963         "Selected filters: ",
964         "|".join(
965             f"file={filter_file}, class={filter_class}, function={filter_func}"
966             for filter_file, filter_class, filter_func in filters
967         ),
968     )
969
970     filter_cb = FilterByTestOption(filters)
971
972     cb = SplitToSuitesCallback(filter_cb)
973     for d in config.test_src_dir:
974         print("Adding tests from directory tree %s" % d)
975         discover_tests(d, cb)
976
977     # suites are not hashable, need to use list
978     suites = []
979     tests_amount = 0
980     for testcase_suite in cb.suites.values():
981         tests_amount += testcase_suite.countTestCases()
982         if testcase_suite.cpus_used > max_vpp_cpus:
983             # here we replace test functions with lambdas to just skip them
984             # but we also replace setUp/tearDown functions to do nothing
985             # so that the test can be "started" and "stopped", so that we can
986             # still keep those prints (test description - SKIP), which are done
987             # in stopTest() (for that to trigger, test function must run)
988             for t in testcase_suite:
989                 for m in dir(t):
990                     if m.startswith("test_"):
991                         setattr(t, m, lambda: t.skipTest("not enough cpus"))
992                 setattr(t.__class__, "setUpClass", lambda: None)
993                 setattr(t.__class__, "tearDownClass", lambda: None)
994                 setattr(t, "setUp", lambda: None)
995                 setattr(t, "tearDown", lambda: None)
996                 t.__class__.skipped_due_to_cpu_lack = True
997         suites.append(testcase_suite)
998
999     print(
1000         "%s out of %s tests match specified filters"
1001         % (tests_amount, tests_amount + cb.filtered.countTestCases())
1002     )
1003
1004     if not config.extended:
1005         print("Not running extended tests (some tests will be skipped)")
1006
1007     attempts = config.retries + 1
1008     if attempts > 1:
1009         print("Perform %s attempts to pass the suite..." % attempts)
1010
1011     if run_interactive and suites:
1012         # don't fork if requiring interactive terminal
1013         print("Running tests in foreground in the current process")
1014         full_suite = unittest.TestSuite()
1015         free_cpus = list(available_cpus)
1016         cpu_shortage = False
1017         for suite in suites:
1018             if suite.cpus_used <= max_vpp_cpus:
1019                 suite.assign_cpus(free_cpus[: suite.cpus_used])
1020             else:
1021                 suite.assign_cpus([])
1022                 cpu_shortage = True
1023         full_suite.addTests(suites)
1024         result = VppTestRunner(
1025             verbosity=config.verbose, failfast=config.failfast, print_summary=True
1026         ).run(full_suite)
1027         was_successful = result.wasSuccessful()
1028         if not was_successful:
1029             for test_case_info in result.failed_test_cases_info:
1030                 handle_failed_suite(
1031                     test_case_info.logger,
1032                     test_case_info.tempdir,
1033                     test_case_info.vpp_pid,
1034                     config.vpp,
1035                 )
1036                 if test_case_info in result.core_crash_test_cases_info:
1037                     check_and_handle_core(
1038                         test_case_info.vpp_bin_path,
1039                         test_case_info.tempdir,
1040                         test_case_info.core_crash_test,
1041                     )
1042
1043         if cpu_shortage:
1044             print()
1045             print(
1046                 colorize(
1047                     "SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
1048                     " ENOUGH CPUS AVAILABLE",
1049                     YELLOW,
1050                 )
1051             )
1052             print()
1053         sys.exit(not was_successful)
1054     else:
1055         print(
1056             "Running each VPPTestCase in a separate background process"
1057             f" with at most {max_concurrent_tests} parallel python test "
1058             "process(es)"
1059         )
1060         exit_code = 0
1061         while suites and attempts > 0:
1062             results = run_forked(suites)
1063             exit_code, suites = parse_results(results)
1064             attempts -= 1
1065             if exit_code == 0:
1066                 print("Test run was successful")
1067             else:
1068                 print("%s attempt(s) left." % attempts)
1069         sys.exit(exit_code)