tests: make tests less make dependent
[vpp.git] / test / run_tests.py
1 #!/usr/bin/env python3
2
3 import sys
4 import shutil
5 import os
6 import fnmatch
7 import unittest
8 import argparse
9 import time
10 import threading
11 import traceback
12 import signal
13 import re
14 from multiprocessing import Process, Pipe, get_context
15 from multiprocessing.queues import Queue
16 from multiprocessing.managers import BaseManager
17 import framework
18 from config import config, num_cpus, available_cpus, max_vpp_cpus
19 from framework import VppTestRunner, VppTestCase, \
20     get_testcase_doc_name, get_test_description, PASS, FAIL, ERROR, SKIP, \
21     TEST_RUN, SKIP_CPU_SHORTAGE
22 from debug import spawn_gdb, start_vpp_in_gdb
23 from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
24     colorize, single_line_delim
25 from discover_tests import discover_tests
26 import sanity_run_vpp
27 from subprocess import check_output, CalledProcessError
28 from util import check_core_path, get_core_path, is_core_present
29
# Timeout, in seconds (it is compared against time.time() deltas), which
# controls how long the child has to finish after a core dump is seen in the
# test temporary directory. If this is exceeded, the parent assumes that the
# child process is stuck (e.g. waiting for an event from vpp) and kills
# the child.
core_timeout = 3
35
36
class StreamQueue(Queue):
    """Multiprocessing queue exposing a minimal file-like interface.

    Provides ``write``/``flush``/``fileno`` so that an instance can be
    assigned to ``sys.stdout``/``sys.stderr``; written text is enqueued
    rather than printed.
    """

    def write(self, msg):
        """Enqueue *msg* as one queue item."""
        self.put(msg)

    def flush(self):
        """Flush the interpreter's original stdout and stderr streams."""
        for stream in (sys.__stdout__, sys.__stderr__):
            stream.flush()

    def fileno(self):
        """Return the file descriptor of the queue's writer connection."""
        writer = self._writer
        return writer.fileno()
47
48
class StreamQueueManager(BaseManager):
    """Manager through which StreamQueue objects are created."""


# expose StreamQueue as ``manager.StreamQueue(...)``
StreamQueueManager.register(typeid='StreamQueue', callable=StreamQueue)
54
55
class TestResult(dict):
    """Results of one test suite, keyed by result kind.

    Each of the keys PASS, FAIL, ERROR, SKIP, SKIP_CPU_SHORTAGE and TEST_RUN
    maps to the list of test ids reported with that result.

    :param testcase_suite: the suite these results belong to
    :param testcases_by_id: optional mapping of test id -> test case, used
        to turn ids into human readable names; ``None`` is treated as an
        empty mapping
    """

    def __init__(self, testcase_suite, testcases_by_id=None):
        super(TestResult, self).__init__()
        self[PASS] = []
        self[FAIL] = []
        self[ERROR] = []
        self[SKIP] = []
        self[SKIP_CPU_SHORTAGE] = []
        self[TEST_RUN] = []
        self.crashed = False
        self.testcase_suite = testcase_suite
        self.testcases = list(testcase_suite)
        # normalise a missing mapping so that name lookups never raise
        self.testcases_by_id = \
            {} if testcases_by_id is None else testcases_by_id

    def was_successful(self):
        """Return True if nothing failed/errored and every test in the
        suite was reported as passed or skipped."""
        return 0 == len(self[FAIL]) == len(self[ERROR]) \
            and len(self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE]) \
            == self.testcase_suite.countTestCases()

    def no_tests_run(self):
        """Return True if no test was reported as run."""
        return 0 == len(self[TEST_RUN])

    def process_result(self, test_id, result):
        """Record *test_id* under the result kind *result*."""
        self[result].append(test_id)

    def suite_from_failed(self):
        """Build a suite of the tests that neither passed nor were skipped.

        :return: the filtered suite, or None if there is nothing to rerun
        """
        # build the lookup set once instead of concatenating lists per test
        finished_ok = set(self[PASS])
        finished_ok.update(self[SKIP])
        finished_ok.update(self[SKIP_CPU_SHORTAGE])
        rerun_ids = set()
        for testcase in self.testcase_suite:
            tc_id = testcase.id()
            if tc_id not in finished_ok:
                rerun_ids.add(tc_id)
        if rerun_ids:
            return suite_from_failed(self.testcase_suite, rerun_ids)
        return None

    def get_testcase_names(self, test_id):
        """Translate a reported test id to displayable names.

        :param test_id: id as reported by the runner; may also be a
            class-fixture id such as
            ``tearDownClass (test_ipsec_esp.TestIpsecEsp1)`` or a non-string
            (e.g. None when a child died before reporting any test)
        :return: tuple (testcase_name, test_name)
        """
        # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
        setup_teardown_match = None
        if isinstance(test_id, str):
            # re.match() raises TypeError on non-strings, so only try it
            # for real ids
            setup_teardown_match = re.match(
                r'((tearDownClass)|(setUpClass)) \((.+\..+)\)', test_id)
        if setup_teardown_match:
            test_name, _, _, testcase_name = setup_teardown_match.groups()
            if len(testcase_name.split('.')) == 2:
                # "module.Class" - use any test of that class for the lookup
                for key in self.testcases_by_id:
                    if key.startswith(testcase_name):
                        testcase_name = key
                        break
            testcase_name = self._get_testcase_doc_name(testcase_name)
        else:
            test_name = self._get_test_description(test_id)
            testcase_name = self._get_testcase_doc_name(test_id)

        return testcase_name, test_name

    def _get_test_description(self, test_id):
        """Return the description of a known test, else *test_id* itself."""
        if test_id in self.testcases_by_id:
            desc = get_test_description(descriptions,
                                        self.testcases_by_id[test_id])
        else:
            desc = test_id
        return desc

    def _get_testcase_doc_name(self, test_id):
        """Return the doc name of a known test's class, else *test_id*."""
        if test_id in self.testcases_by_id:
            doc_name = get_testcase_doc_name(self.testcases_by_id[test_id])
        else:
            doc_name = test_id
        return doc_name
