7209ddfab5de45e2cfdda0d2d381fabae67e568f
[vpp.git] / test / run_tests.py
1 #!/usr/bin/env python3
2
3 import sys
4 import shutil
5 import os
6 import fnmatch
7 import unittest
8 import argparse
9 import time
10 import threading
11 import traceback
12 import signal
13 import re
14 from multiprocessing import Process, Pipe, get_context
15 from multiprocessing.queues import Queue
16 from multiprocessing.managers import BaseManager
17 import framework
18 from config import config, num_cpus, available_cpus, max_vpp_cpus
19 from framework import VppTestRunner, VppTestCase, \
20     get_testcase_doc_name, get_test_description, PASS, FAIL, ERROR, SKIP, \
21     TEST_RUN, SKIP_CPU_SHORTAGE
22 from debug import spawn_gdb, start_vpp_in_gdb
23 from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
24     colorize, single_line_delim
25 from discover_tests import discover_tests
26 import sanity_run_vpp
27 from subprocess import check_output, CalledProcessError
28 from util import check_core_path, get_core_path, is_core_present
29
30 # timeout which controls how long the child has to finish after seeing
31 # a core dump in test temporary directory. If this is exceeded, parent assumes
32 # that child process is stuck (e.g. waiting for event from vpp) and kill
33 # the child
34 core_timeout = 3
35
36
37 class StreamQueue(Queue):
38     def write(self, msg):
39         self.put(msg)
40
41     def flush(self):
42         sys.__stdout__.flush()
43         sys.__stderr__.flush()
44
45     def fileno(self):
46         return self._writer.fileno()
47
48
49 class StreamQueueManager(BaseManager):
50     pass
51
52
53 StreamQueueManager.register('StreamQueue', StreamQueue)
54
55
56 class TestResult(dict):
57     def __init__(self, testcase_suite, testcases_by_id=None):
58         super(TestResult, self).__init__()
59         self[PASS] = []
60         self[FAIL] = []
61         self[ERROR] = []
62         self[SKIP] = []
63         self[SKIP_CPU_SHORTAGE] = []
64         self[TEST_RUN] = []
65         self.crashed = False
66         self.testcase_suite = testcase_suite
67         self.testcases = [testcase for testcase in testcase_suite]
68         self.testcases_by_id = testcases_by_id
69
70     def was_successful(self):
71         return 0 == len(self[FAIL]) == len(self[ERROR]) \
72             and len(self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE]) \
73             == self.testcase_suite.countTestCases()
74
75     def no_tests_run(self):
76         return 0 == len(self[TEST_RUN])
77
78     def process_result(self, test_id, result):
79         self[result].append(test_id)
80
81     def suite_from_failed(self):
82         rerun_ids = set([])
83         for testcase in self.testcase_suite:
84             tc_id = testcase.id()
85             if tc_id not in self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE]:
86                 rerun_ids.add(tc_id)
87         if rerun_ids:
88             return suite_from_failed(self.testcase_suite, rerun_ids)
89
90     def get_testcase_names(self, test_id):
91         # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
92         setup_teardown_match = re.match(
93             r'((tearDownClass)|(setUpClass)) \((.+\..+)\)', test_id)
94         if setup_teardown_match:
95             test_name, _, _, testcase_name = setup_teardown_match.groups()
96             if len(testcase_name.split('.')) == 2:
97                 for key in self.testcases_by_id.keys():
98                     if key.startswith(testcase_name):
99                         testcase_name = key
100                         break
101             testcase_name = self._get_testcase_doc_name(testcase_name)
102         else:
103             test_name = self._get_test_description(test_id)
104             testcase_name = self._get_testcase_doc_name(test_id)
105
106         return testcase_name, test_name
107
108     def _get_test_description(self, test_id):
109         if test_id in self.testcases_by_id:
110             desc = get_test_description(descriptions,
111                                         self.testcases_by_id[test_id])
112         else:
113             desc = test_id
114         return desc
115
116     def _get_testcase_doc_name(self, test_id):
117         if test_id in self.testcases_by_id:
118             doc_name = get_testcase_doc_name(self.testcases_by_id[test_id])
119         else:
120             doc_name = test_id
121         return doc_name
122
123
124 def test_runner_wrapper(suite, keep_alive_pipe, stdouterr_queue,
125                         finished_pipe, result_pipe, logger):
126     sys.stdout = stdouterr_queue
127     sys.stderr = stdouterr_queue
128     VppTestCase.parallel_handler = logger.handlers[0]
129     result = VppTestRunner(keep_alive_pipe=keep_alive_pipe,
130                            descriptions=descriptions,
131                            verbosity=config.verbose,
132                            result_pipe=result_pipe,
133                            failfast=config.failfast,
134                            print_summary=False).run(suite)
135     finished_pipe.send(result.wasSuccessful())
136     finished_pipe.close()
137     keep_alive_pipe.close()
138
139
140 class TestCaseWrapper(object):
141     def __init__(self, testcase_suite, manager):
142         self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(
143             duplex=False)
144         self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
145         self.result_parent_end, self.result_child_end = Pipe(duplex=False)
146         self.testcase_suite = testcase_suite
147         self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
148         self.logger = get_parallel_logger(self.stdouterr_queue)
149         self.child = Process(target=test_runner_wrapper,
150                              args=(testcase_suite,
151                                    self.keep_alive_child_end,
152                                    self.stdouterr_queue,
153                                    self.finished_child_end,
154                                    self.result_child_end,
155                                    self.logger)
156                              )
157         self.child.start()
158         self.last_test_temp_dir = None
159         self.last_test_vpp_binary = None
160         self._last_test = None
161         self.last_test_id = None
162         self.vpp_pid = None
163         self.last_heard = time.time()
164         self.core_detected_at = None
165         self.testcases_by_id = {}
166         self.testclasess_with_core = {}
167         for testcase in self.testcase_suite:
168             self.testcases_by_id[testcase.id()] = testcase
169         self.result = TestResult(testcase_suite, self.testcases_by_id)
170
171     @property
172     def last_test(self):
173         return self._last_test
174
175     @last_test.setter
176     def last_test(self, test_id):
177         self.last_test_id = test_id
178         if test_id in self.testcases_by_id:
179             testcase = self.testcases_by_id[test_id]
180             self._last_test = testcase.shortDescription()
181             if not self._last_test:
182                 self._last_test = str(testcase)
183         else:
184             self._last_test = test_id
185
186     def add_testclass_with_core(self):
187         if self.last_test_id in self.testcases_by_id:
188             test = self.testcases_by_id[self.last_test_id]
189             class_name = unittest.util.strclass(test.__class__)
190             test_name = "'{}' ({})".format(get_test_description(descriptions,
191                                                                 test),
192                                            self.last_test_id)
193         else:
194             test_name = self.last_test_id
195             class_name = re.match(r'((tearDownClass)|(setUpClass)) '
196                                   r'\((.+\..+)\)', test_name).groups()[3]
197         if class_name not in self.testclasess_with_core:
198             self.testclasess_with_core[class_name] = (
199                 test_name,
200                 self.last_test_vpp_binary,
201                 self.last_test_temp_dir)
202
203     def close_pipes(self):
204         self.keep_alive_child_end.close()
205         self.finished_child_end.close()
206         self.result_child_end.close()
207         self.keep_alive_parent_end.close()
208         self.finished_parent_end.close()
209         self.result_parent_end.close()
210
211     def was_successful(self):
212         return self.result.was_successful()
213
214     @property
215     def cpus_used(self):
216         return self.testcase_suite.cpus_used
217
218     def get_assigned_cpus(self):
219         return self.testcase_suite.get_assigned_cpus()
220
221
222 def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
223                              read_testcases):
224     read_testcase = None
225     while read_testcases.is_set() or unread_testcases:
226         if finished_unread_testcases:
227             read_testcase = finished_unread_testcases.pop()
228             unread_testcases.remove(read_testcase)
229         elif unread_testcases:
230             read_testcase = unread_testcases.pop()
231         if read_testcase:
232             data = ''
233             while data is not None:
234                 sys.stdout.write(data)
235                 data = read_testcase.stdouterr_queue.get()
236
237             read_testcase.stdouterr_queue.close()
238             finished_unread_testcases.discard(read_testcase)
239             read_testcase = None
240
241
242 def handle_failed_suite(logger, last_test_temp_dir, vpp_pid, vpp_binary):
243     if last_test_temp_dir:
244         # Need to create link in case of a timeout or core dump without failure
245         lttd = os.path.basename(last_test_temp_dir)
246         link_path = '%s%s-FAILED' % (config.failed_dir, lttd)
247         if not os.path.exists(link_path):
248             os.symlink(last_test_temp_dir, link_path)
249         logger.error("Symlink to failed testcase directory: %s -> %s"
250                      % (link_path, lttd))
251
252         # Report core existence
253         core_path = get_core_path(last_test_temp_dir)
254         if os.path.exists(core_path):
255             logger.error(
256                 "Core-file exists in test temporary directory: %s!" %
257                 core_path)
258             check_core_path(logger, core_path)
259             logger.debug("Running 'file %s':" % core_path)
260             try:
261                 info = check_output(["file", core_path])
262                 logger.debug(info)
263             except CalledProcessError as e:
264                 logger.error("Subprocess returned with return code "
265                              "while running `file' utility on core-file "
266                              "returned: "
267                              "rc=%s", e.returncode)
268             except OSError as e:
269                 logger.error("Subprocess returned with OS error while "
270                              "running 'file' utility "
271                              "on core-file: "
272                              "(%s) %s", e.errno, e.strerror)
273             except Exception as e:
274                 logger.exception("Unexpected error running `file' utility "
275                                  "on core-file")
276             logger.error(f"gdb {vpp_binary} {core_path}")
277
278     if vpp_pid:
279         # Copy api post mortem
280         api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
281         if os.path.isfile(api_post_mortem_path):
282             logger.error("Copying api_post_mortem.%d to %s" %
283                          (vpp_pid, last_test_temp_dir))
284             shutil.copy2(api_post_mortem_path, last_test_temp_dir)
285
286
287 def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
288     if is_core_present(tempdir):
289         if debug_core:
290             print('VPP core detected in %s. Last test running was %s' %
291                   (tempdir, core_crash_test))
292             print(single_line_delim)
293             spawn_gdb(vpp_binary, get_core_path(tempdir))
294             print(single_line_delim)
295         elif config.compress_core:
296             print("Compressing core-file in test directory `%s'" % tempdir)
297             os.system("gzip %s" % get_core_path(tempdir))
298
299
300 def handle_cores(failed_testcases):
301     for failed_testcase in failed_testcases:
302         tcs_with_core = failed_testcase.testclasess_with_core
303         if tcs_with_core:
304             for test, vpp_binary, tempdir in tcs_with_core.values():
305                 check_and_handle_core(vpp_binary, tempdir, test)
306
307
308 def process_finished_testsuite(wrapped_testcase_suite,
309                                finished_testcase_suites,
310                                failed_wrapped_testcases,
311                                results):
312     results.append(wrapped_testcase_suite.result)
313     finished_testcase_suites.add(wrapped_testcase_suite)
314     stop_run = False
315     if config.failfast and not wrapped_testcase_suite.was_successful():
316         stop_run = True
317
318     if not wrapped_testcase_suite.was_successful():
319         failed_wrapped_testcases.add(wrapped_testcase_suite)
320         handle_failed_suite(wrapped_testcase_suite.logger,
321                             wrapped_testcase_suite.last_test_temp_dir,
322                             wrapped_testcase_suite.vpp_pid,
323                             wrapped_testcase_suite.last_test_vpp_binary,)
324
325     return stop_run
326
327
328 def run_forked(testcase_suites):
329     wrapped_testcase_suites = set()
330     solo_testcase_suites = []
331
332     # suites are unhashable, need to use list
333     results = []
334     unread_testcases = set()
335     finished_unread_testcases = set()
336     manager = StreamQueueManager()
337     manager.start()
338     tests_running = 0
339     free_cpus = list(available_cpus)
340
341     def on_suite_start(tc):
342         nonlocal tests_running
343         nonlocal free_cpus
344         tests_running = tests_running + 1
345
346     def on_suite_finish(tc):
347         nonlocal tests_running
348         nonlocal free_cpus
349         tests_running = tests_running - 1
350         assert tests_running >= 0
351         free_cpus.extend(tc.get_assigned_cpus())
352
353     def run_suite(suite):
354         nonlocal manager
355         nonlocal wrapped_testcase_suites
356         nonlocal unread_testcases
357         nonlocal free_cpus
358         suite.assign_cpus(free_cpus[:suite.cpus_used])
359         free_cpus = free_cpus[suite.cpus_used:]
360         wrapper = TestCaseWrapper(suite, manager)
361         wrapped_testcase_suites.add(wrapper)
362         unread_testcases.add(wrapper)
363         on_suite_start(suite)
364
365     def can_run_suite(suite):
366         return (tests_running < max_concurrent_tests and
367                 (suite.cpus_used <= len(free_cpus) or
368                  suite.cpus_used > max_vpp_cpus))
369
370     while free_cpus and testcase_suites:
371         a_suite = testcase_suites[0]
372         if a_suite.is_tagged_run_solo:
373             a_suite = testcase_suites.pop(0)
374             solo_testcase_suites.append(a_suite)
375             continue
376         if can_run_suite(a_suite):
377             a_suite = testcase_suites.pop(0)
378             run_suite(a_suite)
379         else:
380             break
381
382     if tests_running == 0 and solo_testcase_suites:
383         a_suite = solo_testcase_suites.pop(0)
384         run_suite(a_suite)
385
386     read_from_testcases = threading.Event()
387     read_from_testcases.set()
388     stdouterr_thread = threading.Thread(target=stdouterr_reader_wrapper,
389                                         args=(unread_testcases,
390                                               finished_unread_testcases,
391                                               read_from_testcases))
392     stdouterr_thread.start()
393
394     failed_wrapped_testcases = set()
395     stop_run = False
396
397     try:
398         while wrapped_testcase_suites:
399             finished_testcase_suites = set()
400             for wrapped_testcase_suite in wrapped_testcase_suites:
401                 while wrapped_testcase_suite.result_parent_end.poll():
402                     wrapped_testcase_suite.result.process_result(
403                         *wrapped_testcase_suite.result_parent_end.recv())
404                     wrapped_testcase_suite.last_heard = time.time()
405
406                 while wrapped_testcase_suite.keep_alive_parent_end.poll():
407                     wrapped_testcase_suite.last_test, \
408                         wrapped_testcase_suite.last_test_vpp_binary, \
409                         wrapped_testcase_suite.last_test_temp_dir, \
410                         wrapped_testcase_suite.vpp_pid = \
411                         wrapped_testcase_suite.keep_alive_parent_end.recv()
412                     wrapped_testcase_suite.last_heard = time.time()
413
414                 if wrapped_testcase_suite.finished_parent_end.poll():
415                     wrapped_testcase_suite.finished_parent_end.recv()
416                     wrapped_testcase_suite.last_heard = time.time()
417                     stop_run = process_finished_testsuite(
418                         wrapped_testcase_suite,
419                         finished_testcase_suites,
420                         failed_wrapped_testcases,
421                         results) or stop_run
422                     continue
423
424                 fail = False
425                 if wrapped_testcase_suite.last_heard + config.timeout < \
426                         time.time():
427                     fail = True
428                     wrapped_testcase_suite.logger.critical(
429                         "Child test runner process timed out "
430                         "(last test running was `%s' in `%s')!" %
431                         (wrapped_testcase_suite.last_test,
432                          wrapped_testcase_suite.last_test_temp_dir))
433                 elif not wrapped_testcase_suite.child.is_alive():
434                     fail = True
435                     wrapped_testcase_suite.logger.critical(
436                         "Child test runner process unexpectedly died "
437                         "(last test running was `%s' in `%s')!" %
438                         (wrapped_testcase_suite.last_test,
439                          wrapped_testcase_suite.last_test_temp_dir))
440                 elif wrapped_testcase_suite.last_test_temp_dir and \
441                         wrapped_testcase_suite.last_test_vpp_binary:
442                     if is_core_present(
443                             wrapped_testcase_suite.last_test_temp_dir):
444                         wrapped_testcase_suite.add_testclass_with_core()
445                         if wrapped_testcase_suite.core_detected_at is None:
446                             wrapped_testcase_suite.core_detected_at = \
447                                 time.time()
448                         elif wrapped_testcase_suite.core_detected_at + \
449                                 core_timeout < time.time():
450                             wrapped_testcase_suite.logger.critical(
451                                 "Child test runner process unresponsive and "
452                                 "core-file exists in test temporary directory "
453                                 "(last test running was `%s' in `%s')!" %
454                                 (wrapped_testcase_suite.last_test,
455                                  wrapped_testcase_suite.last_test_temp_dir))
456                             fail = True
457
458                 if fail:
459                     wrapped_testcase_suite.child.terminate()
460                     try:
461                         # terminating the child process tends to leave orphan
462                         # VPP process around
463                         if wrapped_testcase_suite.vpp_pid:
464                             os.kill(wrapped_testcase_suite.vpp_pid,
465                                     signal.SIGTERM)
466                     except OSError:
467                         # already dead
468                         pass
469                     wrapped_testcase_suite.result.crashed = True
470                     wrapped_testcase_suite.result.process_result(
471                         wrapped_testcase_suite.last_test_id, ERROR)
472                     stop_run = process_finished_testsuite(
473                         wrapped_testcase_suite,
474                         finished_testcase_suites,
475                         failed_wrapped_testcases,
476                         results) or stop_run
477
478             for finished_testcase in finished_testcase_suites:
479                 # Somewhat surprisingly, the join below may
480                 # timeout, even if client signaled that
481                 # it finished - so we note it just in case.
482                 join_start = time.time()
483                 finished_testcase.child.join(test_finished_join_timeout)
484                 join_end = time.time()
485                 if join_end - join_start >= test_finished_join_timeout:
486                     finished_testcase.logger.error(
487                         "Timeout joining finished test: %s (pid %d)" %
488                         (finished_testcase.last_test,
489                          finished_testcase.child.pid))
490                 finished_testcase.close_pipes()
491                 wrapped_testcase_suites.remove(finished_testcase)
492                 finished_unread_testcases.add(finished_testcase)
493                 finished_testcase.stdouterr_queue.put(None)
494                 on_suite_finish(finished_testcase)
495                 if stop_run:
496                     while testcase_suites:
497                         results.append(TestResult(testcase_suites.pop(0)))
498                 elif testcase_suites:
499                     a_suite = testcase_suites.pop(0)
500                     while a_suite and a_suite.is_tagged_run_solo:
501                         solo_testcase_suites.append(a_suite)
502                         if testcase_suites:
503                             a_suite = testcase_suites.pop(0)
504                         else:
505                             a_suite = None
506                     if a_suite and can_run_suite(a_suite):
507                         run_suite(a_suite)
508                 if solo_testcase_suites and tests_running == 0:
509                     a_suite = solo_testcase_suites.pop(0)
510                     run_suite(a_suite)
511             time.sleep(0.1)
512     except Exception:
513         for wrapped_testcase_suite in wrapped_testcase_suites:
514             wrapped_testcase_suite.child.terminate()
515             wrapped_testcase_suite.stdouterr_queue.put(None)
516         raise
517     finally:
518         read_from_testcases.clear()
519         stdouterr_thread.join(config.timeout)
520         manager.shutdown()
521
522     handle_cores(failed_wrapped_testcases)
523     return results
524
525
526 class TestSuiteWrapper(unittest.TestSuite):
527     cpus_used = 0
528
529     def __init__(self):
530         return super().__init__()
531
532     def addTest(self, test):
533         self.cpus_used = max(self.cpus_used, test.get_cpus_required())
534         super().addTest(test)
535
536     def assign_cpus(self, cpus):
537         self.cpus = cpus
538
539     def _handleClassSetUp(self, test, result):
540         if not test.__class__.skipped_due_to_cpu_lack:
541             test.assign_cpus(self.cpus)
542         super()._handleClassSetUp(test, result)
543
544     def get_assigned_cpus(self):
545         return self.cpus
546
547
548 class SplitToSuitesCallback:
549     def __init__(self, filter_callback):
550         self.suites = {}
551         self.suite_name = 'default'
552         self.filter_callback = filter_callback
553         self.filtered = TestSuiteWrapper()
554
555     def __call__(self, file_name, cls, method):
556         test_method = cls(method)
557         if self.filter_callback(file_name, cls.__name__, method):
558             self.suite_name = file_name + cls.__name__
559             if self.suite_name not in self.suites:
560                 self.suites[self.suite_name] = TestSuiteWrapper()
561                 self.suites[self.suite_name].is_tagged_run_solo = False
562             self.suites[self.suite_name].addTest(test_method)
563             if test_method.is_tagged_run_solo():
564                 self.suites[self.suite_name].is_tagged_run_solo = True
565
566         else:
567             self.filtered.addTest(test_method)
568
569
570 def parse_test_filter(test_filter):
571     f = test_filter
572     filter_file_name = None
573     filter_class_name = None
574     filter_func_name = None
575     if f:
576         if '.' in f:
577             parts = f.split('.')
578             if len(parts) > 3:
579                 raise Exception("Unrecognized %s option: %s" %
580                                 (test_option, f))
581             if len(parts) > 2:
582                 if parts[2] not in ('*', ''):
583                     filter_func_name = parts[2]
584             if parts[1] not in ('*', ''):
585                 filter_class_name = parts[1]
586             if parts[0] not in ('*', ''):
587                 if parts[0].startswith('test_'):
588                     filter_file_name = parts[0]
589                 else:
590                     filter_file_name = 'test_%s' % parts[0]
591         else:
592             if f.startswith('test_'):
593                 filter_file_name = f
594             else:
595                 filter_file_name = 'test_%s' % f
596     if filter_file_name:
597         filter_file_name = '%s.py' % filter_file_name
598     return filter_file_name, filter_class_name, filter_func_name
599
600
601 def filter_tests(tests, filter_cb):
602     result = TestSuiteWrapper()
603     for t in tests:
604         if isinstance(t, unittest.suite.TestSuite):
605             # this is a bunch of tests, recursively filter...
606             x = filter_tests(t, filter_cb)
607             if x.countTestCases() > 0:
608                 result.addTest(x)
609         elif isinstance(t, unittest.TestCase):
610             # this is a single test
611             parts = t.id().split('.')
612             # t.id() for common cases like this:
613             # test_classifier.TestClassifier.test_acl_ip
614             # apply filtering only if it is so
615             if len(parts) == 3:
616                 if not filter_cb(parts[0], parts[1], parts[2]):
617                     continue
618             result.addTest(t)
619         else:
620             # unexpected object, don't touch it
621             result.addTest(t)
622     return result
623
624
625 class FilterByTestOption:
626     def __init__(self, filter_file_name, filter_class_name, filter_func_name):
627         self.filter_file_name = filter_file_name
628         self.filter_class_name = filter_class_name
629         self.filter_func_name = filter_func_name
630
631     def __call__(self, file_name, class_name, func_name):
632         if self.filter_file_name:
633             fn_match = fnmatch.fnmatch(file_name, self.filter_file_name)
634             if not fn_match:
635                 return False
636         if self.filter_class_name and class_name != self.filter_class_name:
637             return False
638         if self.filter_func_name and func_name != self.filter_func_name:
639             return False
640         return True
641
642
643 class FilterByClassList:
644     def __init__(self, classes_with_filenames):
645         self.classes_with_filenames = classes_with_filenames
646
647     def __call__(self, file_name, class_name, func_name):
648         return '.'.join([file_name, class_name]) in self.classes_with_filenames
649
650
651 def suite_from_failed(suite, failed):
652     failed = {x.rsplit('.', 1)[0] for x in failed}
653     filter_cb = FilterByClassList(failed)
654     suite = filter_tests(suite, filter_cb)
655     return suite
656
657
658 class AllResults(dict):
659     def __init__(self):
660         super(AllResults, self).__init__()
661         self.all_testcases = 0
662         self.results_per_suite = []
663         self[PASS] = 0
664         self[FAIL] = 0
665         self[ERROR] = 0
666         self[SKIP] = 0
667         self[SKIP_CPU_SHORTAGE] = 0
668         self[TEST_RUN] = 0
669         self.rerun = []
670         self.testsuites_no_tests_run = []
671
672     def add_results(self, result):
673         self.results_per_suite.append(result)
674         result_types = [PASS, FAIL, ERROR, SKIP, TEST_RUN, SKIP_CPU_SHORTAGE]
675         for result_type in result_types:
676             self[result_type] += len(result[result_type])
677
678     def add_result(self, result):
679         retval = 0
680         self.all_testcases += result.testcase_suite.countTestCases()
681         self.add_results(result)
682
683         if result.no_tests_run():
684             self.testsuites_no_tests_run.append(result.testcase_suite)
685             if result.crashed:
686                 retval = -1
687             else:
688                 retval = 1
689         elif not result.was_successful():
690             retval = 1
691
692         if retval != 0:
693             self.rerun.append(result.testcase_suite)
694
695         return retval
696
697     def print_results(self):
698         print('')
699         print(double_line_delim)
700         print('TEST RESULTS:')
701
702         def indent_results(lines):
703             lines = list(filter(None, lines))
704             maximum = max(lines, key=lambda x: x.index(":"))
705             maximum = 4 + maximum.index(":")
706             for l in lines:
707                 padding = " " * (maximum - l.index(":"))
708                 print(f"{padding}{l}")
709
710         indent_results([
711             f'Scheduled tests: {self.all_testcases}',
712             f'Executed tests: {self[TEST_RUN]}',
713             f'Passed tests: {colorize(self[PASS], GREEN)}',
714             f'Skipped tests: {colorize(self[SKIP], YELLOW)}'
715             if self[SKIP] else None,
716             f'Not Executed tests: {colorize(self.not_executed, RED)}'
717             if self.not_executed else None,
718             f'Failures: {colorize(self[FAIL], RED)}' if self[FAIL] else None,
719             f'Errors: {colorize(self[ERROR], RED)}' if self[ERROR] else None,
720             'Tests skipped due to lack of CPUS: '
721             f'{colorize(self[SKIP_CPU_SHORTAGE], YELLOW)}'
722             if self[SKIP_CPU_SHORTAGE] else None
723         ])
724
725         if self.all_failed > 0:
726             print('FAILURES AND ERRORS IN TESTS:')
727             for result in self.results_per_suite:
728                 failed_testcase_ids = result[FAIL]
729                 errored_testcase_ids = result[ERROR]
730                 old_testcase_name = None
731                 if failed_testcase_ids:
732                     for failed_test_id in failed_testcase_ids:
733                         new_testcase_name, test_name = \
734                             result.get_testcase_names(failed_test_id)
735                         if new_testcase_name != old_testcase_name:
736                             print('  Testcase name: {}'.format(
737                                 colorize(new_testcase_name, RED)))
738                             old_testcase_name = new_testcase_name
739                         print('    FAILURE: {} [{}]'.format(
740                             colorize(test_name, RED), failed_test_id))
741                 if errored_testcase_ids:
742                     for errored_test_id in errored_testcase_ids:
743                         new_testcase_name, test_name = \
744                             result.get_testcase_names(errored_test_id)
745                         if new_testcase_name != old_testcase_name:
746                             print('  Testcase name: {}'.format(
747                                 colorize(new_testcase_name, RED)))
748                             old_testcase_name = new_testcase_name
749                         print('      ERROR: {} [{}]'.format(
750                             colorize(test_name, RED), errored_test_id))
751         if self.testsuites_no_tests_run:
752             print('TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:')
753             tc_classes = set()
754             for testsuite in self.testsuites_no_tests_run:
755                 for testcase in testsuite:
756                     tc_classes.add(get_testcase_doc_name(testcase))
757             for tc_class in tc_classes:
758                 print('  {}'.format(colorize(tc_class, RED)))
759
760         if self[SKIP_CPU_SHORTAGE]:
761             print()
762             print(colorize('     SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT'
763                            ' ENOUGH CPUS AVAILABLE', YELLOW))
764         print(double_line_delim)
765         print('')
766
767     @property
768     def not_executed(self):
769         return self.all_testcases - self[TEST_RUN]
770
771     @property
772     def all_failed(self):
773         return self[FAIL] + self[ERROR]
774
775
776 def parse_results(results):
777     """
778     Prints the number of scheduled, executed, not executed, passed, failed,
779     errored and skipped tests and details about failed and errored tests.
780
781     Also returns all suites where any test failed.
782
783     :param results:
784     :return:
785     """
786
787     results_per_suite = AllResults()
788     crashed = False
789     failed = False
790     for result in results:
791         result_code = results_per_suite.add_result(result)
792         if result_code == 1:
793             failed = True
794         elif result_code == -1:
795             crashed = True
796
797     results_per_suite.print_results()
798
799     if crashed:
800         return_code = -1
801     elif failed:
802         return_code = 1
803     else:
804         return_code = 0
805     return return_code, results_per_suite.rerun
806
807
808 if __name__ == '__main__':
809
810     print(f"Config is: {config}")
811
812     if config.sanity:
813         print("Running sanity test case.")
814         try:
815             rc = sanity_run_vpp.main()
816             if rc != 0:
817                 sys.exit(rc)
818         except Exception as e:
819             print(traceback.format_exc())
820             print("Couldn't run sanity test case.")
821             sys.exit(-1)
822
823     test_finished_join_timeout = 15
824
825     debug_gdb = config.debug in ["gdb", "gdbserver", "attach"]
826     debug_core = config.debug == "core"
827
828     run_interactive = debug_gdb or config.step or config.force_foreground
829
830     max_concurrent_tests = 0
831     print(f"OS reports {num_cpus} available cpu(s).")
832
833     test_jobs = config.jobs
834     if test_jobs == 'auto':
835         if run_interactive:
836             max_concurrent_tests = 1
837             print('Interactive mode required, running tests consecutively.')
838         else:
839             max_concurrent_tests = num_cpus
840             print(f"Running at most {max_concurrent_tests} python test "
841                   "processes concurrently.")
842     else:
843         max_concurrent_tests = test_jobs
844         print(f"Running at most {max_concurrent_tests} python test processes "
845               "concurrently as set by 'TEST_JOBS'.")
846
847     print(f"Using at most {max_vpp_cpus} cpus for VPP threads.")
848
849     if run_interactive and max_concurrent_tests > 1:
850         raise NotImplementedError(
851             'Running tests interactively (DEBUG is gdb[server] or ATTACH or '
852             'STEP is set) in parallel (TEST_JOBS is more than 1) is not '
853             'supported')
854
855     descriptions = True
856
857     print("Running tests using custom test runner.")
858     filter_file, filter_class, filter_func = \
859         parse_test_filter(config.filter)
860
861     print("Selected filters: file=%s, class=%s, function=%s" % (
862         filter_file, filter_class, filter_func))
863
864     filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)
865
866     ignore_path = config.venv_dir
867     cb = SplitToSuitesCallback(filter_cb)
868     for d in config.test_src_dir:
869         print("Adding tests from directory tree %s" % d)
870         discover_tests(d, cb, ignore_path)
871
872     # suites are not hashable, need to use list
873     suites = []
874     tests_amount = 0
875     for testcase_suite in cb.suites.values():
876         tests_amount += testcase_suite.countTestCases()
877         if testcase_suite.cpus_used > max_vpp_cpus:
878             # here we replace test functions with lambdas to just skip them
879             # but we also replace setUp/tearDown functions to do nothing
880             # so that the test can be "started" and "stopped", so that we can
881             # still keep those prints (test description - SKIP), which are done
882             # in stopTest() (for that to trigger, test function must run)
883             for t in testcase_suite:
884                 for m in dir(t):
885                     if m.startswith('test_'):
886                         setattr(t, m, lambda: t.skipTest("not enough cpus"))
887                 setattr(t.__class__, 'setUpClass', lambda: None)
888                 setattr(t.__class__, 'tearDownClass', lambda: None)
889                 setattr(t, 'setUp', lambda: None)
890                 setattr(t, 'tearDown', lambda: None)
891                 t.__class__.skipped_due_to_cpu_lack = True
892         suites.append(testcase_suite)
893
894     print("%s out of %s tests match specified filters" % (
895         tests_amount, tests_amount + cb.filtered.countTestCases()))
896
897     if not config.extended:
898         print("Not running extended tests (some tests will be skipped)")
899
900     attempts = config.retries + 1
901     if attempts > 1:
902         print("Perform %s attempts to pass the suite..." % attempts)
903
904     if run_interactive and suites:
905         # don't fork if requiring interactive terminal
906         print('Running tests in foreground in the current process')
907         full_suite = unittest.TestSuite()
908         free_cpus = list(available_cpus)
909         cpu_shortage = False
910         for suite in suites:
911             if suite.cpus_used <= max_vpp_cpus:
912                 suite.assign_cpus(free_cpus[:suite.cpus_used])
913             else:
914                 suite.assign_cpus([])
915                 cpu_shortage = True
916         full_suite.addTests(suites)
917         result = VppTestRunner(verbosity=config.verbose,
918                                failfast=config.failfast,
919                                print_summary=True).run(full_suite)
920         was_successful = result.wasSuccessful()
921         if not was_successful:
922             for test_case_info in result.failed_test_cases_info:
923                 handle_failed_suite(test_case_info.logger,
924                                     test_case_info.tempdir,
925                                     test_case_info.vpp_pid)
926                 if test_case_info in result.core_crash_test_cases_info:
927                     check_and_handle_core(test_case_info.vpp_bin_path,
928                                           test_case_info.tempdir,
929                                           test_case_info.core_crash_test)
930
931         if cpu_shortage:
932             print()
933             print(colorize('SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT'
934                            ' ENOUGH CPUS AVAILABLE', YELLOW))
935             print()
936         sys.exit(not was_successful)
937     else:
938         print('Running each VPPTestCase in a separate background process'
939               f' with at most {max_concurrent_tests} parallel python test '
940               'process(es)')
941         exit_code = 0
942         while suites and attempts > 0:
943             results = run_forked(suites)
944             exit_code, suites = parse_results(results)
945             attempts -= 1
946             if exit_code == 0:
947                 print('Test run was successful')
948             else:
949                 print('%s attempt(s) left.' % attempts)
950         sys.exit(exit_code)