122
123
def test_runner_wrapper(suite, keep_alive_pipe, stdouterr_queue,
                        finished_pipe, result_pipe, logger):
    """Entry point of a child process: run *suite* and report the outcome.

    stdout and stderr are redirected to *stdouterr_queue*, the suite is run
    with a VppTestRunner, and the overall success flag is sent over
    *finished_pipe* before the pipes are closed.
    """
    sys.stdout = sys.stderr = stdouterr_queue
    VppTestCase.parallel_handler = logger.handlers[0]
    runner = VppTestRunner(keep_alive_pipe=keep_alive_pipe,
                           descriptions=descriptions,
                           verbosity=config.verbose,
                           result_pipe=result_pipe,
                           failfast=config.failfast,
                           print_summary=False)
    outcome = runner.run(suite)
    finished_pipe.send(outcome.wasSuccessful())
    for pipe in (finished_pipe, keep_alive_pipe):
        pipe.close()
138
139
class TestCaseWrapper(object):
    """Parent-side handle of a test suite running in a child process.

    Creating an instance starts the child immediately. The wrapper owns the
    pipes used by the child to report keep-alives, per-test results and
    completion, the queue carrying the child's stdout/stderr, and the
    TestResult collected for the suite.

    :param testcase_suite: suite to run in the child process
    :param manager: manager providing ``StreamQueue`` objects
    """

    def __init__(self, testcase_suite, manager):
        self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(
            duplex=False)
        self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
        self.result_parent_end, self.result_child_end = Pipe(duplex=False)
        self.testcase_suite = testcase_suite
        self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
        self.logger = get_parallel_logger(self.stdouterr_queue)
        self.child = Process(target=test_runner_wrapper,
                             args=(testcase_suite,
                                   self.keep_alive_child_end,
                                   self.stdouterr_queue,
                                   self.finished_child_end,
                                   self.result_child_end,
                                   self.logger)
                             )
        self.child.start()
        self.last_test_temp_dir = None
        self.last_test_vpp_binary = None
        self._last_test = None
        self.last_test_id = None
        self.vpp_pid = None
        # time of the last message received from the child
        self.last_heard = time.time()
        self.core_detected_at = None
        self.testcases_by_id = {}
        # class name -> (test name, vpp binary, temp dir) of tests with core
        self.testclasess_with_core = {}
        for testcase in self.testcase_suite:
            self.testcases_by_id[testcase.id()] = testcase
        self.result = TestResult(testcase_suite, self.testcases_by_id)

    @property
    def last_test(self):
        """Human readable name of the last test the child reported."""
        return self._last_test

    @last_test.setter
    def last_test(self, test_id):
        # remember the raw id and derive a readable name from it
        self.last_test_id = test_id
        if test_id in self.testcases_by_id:
            testcase = self.testcases_by_id[test_id]
            self._last_test = testcase.shortDescription()
            if not self._last_test:
                self._last_test = str(testcase)
        else:
            self._last_test = test_id

    def add_testclass_with_core(self):
        """Remember the class of the last reported test as having a core.

        Only the first test seen for a given class is recorded. If the last
        test id is neither a known test nor a setUpClass/tearDownClass id,
        the raw id is used as the key instead of raising.
        """
        if self.last_test_id in self.testcases_by_id:
            test = self.testcases_by_id[self.last_test_id]
            class_name = unittest.util.strclass(test.__class__)
            test_name = "'{}' ({})".format(get_test_description(descriptions,
                                                                test),
                                           self.last_test_id)
        else:
            test_name = self.last_test_id
            fixture_match = None
            if isinstance(test_name, str):
                fixture_match = re.match(r'((tearDownClass)|(setUpClass)) '
                                         r'\((.+\..+)\)', test_name)
            # an unrecognised id must not crash the scheduler loop
            class_name = \
                fixture_match.groups()[3] if fixture_match else test_name
        if class_name not in self.testclasess_with_core:
            self.testclasess_with_core[class_name] = (
                test_name,
                self.last_test_vpp_binary,
                self.last_test_temp_dir)

    def close_pipes(self):
        """Close both ends of all three pipes."""
        self.keep_alive_child_end.close()
        self.finished_child_end.close()
        self.result_child_end.close()
        self.keep_alive_parent_end.close()
        self.finished_parent_end.close()
        self.result_parent_end.close()

    def was_successful(self):
        """Return whether the collected result counts as successful."""
        return self.result.was_successful()

    @property
    def cpus_used(self):
        """Number of CPUs used by the wrapped suite."""
        return self.testcase_suite.cpus_used

    def get_assigned_cpus(self):
        """Return the CPUs assigned to the wrapped suite."""
        return self.testcase_suite.get_assigned_cpus()
220
221
def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
                             read_testcases):
    """Copy queued child output to stdout, one test case at a time.

    Keeps going while *read_testcases* is set or unread test cases remain.
    Test cases already marked finished are preferred. The chosen test
    case's queue is drained until a ``None`` item arrives, then closed.
    """
    while read_testcases.is_set() or unread_testcases:
        if finished_unread_testcases:
            current = finished_unread_testcases.pop()
            unread_testcases.remove(current)
        elif unread_testcases:
            current = unread_testcases.pop()
        else:
            # nothing to read right now
            continue
        if not current:
            continue

        output_queue = current.stdouterr_queue
        sys.stdout.write('')
        # None is the end-of-output marker
        for chunk in iter(output_queue.get, None):
            sys.stdout.write(chunk)

        output_queue.close()
        finished_unread_testcases.discard(current)
240
241
def handle_failed_suite(logger, last_test_temp_dir, vpp_pid):
    """Collect post-mortem artifacts of a failed suite.

    Creates a ``-FAILED`` symlink to the last test's temporary directory,
    reports a core file found there, and copies the api post-mortem file of
    the VPP process into that directory.

    :param logger: logger used for reporting
    :param last_test_temp_dir: temp directory of the last test, or falsy
    :param vpp_pid: pid of the VPP process of the last test, or falsy
    """
    if last_test_temp_dir:
        # Need to create link in case of a timeout or core dump without failure
        lttd = os.path.basename(last_test_temp_dir)
        link_path = '%s%s-FAILED' % (config.failed_dir, lttd)
        # lexists() also detects a dangling link, for which exists() is
        # False and symlink() would raise FileExistsError
        if not os.path.lexists(link_path):
            os.symlink(last_test_temp_dir, link_path)
        logger.error("Symlink to failed testcase directory: %s -> %s"
                     % (link_path, lttd))

        # Report core existence
        core_path = get_core_path(last_test_temp_dir)
        if os.path.exists(core_path):
            logger.error(
                "Core-file exists in test temporary directory: %s!" %
                core_path)
            check_core_path(logger, core_path)
            logger.debug("Running 'file %s':" % core_path)
            try:
                info = check_output(["file", core_path])
                logger.debug(info)
            except CalledProcessError as e:
                logger.error("Subprocess returned with return code "
                             "while running `file' utility on core-file "
                             "returned: "
                             "rc=%s", e.returncode)
            except OSError as e:
                logger.error("Subprocess returned with OS error while "
                             "running 'file' utility "
                             "on core-file: "
                             "(%s) %s", e.errno, e.strerror)
            except Exception:
                # best effort only - log with traceback and carry on
                logger.exception("Unexpected error running `file' utility "
                                 "on core-file")
            logger.error(f"gdb {config.vpp_bin} {core_path}")

    if vpp_pid:
        # Copy api post mortem
        api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
        # without a destination directory there is nowhere to copy to
        if last_test_temp_dir and os.path.isfile(api_post_mortem_path):
            logger.error("Copying api_post_mortem.%d to %s" %
                         (vpp_pid, last_test_temp_dir))
            shutil.copy2(api_post_mortem_path, last_test_temp_dir)
285
286
def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
    """Debug or compress a core file found in *tempdir*, if any.

    With ``debug_core`` set, gdb is spawned on the core; otherwise, with
    ``config.compress_core`` set, the core file is gzipped.

    :param vpp_binary: VPP binary which produced the core
    :param tempdir: test temporary directory to inspect
    :param core_crash_test: name of the test running when the core appeared
    """
    if not is_core_present(tempdir):
        return
    if debug_core:
        print('VPP core detected in %s. Last test running was %s' %
              (tempdir, core_crash_test))
        print(single_line_delim)
        spawn_gdb(vpp_binary, get_core_path(tempdir))
        print(single_line_delim)
    elif config.compress_core:
        import subprocess
        print("Compressing core-file in test directory `%s'" % tempdir)
        # pass the path as an argv element, not through a shell, so that
        # spaces or shell metacharacters in the path are harmless
        try:
            subprocess.call(["gzip", get_core_path(tempdir)])
        except OSError as e:
            # e.g. gzip not installed; compression is best effort
            print("Failed to run gzip on core-file: %s" % e)
298
299
def handle_cores(failed_testcases):
    """Process the recorded core files of every failed test case wrapper."""
    for wrapper in failed_testcases:
        recorded = wrapper.testclasess_with_core
        if not recorded:
            continue
        for test, vpp_binary, tempdir in recorded.values():
            check_and_handle_core(vpp_binary, tempdir, test)
306
307
def process_finished_testsuite(wrapped_testcase_suite,
                               finished_testcase_suites,
                               failed_wrapped_testcases,
                               results):
    """Book-keep a suite which has finished running.

    Appends its result to *results*, marks it finished and, if it was not
    successful, records it as failed and collects failure artifacts.

    :return: True if the whole run should stop (failfast and a failure)
    """
    results.append(wrapped_testcase_suite.result)
    finished_testcase_suites.add(wrapped_testcase_suite)

    if wrapped_testcase_suite.was_successful():
        return False

    failed_wrapped_testcases.add(wrapped_testcase_suite)
    handle_failed_suite(wrapped_testcase_suite.logger,
                        wrapped_testcase_suite.last_test_temp_dir,
                        wrapped_testcase_suite.vpp_pid)
    return True if config.failfast else False
325
326
def run_forked(testcase_suites):
    """Run test suites in child processes and collect their results.

    Suites are started while CPUs and concurrency slots are available.
    Suites tagged run-solo are set aside and started only when nothing else
    is running. Children which time out, die unexpectedly, or stay
    unresponsive after a core file appears are terminated and their last
    test is recorded as ERROR.

    :param testcase_suites: list of suites to run; consumed by this function
    :return: list of TestResult objects, one per suite handled; suites which
        were not run (failfast stop, or could never be scheduled) get an
        empty TestResult
    """
    wrapped_testcase_suites = set()
    solo_testcase_suites = []

    # suites are unhashable, need to use list
    results = []
    unread_testcases = set()
    finished_unread_testcases = set()
    manager = StreamQueueManager()
    manager.start()
    tests_running = 0
    free_cpus = list(available_cpus)

    def on_suite_start(tc):
        nonlocal tests_running
        nonlocal free_cpus
        tests_running = tests_running + 1

    def on_suite_finish(tc):
        nonlocal tests_running
        nonlocal free_cpus
        tests_running = tests_running - 1
        assert tests_running >= 0
        # give the CPUs of the finished suite back to the pool
        free_cpus.extend(tc.get_assigned_cpus())

    def run_suite(suite):
        nonlocal manager
        nonlocal wrapped_testcase_suites
        nonlocal unread_testcases
        nonlocal free_cpus
        suite.assign_cpus(free_cpus[:suite.cpus_used])
        free_cpus = free_cpus[suite.cpus_used:]
        wrapper = TestCaseWrapper(suite, manager)
        wrapped_testcase_suites.add(wrapper)
        unread_testcases.add(wrapper)
        on_suite_start(suite)

    def can_run_suite(suite):
        return (tests_running < max_concurrent_tests and
                (suite.cpus_used <= len(free_cpus) or
                 suite.cpus_used > max_vpp_cpus))

    # initial scheduling - start as many non-solo suites as fit
    while free_cpus and testcase_suites:
        a_suite = testcase_suites[0]
        if a_suite.is_tagged_run_solo:
            a_suite = testcase_suites.pop(0)
            solo_testcase_suites.append(a_suite)
            continue
        if can_run_suite(a_suite):
            a_suite = testcase_suites.pop(0)
            run_suite(a_suite)
        else:
            break

    if tests_running == 0 and solo_testcase_suites:
        a_suite = solo_testcase_suites.pop(0)
        run_suite(a_suite)

    read_from_testcases = threading.Event()
    read_from_testcases.set()
    stdouterr_thread = threading.Thread(target=stdouterr_reader_wrapper,
                                        args=(unread_testcases,
                                              finished_unread_testcases,
                                              read_from_testcases))
    stdouterr_thread.start()

    failed_wrapped_testcases = set()
    stop_run = False

    try:
        while wrapped_testcase_suites:
            finished_testcase_suites = set()
            for wrapped_testcase_suite in wrapped_testcase_suites:
                # drain per-test results reported by the child
                while wrapped_testcase_suite.result_parent_end.poll():
                    wrapped_testcase_suite.result.process_result(
                        *wrapped_testcase_suite.result_parent_end.recv())
                    wrapped_testcase_suite.last_heard = time.time()

                # drain keep-alives describing the currently running test
                while wrapped_testcase_suite.keep_alive_parent_end.poll():
                    wrapped_testcase_suite.last_test, \
                        wrapped_testcase_suite.last_test_vpp_binary, \
                        wrapped_testcase_suite.last_test_temp_dir, \
                        wrapped_testcase_suite.vpp_pid = \
                        wrapped_testcase_suite.keep_alive_parent_end.recv()
                    wrapped_testcase_suite.last_heard = time.time()

                if wrapped_testcase_suite.finished_parent_end.poll():
                    wrapped_testcase_suite.finished_parent_end.recv()
                    wrapped_testcase_suite.last_heard = time.time()
                    stop_run = process_finished_testsuite(
                        wrapped_testcase_suite,
                        finished_testcase_suites,
                        failed_wrapped_testcases,
                        results) or stop_run
                    continue

                fail = False
                if wrapped_testcase_suite.last_heard + config.timeout < \
                        time.time():
                    fail = True
                    wrapped_testcase_suite.logger.critical(
                        "Child test runner process timed out "
                        "(last test running was `%s' in `%s')!" %
                        (wrapped_testcase_suite.last_test,
                         wrapped_testcase_suite.last_test_temp_dir))
                elif not wrapped_testcase_suite.child.is_alive():
                    fail = True
                    wrapped_testcase_suite.logger.critical(
                        "Child test runner process unexpectedly died "
                        "(last test running was `%s' in `%s')!" %
                        (wrapped_testcase_suite.last_test,
                         wrapped_testcase_suite.last_test_temp_dir))
                elif wrapped_testcase_suite.last_test_temp_dir and \
                        wrapped_testcase_suite.last_test_vpp_binary:
                    if is_core_present(
                            wrapped_testcase_suite.last_test_temp_dir):
                        wrapped_testcase_suite.add_testclass_with_core()
                        if wrapped_testcase_suite.core_detected_at is None:
                            wrapped_testcase_suite.core_detected_at = \
                                time.time()
                        elif wrapped_testcase_suite.core_detected_at + \
                                core_timeout < time.time():
                            wrapped_testcase_suite.logger.critical(
                                "Child test runner process unresponsive and "
                                "core-file exists in test temporary directory "
                                "(last test running was `%s' in `%s')!" %
                                (wrapped_testcase_suite.last_test,
                                 wrapped_testcase_suite.last_test_temp_dir))
                            fail = True

                if fail:
                    wrapped_testcase_suite.child.terminate()
                    try:
                        # terminating the child process tends to leave orphan
                        # VPP process around
                        if wrapped_testcase_suite.vpp_pid:
                            os.kill(wrapped_testcase_suite.vpp_pid,
                                    signal.SIGTERM)
                    except OSError:
                        # already dead
                        pass
                    wrapped_testcase_suite.result.crashed = True
                    wrapped_testcase_suite.result.process_result(
                        wrapped_testcase_suite.last_test_id, ERROR)
                    stop_run = process_finished_testsuite(
                        wrapped_testcase_suite,
                        finished_testcase_suites,
                        failed_wrapped_testcases,
                        results) or stop_run

            for finished_testcase in finished_testcase_suites:
                # Somewhat surprisingly, the join below may
                # timeout, even if client signaled that
                # it finished - so we note it just in case.
                join_start = time.time()
                finished_testcase.child.join(test_finished_join_timeout)
                join_end = time.time()
                if join_end - join_start >= test_finished_join_timeout:
                    finished_testcase.logger.error(
                        "Timeout joining finished test: %s (pid %d)" %
                        (finished_testcase.last_test,
                         finished_testcase.child.pid))
                finished_testcase.close_pipes()
                wrapped_testcase_suites.remove(finished_testcase)
                finished_unread_testcases.add(finished_testcase)
                # None tells the reader thread that the output is complete
                finished_testcase.stdouterr_queue.put(None)
                on_suite_finish(finished_testcase)
                if stop_run:
                    while testcase_suites:
                        results.append(TestResult(testcase_suites.pop(0)))
                elif testcase_suites:
                    # move leading solo suites aside
                    while testcase_suites and \
                            testcase_suites[0].is_tagged_run_solo:
                        solo_testcase_suites.append(testcase_suites.pop(0))
                    # only take the next suite off the queue if it can be
                    # started now - otherwise it stays queued and is retried
                    # when another suite finishes (popping it first would
                    # silently drop it)
                    if testcase_suites and \
                            can_run_suite(testcase_suites[0]):
                        run_suite(testcase_suites.pop(0))
                if solo_testcase_suites and tests_running == 0:
                    a_suite = solo_testcase_suites.pop(0)
                    run_suite(a_suite)
            time.sleep(0.1)
    except Exception:
        for wrapped_testcase_suite in wrapped_testcase_suites:
            wrapped_testcase_suite.child.terminate()
            wrapped_testcase_suite.stdouterr_queue.put(None)
        raise
    finally:
        read_from_testcases.clear()
        stdouterr_thread.join(config.timeout)
        manager.shutdown()

    # suites still queued here could never be scheduled - report them as
    # not run rather than losing them
    while testcase_suites:
        results.append(TestResult(testcase_suites.pop(0)))

    handle_cores(failed_wrapped_testcases)
    return results
523
524
class TestSuiteWrapper(unittest.TestSuite):
    """Test suite tracking how many CPUs its tests need and which CPUs
    were assigned to it."""

    # highest CPU requirement among the tests added so far
    cpus_used = 0

    def __init__(self):
        super().__init__()

    def addTest(self, test):
        """Add *test* and update ``cpus_used`` with its CPU requirement.

        Nested suites contribute their own ``cpus_used`` (0 if they do not
        track it); objects without ``get_cpus_required`` contribute 0.
        """
        if isinstance(test, unittest.TestSuite):
            # suites have no get_cpus_required()
            required = getattr(test, 'cpus_used', 0)
        else:
            get_required = getattr(test, 'get_cpus_required', None)
            required = get_required() if get_required is not None else 0
        self.cpus_used = max(self.cpus_used, required)
        super().addTest(test)

    def assign_cpus(self, cpus):
        """Remember the CPUs this suite may use."""
        self.cpus = cpus

    def _handleClassSetUp(self, test, result):
        # hand the assigned CPUs to the test before its class is set up,
        # unless the class was skipped for lack of CPUs
        if not test.__class__.skipped_due_to_cpu_lack:
            test.assign_cpus(self.cpus)
        super()._handleClassSetUp(test, result)

    def get_assigned_cpus(self):
        """Return the CPUs set by assign_cpus()."""
        return self.cpus
545
546
class SplitToSuitesCallback:
    """Discovery callback sorting test methods into per-class suites.

    Tests accepted by *filter_callback* land in ``suites`` under the key
    ``file_name + class name``; rejected ones are collected in ``filtered``.
    """

    def __init__(self, filter_callback):
        self.suites = {}
        self.suite_name = 'default'
        self.filter_callback = filter_callback
        self.filtered = TestSuiteWrapper()

    def __call__(self, file_name, cls, method):
        test_method = cls(method)
        if not self.filter_callback(file_name, cls.__name__, method):
            self.filtered.addTest(test_method)
            return

        self.suite_name = file_name + cls.__name__
        if self.suite_name not in self.suites:
            fresh_suite = TestSuiteWrapper()
            fresh_suite.is_tagged_run_solo = False
            self.suites[self.suite_name] = fresh_suite
        target_suite = self.suites[self.suite_name]
        target_suite.addTest(test_method)
        # a single solo test makes the whole suite run solo
        if test_method.is_tagged_run_solo():
            target_suite.is_tagged_run_solo = True
567
568
def parse_test_filter(test_filter):
    """Split a ``file.class.function`` filter string into its components.

    Empty or ``*`` components of a dotted filter mean "no restriction". The
    file component gets a ``test_`` prefix if missing and a ``.py`` suffix.

    :param test_filter: filter string, may be empty or None
    :return: tuple (file name, class name, function name), None where
        unrestricted
    :raises Exception: if the filter has more than three components
    """
    if not test_filter:
        return None, None, None

    wildcards = ('*', '')
    pieces = test_filter.split('.')
    if len(pieces) > 3:
        raise Exception("Unrecognized %s option: %s" %
                        (test_option, test_filter))

    dotted = len(pieces) > 1
    file_name = class_name = func_name = None

    # a filter without dots is always taken as a file name
    stem = pieces[0]
    if not dotted or stem not in wildcards:
        if not stem.startswith('test_'):
            stem = 'test_%s' % stem
        file_name = '%s.py' % stem

    if dotted and pieces[1] not in wildcards:
        class_name = pieces[1]
    if len(pieces) > 2 and pieces[2] not in wildcards:
        func_name = pieces[2]

    return file_name, class_name, func_name
598
599
def filter_tests(tests, filter_cb):
    """Return a new suite holding the tests accepted by *filter_cb*.

    Nested suites are filtered recursively and kept only if non-empty.
    Test cases whose id has the form ``module.Class.method``, e.g.
    ``test_classifier.TestClassifier.test_acl_ip``, are passed through the
    callback; anything else is kept untouched.
    """
    kept = TestSuiteWrapper()
    for item in tests:
        if isinstance(item, unittest.suite.TestSuite):
            # a bunch of tests - filter recursively
            nested = filter_tests(item, filter_cb)
            if nested.countTestCases() > 0:
                kept.addTest(nested)
            continue
        if isinstance(item, unittest.TestCase):
            # filtering applies only to ids of the common three-part form
            id_parts = item.id().split('.')
            if len(id_parts) == 3 and not filter_cb(*id_parts):
                continue
        # accepted test case or unexpected object - keep as is
        kept.addTest(item)
    return kept
622
623
class FilterByTestOption:
    """Filter callback matching tests against file/class/function names.

    A filter component which is falsy places no restriction. The file name
    is matched as an fnmatch pattern, class and function names exactly.
    """

    def __init__(self, filter_file_name, filter_class_name, filter_func_name):
        self.filter_file_name = filter_file_name
        self.filter_class_name = filter_class_name
        self.filter_func_name = filter_func_name

    def __call__(self, file_name, class_name, func_name):
        pattern = self.filter_file_name
        if pattern and not fnmatch.fnmatch(file_name, pattern):
            return False
        wanted_class = self.filter_class_name
        if wanted_class and wanted_class != class_name:
            return False
        wanted_func = self.filter_func_name
        return not (wanted_func and wanted_func != func_name)
640
641
class FilterByClassList:
    """Filter callback accepting tests whose ``file_name.class_name`` is
    contained in the given collection."""

    def __init__(self, classes_with_filenames):
        self.classes_with_filenames = classes_with_filenames

    def __call__(self, file_name, class_name, func_name):
        qualified_class = '.'.join((file_name, class_name))
        return qualified_class in self.classes_with_filenames
648
649
def suite_from_failed(suite, failed):
    """Return the part of *suite* belonging to classes with failed tests.

    :param suite: suite to filter
    :param failed: ids of failed tests; everything after the last dot of
        each id is dropped to obtain the class identifier
    """
    failed_classes = set()
    for test_id in failed:
        failed_classes.add(test_id.rsplit('.', 1)[0])
    return filter_tests(suite, FilterByClassList(failed_classes))
655
656
class AllResults(dict):
    """Aggregated counters over the results of all test suites.

    The keys PASS, FAIL, ERROR, SKIP, SKIP_CPU_SHORTAGE and TEST_RUN map to
    the number of tests reported with that result.
    """

    def __init__(self):
        super(AllResults, self).__init__()
        self.all_testcases = 0
        self.results_per_suite = []
        for kind in (PASS, FAIL, ERROR, SKIP, SKIP_CPU_SHORTAGE, TEST_RUN):
            self[kind] = 0
        self.rerun = []
        self.testsuites_no_tests_run = []

    def add_results(self, result):
        """Store *result* and add its per-kind counts to the totals."""
        self.results_per_suite.append(result)
        for kind in (PASS, FAIL, ERROR, SKIP, TEST_RUN, SKIP_CPU_SHORTAGE):
            self[kind] += len(result[kind])

    def add_result(self, result):
        """Account for the result of one suite.

        :return: 0 on success, 1 if the suite failed or ran no tests,
            -1 if it ran no tests and crashed
        """
        self.all_testcases += result.testcase_suite.countTestCases()
        self.add_results(result)

        if result.no_tests_run():
            self.testsuites_no_tests_run.append(result.testcase_suite)
            retval = -1 if result.crashed else 1
        elif not result.was_successful():
            retval = 1
        else:
            retval = 0

        if retval != 0:
            # anything not fully successful is a rerun candidate
            self.rerun.append(result.testcase_suite)

        return retval

    def print_results(self):
        """Print the summary table followed by details of problems."""
        print('')
        print(double_line_delim)
        print('TEST RESULTS:')

        def indent_results(lines):
            # right-align the labels so that all colons line up
            shown = [line for line in lines if line]
            colon_column = 4 + max(line.index(":") for line in shown)
            for line in shown:
                padding = " " * (colon_column - line.index(":"))
                print(f"{padding}{line}")

        summary = [
            f'Scheduled tests: {self.all_testcases}',
            f'Executed tests: {self[TEST_RUN]}',
            f'Passed tests: {colorize(self[PASS], GREEN)}',
        ]
        # the remaining rows are shown only when their count is non-zero
        if self[SKIP]:
            summary.append(f'Skipped tests: {colorize(self[SKIP], YELLOW)}')
        if self.not_executed:
            summary.append(
                f'Not Executed tests: {colorize(self.not_executed, RED)}')
        if self[FAIL]:
            summary.append(f'Failures: {colorize(self[FAIL], RED)}')
        if self[ERROR]:
            summary.append(f'Errors: {colorize(self[ERROR], RED)}')
        if self[SKIP_CPU_SHORTAGE]:
            summary.append(
                'Tests skipped due to lack of CPUS: '
                f'{colorize(self[SKIP_CPU_SHORTAGE], YELLOW)}')
        indent_results(summary)

        if self.all_failed > 0:
            print('FAILURES AND ERRORS IN TESTS:')
            for result in self.results_per_suite:
                # failures are listed before errors for each suite
                sections = (
                    (result[FAIL], '    FAILURE: {} [{}]'),
                    (result[ERROR], '      ERROR: {} [{}]'),
                )
                shown_testcase_name = None
                for test_ids, line_format in sections:
                    for test_id in test_ids:
                        testcase_name, test_name = \
                            result.get_testcase_names(test_id)
                        # print the testcase header only when it changes
                        if testcase_name != shown_testcase_name:
                            print('  Testcase name: {}'.format(
                                colorize(testcase_name, RED)))
                            shown_testcase_name = testcase_name
                        print(line_format.format(
                            colorize(test_name, RED), test_id))
        if self.testsuites_no_tests_run:
            print('TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:')
            tc_classes = {get_testcase_doc_name(testcase)
                          for testsuite in self.testsuites_no_tests_run
                          for testcase in testsuite}
            for tc_class in tc_classes:
                print('  {}'.format(colorize(tc_class, RED)))

        if self[SKIP_CPU_SHORTAGE]:
            print()
            print(colorize('     SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT'
                           ' ENOUGH CPUS AVAILABLE', YELLOW))
        print(double_line_delim)
        print('')

    @property
    def not_executed(self):
        """Number of scheduled tests which were not run."""
        return self.all_testcases - self[TEST_RUN]

    @property
    def all_failed(self):
        """Total number of failures and errors."""
        return self[FAIL] + self[ERROR]
773
774
def parse_results(results):
    """
    Aggregate the given results, print the summary and derive a return code.

    Every item of `results` is handed to AllResults.add_result(); once all
    items were added, the aggregated summary is printed with
    AllResults.print_results().

    :param results: iterable of items accepted by AllResults.add_result()
    :return: tuple (return_code, rerun) - return_code is -1 if add_result()
        returned -1 for any item, otherwise 1 if it returned 1 for any item,
        otherwise 0; rerun is the `rerun` attribute of the aggregated
        results (the main loop uses it as the suites for the next attempt)
    """

    summary = AllResults()
    # every item has to be added, in order, before the summary is printed
    codes = [summary.add_result(item) for item in results]

    summary.print_results()

    # -1 (tracked as "crashed") takes precedence over 1 (tracked as "failed")
    if any(code == -1 for code in codes):
        return -1, summary.rerun
    if any(code == 1 for code in codes):
        return 1, summary.rerun
    return 0, summary.rerun
805
806
if __name__ == '__main__':
    # Script entry point: optional sanity run, test discovery and filtering,
    # then either a single foreground run (interactive/debug modes) or
    # forked runs with retries.
    #
    # NOTE(review): the names assigned here are module globals; functions
    # defined elsewhere in this file may read them, so none of them is
    # renamed or removed even when it looks unused in this block.

    print(f"Config is: {config}")

    if config.sanity:
        print("Running sanity test case.")
        try:
            rc = sanity_run_vpp.main()
            if rc != 0:
                # SystemExit is not an Exception subclass, so this exit
                # is not swallowed by the handler below
                sys.exit(rc)
        except Exception:
            print(traceback.format_exc())
            print("Couldn't run sanity test case.")
            sys.exit(-1)

    test_finished_join_timeout = 15

    debug_gdb = config.debug in ["gdb", "gdbserver", "attach"]
    debug_core = config.debug == "core"

    # any of these options needs the tests to run in the foreground
    run_interactive = debug_gdb or config.step or config.force_foreground

    max_concurrent_tests = 0
    print(f"OS reports {num_cpus} available cpu(s).")

    test_jobs = config.jobs
    if test_jobs == 'auto':
        if run_interactive:
            max_concurrent_tests = 1
            print('Interactive mode required, running tests consecutively.')
        else:
            max_concurrent_tests = num_cpus
            print(f"Running at most {max_concurrent_tests} python test "
                  "processes concurrently.")
    else:
        max_concurrent_tests = test_jobs
        print(f"Running at most {max_concurrent_tests} python test processes "
              "concurrently as set by 'TEST_JOBS'.")

    print(f"Using at most {max_vpp_cpus} cpus for VPP threads.")

    if run_interactive and max_concurrent_tests > 1:
        raise NotImplementedError(
            'Running tests interactively (DEBUG is gdb[server] or ATTACH or '
            'STEP is set) in parallel (TEST_JOBS is more than 1) is not '
            'supported')

    descriptions = True

    print("Running tests using custom test runner.")
    filter_file, filter_class, filter_func = \
        parse_test_filter(config.filter)

    print("Selected filters: file=%s, class=%s, function=%s" % (
        filter_file, filter_class, filter_func))

    filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)

    ignore_path = config.venv_dir
    cb = SplitToSuitesCallback(filter_cb)
    for d in config.test_src_dir:
        print("Adding tests from directory tree %s" % d)
        discover_tests(d, cb, ignore_path)

    # suites are not hashable, need to use list
    suites = []
    tests_amount = 0
    for testcase_suite in cb.suites.values():
        tests_amount += testcase_suite.countTestCases()
        if testcase_suite.cpus_used > max_vpp_cpus:
            # here we replace test functions with lambdas to just skip them
            # but we also replace setUp/tearDown functions to do nothing
            # so that the test can be "started" and "stopped", so that we can
            # still keep those prints (test description - SKIP), which are done
            # in stopTest() (for that to trigger, test function must run)
            for t in testcase_suite:
                for m in dir(t):
                    if m.startswith('test_'):
                        # bind the current test as a default argument: a
                        # plain closure would look up `t` only when the
                        # lambda is called, i.e. after this loop has moved
                        # on to another test
                        setattr(t, m,
                                lambda t=t: t.skipTest("not enough cpus"))
                setattr(t.__class__, 'setUpClass', lambda: None)
                setattr(t.__class__, 'tearDownClass', lambda: None)
                setattr(t, 'setUp', lambda: None)
                setattr(t, 'tearDown', lambda: None)
                t.__class__.skipped_due_to_cpu_lack = True
        suites.append(testcase_suite)

    print("%s out of %s tests match specified filters" % (
        tests_amount, tests_amount + cb.filtered.countTestCases()))

    if not config.extended:
        print("Not running extended tests (some tests will be skipped)")

    attempts = config.retries + 1
    if attempts > 1:
        print("Perform %s attempts to pass the suite..." % attempts)

    if run_interactive and suites:
        # don't fork if requiring interactive terminal
        print('Running tests in foreground in the current process')
        full_suite = unittest.TestSuite()
        free_cpus = list(available_cpus)
        cpu_shortage = False
        for suite in suites:
            if suite.cpus_used <= max_vpp_cpus:
                # every suite is handed the same leading slice of the cpu
                # list; free_cpus is never consumed in this branch
                suite.assign_cpus(free_cpus[:suite.cpus_used])
            else:
                suite.assign_cpus([])
                cpu_shortage = True
        full_suite.addTests(suites)
        result = VppTestRunner(verbosity=config.verbose,
                               failfast=config.failfast,
                               print_summary=True).run(full_suite)
        was_successful = result.wasSuccessful()
        if not was_successful:
            for test_case_info in result.failed_test_cases_info:
                handle_failed_suite(test_case_info.logger,
                                    test_case_info.tempdir,
                                    test_case_info.vpp_pid)
                if test_case_info in result.core_crash_test_cases_info:
                    check_and_handle_core(test_case_info.vpp_bin_path,
                                          test_case_info.tempdir,
                                          test_case_info.core_crash_test)

        if cpu_shortage:
            print()
            print(colorize('SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT'
                           ' ENOUGH CPUS AVAILABLE', YELLOW))
            print()
        # bool is an int subclass: exit status 0 on success, 1 on failure
        sys.exit(not was_successful)
    else:
        print('Running each VPPTestCase in a separate background process'
              f' with at most {max_concurrent_tests} parallel python test '
              'process(es)')
        exit_code = 0
        # each pass reruns only what parse_results() handed back as `rerun`,
        # until nothing is left to run or the attempts are used up
        while suites and attempts > 0:
            results = run_forked(suites)
            exit_code, suites = parse_results(results)
            attempts -= 1
            if exit_code == 0:
                print('Test run was successful')
            else:
                print('%s attempt(s) left.' % attempts)
        sys.exit(exit_code)