5df37efba6bbca85ea1001901eca81bb8b5e365f
[vpp.git] / test / run_tests.py
1 #!/usr/bin/env python3
2
3 import sys
4 import shutil
5 import os
6 import fnmatch
7 import unittest
8 import argparse
9 import time
10 import threading
11 import traceback
12 import signal
13 import re
14 from multiprocessing import Process, Pipe, get_context
15 from multiprocessing.queues import Queue
16 from multiprocessing.managers import BaseManager
17 import framework
18 from config import config, num_cpus, available_cpus, max_vpp_cpus
19 from framework import (
20     VppTestRunner,
21     VppTestCase,
22     get_testcase_doc_name,
23     get_test_description,
24     PASS,
25     FAIL,
26     ERROR,
27     SKIP,
28     TEST_RUN,
29     SKIP_CPU_SHORTAGE,
30 )
31 from debug import spawn_gdb, start_vpp_in_gdb
32 from log import (
33     get_parallel_logger,
34     double_line_delim,
35     RED,
36     YELLOW,
37     GREEN,
38     colorize,
39     single_line_delim,
40 )
41 from discover_tests import discover_tests
42 import sanity_run_vpp
43 from subprocess import check_output, CalledProcessError
44 from util import check_core_path, get_core_path, is_core_present
45
# Timeout (in seconds) which controls how long the child has to finish after
# a core dump is seen in the test temporary directory. If this is exceeded,
# the parent assumes that the child process is stuck (e.g. waiting for an
# event from vpp) and kills the child.
core_timeout = 3
51
52
class StreamQueue(Queue):
    """Multiprocessing queue that offers a minimal stream-like interface.

    Provides ``write``, ``flush`` and ``fileno`` so that an instance can be
    assigned to ``sys.stdout`` / ``sys.stderr``.
    """

    def write(self, msg):
        """Put *msg* onto the queue instead of writing it to a stream."""
        self.put(msg)

    def flush(self):
        """Flush the interpreter's original stdout and stderr streams."""
        for stream in (sys.__stdout__, sys.__stderr__):
            stream.flush()

    def fileno(self):
        """Return the file descriptor of the queue's writer end."""
        writer = self._writer
        return writer.fileno()
63
64
class StreamQueueManager(BaseManager):
    """Manager on which the ``StreamQueue`` type is registered."""
67
68
# Register StreamQueue with the manager so that instances can be created
# through a started manager as ``manager.StreamQueue(...)``.
StreamQueueManager.register("StreamQueue", StreamQueue)
70
71
class TestResult(dict):
    """Outcome of a single test suite, keyed by result type.

    Each of the keys PASS, FAIL, ERROR, SKIP, SKIP_CPU_SHORTAGE and TEST_RUN
    maps to the list of test ids reported with that result.

    :param testcase_suite: iterable suite of test cases this result covers.
    :param testcases_by_id: optional mapping of test id to test case, used to
        derive human readable names; a missing mapping is treated as empty.
    """

    def __init__(self, testcase_suite, testcases_by_id=None):
        super(TestResult, self).__init__()
        self[PASS] = []
        self[FAIL] = []
        self[ERROR] = []
        self[SKIP] = []
        self[SKIP_CPU_SHORTAGE] = []
        self[TEST_RUN] = []
        self.crashed = False
        self.testcase_suite = testcase_suite
        self.testcases = list(testcase_suite)
        # Normalize a missing mapping to an empty one so that the name lookup
        # helpers below fall back to raw ids instead of raising on None.
        self.testcases_by_id = {} if testcases_by_id is None else testcases_by_id

    def was_successful(self):
        """Return True when nothing failed/errored and every test case of the
        suite either passed or was skipped."""
        return (
            0 == len(self[FAIL]) == len(self[ERROR])
            and len(self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE])
            == self.testcase_suite.countTestCases()
        )

    def no_tests_run(self):
        """Return True when no test was recorded under TEST_RUN."""
        return 0 == len(self[TEST_RUN])

    def process_result(self, test_id, result):
        """Record *test_id* under the *result* type."""
        self[result].append(test_id)

    def suite_from_failed(self):
        """Build a suite for re-running what did not pass or get skipped.

        :returns: the suite produced by the module-level ``suite_from_failed``
            or None when every test case passed or was skipped.
        """
        # Build the lookup set once instead of concatenating the three lists
        # for every test case.
        not_failed = set(self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE])
        rerun_ids = set()
        for testcase in self.testcase_suite:
            tc_id = testcase.id()
            if tc_id not in not_failed:
                rerun_ids.add(tc_id)
        if rerun_ids:
            return suite_from_failed(self.testcase_suite, rerun_ids)
        return None

    def get_testcase_names(self, test_id):
        """Return ``(testcase_name, test_name)`` to display for *test_id*."""
        # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
        setup_teardown_match = re.match(
            r"((tearDownClass)|(setUpClass)) \((.+\..+)\)", test_id
        )
        if setup_teardown_match:
            test_name, _, _, testcase_name = setup_teardown_match.groups()
            if len(testcase_name.split(".")) == 2:
                # Only "module.Class" is known here; borrow the id of any
                # known test of that class to look up the class doc name.
                for key in self.testcases_by_id:
                    if key.startswith(testcase_name):
                        testcase_name = key
                        break
            testcase_name = self._get_testcase_doc_name(testcase_name)
        else:
            test_name = self._get_test_description(test_id)
            testcase_name = self._get_testcase_doc_name(test_id)

        return testcase_name, test_name

    def _get_test_description(self, test_id):
        """Return the description of a known test, else *test_id* itself."""
        if test_id in self.testcases_by_id:
            desc = get_test_description(descriptions, self.testcases_by_id[test_id])
        else:
            desc = test_id
        return desc

    def _get_testcase_doc_name(self, test_id):
        """Return the doc name of a known test's class, else *test_id*."""
        if test_id in self.testcases_by_id:
            doc_name = get_testcase_doc_name(self.testcases_by_id[test_id])
        else:
            doc_name = test_id
        return doc_name
140
141
def test_runner_wrapper(
    suite, keep_alive_pipe, stdouterr_queue, finished_pipe, result_pipe, logger
):
    """Run *suite* with VppTestRunner; used as the target of a child Process.

    stdout and stderr are replaced by *stdouterr_queue*, the overall success
    flag is sent through *finished_pipe*, and both *finished_pipe* and
    *keep_alive_pipe* are closed afterwards.
    """
    sys.stdout = sys.stderr = stdouterr_queue
    VppTestCase.parallel_handler = logger.handlers[0]
    runner = VppTestRunner(
        keep_alive_pipe=keep_alive_pipe,
        descriptions=descriptions,
        verbosity=config.verbose,
        result_pipe=result_pipe,
        failfast=config.failfast,
        print_summary=False,
    )
    outcome = runner.run(suite)
    finished_pipe.send(outcome.wasSuccessful())
    for pipe in (finished_pipe, keep_alive_pipe):
        pipe.close()
159
160
class TestCaseWrapper(object):
    """Parent-side handle of a test suite running in a child process.

    Creating an instance starts the child process, which runs
    ``test_runner_wrapper`` on the suite. The instance owns the pipes and the
    stdout/stderr queue shared with the child and keeps the last state
    reported by it.
    """

    def __init__(self, testcase_suite, manager):
        self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(duplex=False)
        self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
        self.result_parent_end, self.result_child_end = Pipe(duplex=False)
        self.testcase_suite = testcase_suite
        self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
        self.logger = get_parallel_logger(self.stdouterr_queue)
        self.child = Process(
            target=test_runner_wrapper,
            args=(
                testcase_suite,
                self.keep_alive_child_end,
                self.stdouterr_queue,
                self.finished_child_end,
                self.result_child_end,
                self.logger,
            ),
        )
        self.child.start()
        self.last_test_temp_dir = None
        self.last_test_vpp_binary = None
        self._last_test = None
        self.last_test_id = None
        self.vpp_pid = None
        self.last_heard = time.time()
        self.core_detected_at = None
        self.testcases_by_id = {}
        # maps class name -> (test name, vpp binary, temp dir) of the first
        # test of that class seen with a core-file
        self.testclasess_with_core = {}
        for testcase in self.testcase_suite:
            self.testcases_by_id[testcase.id()] = testcase
        self.result = TestResult(testcase_suite, self.testcases_by_id)

    @property
    def last_test(self):
        """Human readable name of the last test reported by the child."""
        return self._last_test

    @last_test.setter
    def last_test(self, test_id):
        # Remember the raw id and derive a readable name for known tests.
        self.last_test_id = test_id
        if test_id in self.testcases_by_id:
            testcase = self.testcases_by_id[test_id]
            self._last_test = testcase.shortDescription()
            if not self._last_test:
                self._last_test = str(testcase)
        else:
            self._last_test = test_id

    def add_testclass_with_core(self):
        """Record the class of the last reported test as having a core-file.

        Only the first test seen for a given class is stored.
        """
        if self.last_test_id in self.testcases_by_id:
            test = self.testcases_by_id[self.last_test_id]
            class_name = unittest.util.strclass(test.__class__)
            test_name = "'{}' ({})".format(
                get_test_description(descriptions, test), self.last_test_id
            )
        else:
            test_name = self.last_test_id
            # Expected form: "setUpClass (module.Class)" or
            # "tearDownClass (module.Class)". Anything else used to raise
            # here; fall back to the raw id so that the core is still
            # recorded.
            match = re.match(
                r"((tearDownClass)|(setUpClass)) \((.+\..+)\)", str(test_name)
            )
            class_name = match.group(4) if match else str(test_name)
        if class_name not in self.testclasess_with_core:
            self.testclasess_with_core[class_name] = (
                test_name,
                self.last_test_vpp_binary,
                self.last_test_temp_dir,
            )

    def close_pipes(self):
        """Close both ends of all three pipes."""
        self.keep_alive_child_end.close()
        self.finished_child_end.close()
        self.result_child_end.close()
        self.keep_alive_parent_end.close()
        self.finished_parent_end.close()
        self.result_parent_end.close()

    def was_successful(self):
        """Return whether the collected result is successful."""
        return self.result.was_successful()

    @property
    def cpus_used(self):
        """Number of cpus used by the wrapped suite."""
        return self.testcase_suite.cpus_used

    def get_assigned_cpus(self):
        """Return the cpus assigned to the wrapped suite."""
        return self.testcase_suite.get_assigned_cpus()
245
246
def stdouterr_reader_wrapper(
    unread_testcases, finished_unread_testcases, read_testcases
):
    """Copy queued output of test cases to stdout, one test case at a time.

    Runs while *read_testcases* is set or *unread_testcases* is non-empty.
    Test cases present in *finished_unread_testcases* are picked first. A
    test case is read until ``None`` is received from its queue.
    """
    while read_testcases.is_set() or unread_testcases:
        if finished_unread_testcases:
            current = finished_unread_testcases.pop()
            unread_testcases.remove(current)
        elif unread_testcases:
            current = unread_testcases.pop()
        else:
            current = None

        if not current:
            continue

        chunk = ""
        while chunk is not None:
            sys.stdout.write(chunk)
            chunk = current.stdouterr_queue.get()

        current.stdouterr_queue.close()
        finished_unread_testcases.discard(current)
266
267
def handle_failed_suite(logger, last_test_temp_dir, vpp_pid, vpp_binary):
    """Log diagnostics and preserve artifacts of a failed test suite.

    :param logger: logger used for all messages.
    :param last_test_temp_dir: temporary directory of the last test, may be
        empty/None when it is not known.
    :param vpp_pid: pid of the VPP process of the last test, may be None.
    :param vpp_binary: path of the VPP binary, used in the gdb hint.
    """
    if last_test_temp_dir:
        # Need to create link in case of a timeout or core dump without failure
        lttd = os.path.basename(last_test_temp_dir)
        link_path = "%s%s-FAILED" % (config.failed_dir, lttd)
        # lexists() also reports a dangling symlink, for which exists() is
        # False and os.symlink() would raise FileExistsError.
        if not os.path.lexists(link_path):
            os.symlink(last_test_temp_dir, link_path)
        logger.error(
            "Symlink to failed testcase directory: %s -> %s" % (link_path, lttd)
        )

        # Report core existence
        core_path = get_core_path(last_test_temp_dir)
        if os.path.exists(core_path):
            logger.error(
                "Core-file exists in test temporary directory: %s!" % core_path
            )
            check_core_path(logger, core_path)
            logger.debug("Running 'file %s':" % core_path)
            try:
                info = check_output(["file", core_path])
                logger.debug(info)
            except CalledProcessError as e:
                logger.error(
                    "Subprocess returned with return code "
                    "while running `file' utility on core-file "
                    "returned: "
                    "rc=%s",
                    e.returncode,
                )
            except OSError as e:
                logger.error(
                    "Subprocess returned with OS error while "
                    "running 'file' utility "
                    "on core-file: "
                    "(%s) %s",
                    e.errno,
                    e.strerror,
                )
            except Exception:
                logger.exception("Unexpected error running `file' utility on core-file")
            logger.error(f"gdb {vpp_binary} {core_path}")

    if vpp_pid:
        # Copy api post mortem
        api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
        if os.path.isfile(api_post_mortem_path):
            if last_test_temp_dir:
                logger.error(
                    "Copying api_post_mortem.%d to %s" % (vpp_pid, last_test_temp_dir)
                )
                shutil.copy2(api_post_mortem_path, last_test_temp_dir)
            else:
                # Without a destination directory copy2() would raise
                # TypeError; report the situation instead.
                logger.error(
                    "Not copying api_post_mortem.%d - test temporary "
                    "directory is unknown" % vpp_pid
                )
319
320
def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
    """Debug or compress a core-file found in *tempdir*, if there is one.

    When ``debug_core`` is set, gdb is spawned on the core-file; otherwise,
    when ``config.compress_core`` is set, the core-file is gzipped.

    :param vpp_binary: VPP binary passed to gdb.
    :param tempdir: test temporary directory to inspect.
    :param core_crash_test: name of the last running test, for the message.
    """
    if not is_core_present(tempdir):
        return
    if debug_core:
        print(
            "VPP core detected in %s. Last test running was %s"
            % (tempdir, core_crash_test)
        )
        print(single_line_delim)
        spawn_gdb(vpp_binary, get_core_path(tempdir))
        print(single_line_delim)
    elif config.compress_core:
        from subprocess import call

        print("Compressing core-file in test directory `%s'" % tempdir)
        # Pass an argument list instead of a shell string so that paths with
        # spaces or shell metacharacters are handled correctly. Compression
        # stays best-effort: the exit status is ignored as before.
        try:
            call(["gzip", get_core_path(tempdir)])
        except OSError as e:
            print("Failed to run gzip on core-file: %s" % e)
334
335
def handle_cores(failed_testcases):
    """Call check_and_handle_core for every recorded test class with a core.

    Each item of *failed_testcases* provides ``testclasess_with_core``, whose
    values are ``(test, vpp_binary, tempdir)`` tuples.
    """
    for wrapper in failed_testcases:
        cores = wrapper.testclasess_with_core
        if not cores:
            continue
        for entry in cores.values():
            test, vpp_binary, tempdir = entry
            check_and_handle_core(vpp_binary, tempdir, test)
342
343
def process_finished_testsuite(
    wrapped_testcase_suite, finished_testcase_suites, failed_wrapped_testcases, results
):
    """Book-keep a suite which finished (or was terminated).

    Appends the suite's result to *results*, adds the suite to
    *finished_testcase_suites* and, when it was not successful, also to
    *failed_wrapped_testcases* before calling handle_failed_suite for it.

    :returns: True when the suite failed and ``config.failfast`` is set,
        i.e. the whole run should stop; False otherwise.
    """
    results.append(wrapped_testcase_suite.result)
    finished_testcase_suites.add(wrapped_testcase_suite)

    failed = not wrapped_testcase_suite.was_successful()
    should_stop = bool(config.failfast and failed)

    if failed:
        failed_wrapped_testcases.add(wrapped_testcase_suite)
        handle_failed_suite(
            wrapped_testcase_suite.logger,
            wrapped_testcase_suite.last_test_temp_dir,
            wrapped_testcase_suite.vpp_pid,
            wrapped_testcase_suite.last_test_vpp_binary,
        )

    return should_stop
363
364
def run_forked(testcase_suites):
    """Run test suites in child processes and collect their results.

    Suites are started while free cpus and concurrency slots are available;
    suites tagged run-solo are set aside and started only when no other
    suite is running. The parent polls the pipes of every running child,
    terminates children which time out, die, or stay unresponsive after a
    core-file appeared, and starts further suites as running ones finish.

    :param testcase_suites: list of suites to run; consumed by this function.
    :returns: list of TestResult objects.
    """
    wrapped_testcase_suites = set()
    solo_testcase_suites = []

    # suites are unhashable, need to use list
    results = []
    unread_testcases = set()
    finished_unread_testcases = set()
    manager = StreamQueueManager()
    manager.start()
    tests_running = 0
    free_cpus = list(available_cpus)

    def on_suite_start(tc):
        nonlocal tests_running
        nonlocal free_cpus
        tests_running = tests_running + 1

    def on_suite_finish(tc):
        nonlocal tests_running
        nonlocal free_cpus
        tests_running = tests_running - 1
        assert tests_running >= 0
        # give the cpus of the finished suite back to the pool
        free_cpus.extend(tc.get_assigned_cpus())

    def run_suite(suite):
        nonlocal manager
        nonlocal wrapped_testcase_suites
        nonlocal unread_testcases
        nonlocal free_cpus
        suite.assign_cpus(free_cpus[: suite.cpus_used])
        free_cpus = free_cpus[suite.cpus_used :]
        wrapper = TestCaseWrapper(suite, manager)
        wrapped_testcase_suites.add(wrapper)
        unread_testcases.add(wrapper)
        on_suite_start(suite)

    def can_run_suite(suite):
        return tests_running < max_concurrent_tests and (
            suite.cpus_used <= len(free_cpus) or suite.cpus_used > max_vpp_cpus
        )

    while free_cpus and testcase_suites:
        a_suite = testcase_suites[0]
        if a_suite.is_tagged_run_solo:
            a_suite = testcase_suites.pop(0)
            solo_testcase_suites.append(a_suite)
            continue
        if can_run_suite(a_suite):
            a_suite = testcase_suites.pop(0)
            run_suite(a_suite)
        else:
            break

    if tests_running == 0 and solo_testcase_suites:
        a_suite = solo_testcase_suites.pop(0)
        run_suite(a_suite)

    read_from_testcases = threading.Event()
    read_from_testcases.set()
    stdouterr_thread = threading.Thread(
        target=stdouterr_reader_wrapper,
        args=(unread_testcases, finished_unread_testcases, read_from_testcases),
    )
    stdouterr_thread.start()

    failed_wrapped_testcases = set()
    stop_run = False

    try:
        while wrapped_testcase_suites:
            finished_testcase_suites = set()
            for wrapped_testcase_suite in wrapped_testcase_suites:
                while wrapped_testcase_suite.result_parent_end.poll():
                    wrapped_testcase_suite.result.process_result(
                        *wrapped_testcase_suite.result_parent_end.recv()
                    )
                    wrapped_testcase_suite.last_heard = time.time()

                while wrapped_testcase_suite.keep_alive_parent_end.poll():
                    (
                        wrapped_testcase_suite.last_test,
                        wrapped_testcase_suite.last_test_vpp_binary,
                        wrapped_testcase_suite.last_test_temp_dir,
                        wrapped_testcase_suite.vpp_pid,
                    ) = wrapped_testcase_suite.keep_alive_parent_end.recv()
                    wrapped_testcase_suite.last_heard = time.time()

                if wrapped_testcase_suite.finished_parent_end.poll():
                    wrapped_testcase_suite.finished_parent_end.recv()
                    wrapped_testcase_suite.last_heard = time.time()
                    stop_run = (
                        process_finished_testsuite(
                            wrapped_testcase_suite,
                            finished_testcase_suites,
                            failed_wrapped_testcases,
                            results,
                        )
                        or stop_run
                    )
                    continue

                fail = False
                if wrapped_testcase_suite.last_heard + config.timeout < time.time():
                    fail = True
                    wrapped_testcase_suite.logger.critical(
                        "Child test runner process timed out "
                        "(last test running was `%s' in `%s')!"
                        % (
                            wrapped_testcase_suite.last_test,
                            wrapped_testcase_suite.last_test_temp_dir,
                        )
                    )
                elif not wrapped_testcase_suite.child.is_alive():
                    fail = True
                    wrapped_testcase_suite.logger.critical(
                        "Child test runner process unexpectedly died "
                        "(last test running was `%s' in `%s')!"
                        % (
                            wrapped_testcase_suite.last_test,
                            wrapped_testcase_suite.last_test_temp_dir,
                        )
                    )
                elif (
                    wrapped_testcase_suite.last_test_temp_dir
                    and wrapped_testcase_suite.last_test_vpp_binary
                ):
                    if is_core_present(wrapped_testcase_suite.last_test_temp_dir):
                        wrapped_testcase_suite.add_testclass_with_core()
                        if wrapped_testcase_suite.core_detected_at is None:
                            wrapped_testcase_suite.core_detected_at = time.time()
                        elif (
                            wrapped_testcase_suite.core_detected_at + core_timeout
                            < time.time()
                        ):
                            wrapped_testcase_suite.logger.critical(
                                "Child test runner process unresponsive and "
                                "core-file exists in test temporary directory "
                                "(last test running was `%s' in `%s')!"
                                % (
                                    wrapped_testcase_suite.last_test,
                                    wrapped_testcase_suite.last_test_temp_dir,
                                )
                            )
                            fail = True

                if fail:
                    wrapped_testcase_suite.child.terminate()
                    try:
                        # terminating the child process tends to leave orphan
                        # VPP process around
                        if wrapped_testcase_suite.vpp_pid:
                            os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
                    except OSError:
                        # already dead
                        pass
                    wrapped_testcase_suite.result.crashed = True
                    wrapped_testcase_suite.result.process_result(
                        wrapped_testcase_suite.last_test_id, ERROR
                    )
                    stop_run = (
                        process_finished_testsuite(
                            wrapped_testcase_suite,
                            finished_testcase_suites,
                            failed_wrapped_testcases,
                            results,
                        )
                        or stop_run
                    )

            for finished_testcase in finished_testcase_suites:
                # Somewhat surprisingly, the join below may
                # timeout, even if client signaled that
                # it finished - so we note it just in case.
                join_start = time.time()
                finished_testcase.child.join(test_finished_join_timeout)
                join_end = time.time()
                if join_end - join_start >= test_finished_join_timeout:
                    finished_testcase.logger.error(
                        "Timeout joining finished test: %s (pid %d)"
                        % (finished_testcase.last_test, finished_testcase.child.pid)
                    )
                finished_testcase.close_pipes()
                wrapped_testcase_suites.remove(finished_testcase)
                finished_unread_testcases.add(finished_testcase)
                # None tells the reader thread that this output is complete
                finished_testcase.stdouterr_queue.put(None)
                on_suite_finish(finished_testcase)
                if stop_run:
                    # record every suite which will not be run any more
                    while testcase_suites:
                        results.append(TestResult(testcase_suites.pop(0)))
                elif testcase_suites:
                    a_suite = testcase_suites.pop(0)
                    while a_suite and a_suite.is_tagged_run_solo:
                        solo_testcase_suites.append(a_suite)
                        if testcase_suites:
                            a_suite = testcase_suites.pop(0)
                        else:
                            a_suite = None
                    if a_suite and can_run_suite(a_suite):
                        run_suite(a_suite)
                    elif a_suite is not None:
                        # Not enough resources right now - put the suite back
                        # so that it is retried when the next suite finishes,
                        # instead of silently dropping it.
                        testcase_suites.insert(0, a_suite)
                if solo_testcase_suites and tests_running == 0:
                    a_suite = solo_testcase_suites.pop(0)
                    run_suite(a_suite)
            time.sleep(0.1)
    except Exception:
        for wrapped_testcase_suite in wrapped_testcase_suites:
            wrapped_testcase_suite.child.terminate()
            wrapped_testcase_suite.stdouterr_queue.put(None)
        raise
    finally:
        read_from_testcases.clear()
        stdouterr_thread.join(config.timeout)
        manager.shutdown()

    handle_cores(failed_wrapped_testcases)
    return results
581
582
class TestSuiteWrapper(unittest.TestSuite):
    """Test suite which tracks how many cpus its tests need.

    ``cpus_used`` is the maximum of the cpu requirements of all added tests.
    The cpus handed over via ``assign_cpus`` are passed to each test before
    its class set-up runs.
    """

    cpus_used = 0

    def __init__(self):
        super().__init__()

    def addTest(self, test):
        """Add *test* and update ``cpus_used``.

        *test* is either a test case providing ``get_cpus_required()`` or a
        nested TestSuiteWrapper, whose ``cpus_used`` is taken instead.
        """
        if isinstance(test, TestSuiteWrapper):
            # nested suites have no get_cpus_required()
            required = test.cpus_used
        else:
            required = test.get_cpus_required()
        self.cpus_used = max(self.cpus_used, required)
        super().addTest(test)

    def assign_cpus(self, cpus):
        """Remember *cpus* for this suite and for nested wrappers."""
        self.cpus = cpus
        for test in self:
            if isinstance(test, TestSuiteWrapper):
                test.assign_cpus(cpus)

    def _handleClassSetUp(self, test, result):
        # hand the cpus over before the class set-up of the test runs
        if not test.__class__.skipped_due_to_cpu_lack:
            test.assign_cpus(self.cpus)
        super()._handleClassSetUp(test, result)

    def get_assigned_cpus(self):
        """Return the cpus given to the last ``assign_cpus`` call."""
        return self.cpus
603
604
class SplitToSuitesCallback:
    """Callback sorting discovered test methods into per-class suites.

    Tests accepted by *filter_callback* go to ``suites`` under the key
    ``file_name + class name``; rejected ones are collected in ``filtered``.
    """

    def __init__(self, filter_callback):
        self.suites = {}
        self.suite_name = "default"
        self.filter_callback = filter_callback
        self.filtered = TestSuiteWrapper()

    def __call__(self, file_name, cls, method):
        test_method = cls(method)
        accepted = self.filter_callback(file_name, cls.__name__, method)
        if not accepted:
            self.filtered.addTest(test_method)
            return

        self.suite_name = file_name + cls.__name__
        suite = self.suites.get(self.suite_name)
        if suite is None:
            # first test of this class - start a new suite for it
            suite = TestSuiteWrapper()
            self.suites[self.suite_name] = suite
            suite.is_tagged_run_solo = False
        suite.addTest(test_method)
        # a single solo-tagged test marks the whole suite as solo
        if test_method.is_tagged_run_solo():
            suite.is_tagged_run_solo = True
625
626
def parse_test_filter(test_filter):
    """Split a ``file[.class[.function]]`` filter into its three parts.

    In the dotted form a part equal to ``*`` or the empty string means
    "no filter" and yields None. The file part gets a ``test_`` prefix if it
    lacks one and a ``.py`` suffix.

    :returns: tuple ``(file_name, class_name, func_name)``.
    :raises Exception: when the filter has more than three dotted parts.
    """

    def unless_wildcard(part):
        return None if part in ("*", "") else part

    def with_test_prefix(name):
        return name if name.startswith("test_") else "test_%s" % name

    file_name = class_name = func_name = None
    if test_filter:
        parts = test_filter.split(".")
        if len(parts) == 1:
            # undotted form: the whole string is the file name
            file_name = with_test_prefix(test_filter)
        else:
            if len(parts) > 3:
                raise Exception(
                    "Unrecognized %s option: %s" % (test_option, test_filter)
                )
            if len(parts) == 3:
                func_name = unless_wildcard(parts[2])
            class_name = unless_wildcard(parts[1])
            stem = unless_wildcard(parts[0])
            if stem is not None:
                file_name = with_test_prefix(stem)
    if file_name:
        file_name = "%s.py" % file_name
    return file_name, class_name, func_name
655
656
def filter_tests(tests, filter_cb):
    """Return a TestSuiteWrapper with the tests accepted by *filter_cb*.

    Nested suites are filtered recursively and kept only when non-empty.
    Test cases whose id has the ``file.Class.method`` shape are kept when
    ``filter_cb(file, Class, method)`` is true; everything else is kept
    unconditionally.
    """
    kept = TestSuiteWrapper()
    for item in tests:
        if isinstance(item, unittest.suite.TestSuite):
            # this is a bunch of tests, recursively filter...
            nested = filter_tests(item, filter_cb)
            if nested.countTestCases() > 0:
                kept.addTest(nested)
            continue
        if isinstance(item, unittest.TestCase):
            # item.id() for common cases looks like this:
            # test_classifier.TestClassifier.test_acl_ip
            # apply filtering only if it is so
            id_parts = item.id().split(".")
            if len(id_parts) == 3 and not filter_cb(*id_parts):
                continue
        # a single accepted test, or an unexpected object left untouched
        kept.addTest(item)
    return kept
679
680
class FilterByTestOption:
    """Predicate matching tests against file/class/function filters.

    A filter which is None (or otherwise falsy) is not applied. The file name
    is matched with fnmatch, class and function names must be equal.
    """

    def __init__(self, filter_file_name, filter_class_name, filter_func_name):
        self.filter_file_name = filter_file_name
        self.filter_class_name = filter_class_name
        self.filter_func_name = filter_func_name

    def __call__(self, file_name, class_name, func_name):
        file_pattern = self.filter_file_name
        if file_pattern and not fnmatch.fnmatch(file_name, file_pattern):
            return False
        if self.filter_class_name and self.filter_class_name != class_name:
            return False
        func_mismatch = self.filter_func_name and self.filter_func_name != func_name
        return not func_mismatch
697
698
class FilterByClassList:
    """Predicate accepting tests whose ``file.Class`` name is in a collection."""

    def __init__(self, classes_with_filenames):
        self.classes_with_filenames = classes_with_filenames

    def __call__(self, file_name, class_name, func_name):
        # func_name is part of the callback signature but is not used
        qualified_class = ".".join((file_name, class_name))
        return qualified_class in self.classes_with_filenames
705
706
def suite_from_failed(suite, failed):
    """Return the tests of *suite* whose class had one of the *failed* ids.

    Each id in *failed* is reduced to its part before the last dot and
    *suite* is filtered by the resulting names.
    """
    failed_classes = set(test_id.rsplit(".", 1)[0] for test_id in failed)
    return filter_tests(suite, FilterByClassList(failed_classes))
712
713
714 class AllResults(dict):
715     def __init__(self):
716         super(AllResults, self).__init__()
717         self.all_testcases = 0
718         self.results_per_suite = []
719         self[PASS] = 0
720         self[FAIL] = 0
721         self[ERROR] = 0
722         self[SKIP] = 0
723         self[SKIP_CPU_SHORTAGE] = 0
724         self[TEST_RUN] = 0
725         self.rerun = []
726         self.testsuites_no_tests_run = []
727
728     def add_results(self, result):
729         self.results_per_suite.append(result)
730         result_types = [PASS, FAIL, ERROR, SKIP, TEST_RUN, SKIP_CPU_SHORTAGE]
731         for result_type in result_types:
732             self[result_type] += len(result[result_type])
733
734     def add_result(self, result):
735         retval = 0
736         self.all_testcases += result.testcase_suite.countTestCases()
737         self.add_results(result)
738
739         if result.no_tests_run():
740             self.testsuites_no_tests_run.append(result.testcase_suite)
741             if result.crashed:
742                 retval = -1
743             else:
744                 retval = 1
745         elif not result.was_successful():
746             retval = 1
747
748         if retval != 0:
749             self.rerun.append(result.testcase_suite)
750
751         return retval
752
753     def print_results(self):
754         print("")
755         print(double_line_delim)
756         print("TEST RESULTS:")
757
758         def indent_results(lines):
759             lines = list(filter(None, lines))
760             maximum = max(lines, key=lambda x: x.index(":"))
761             maximum = 4 + maximum.index(":")
762             for l in lines:
763                 padding = " " * (maximum - l.index(":"))
764                 print(f"{padding}{l}")
765
766         indent_results(
767             [
768                 f"Scheduled tests: {self.all_testcases}",
769                 f"Executed tests: {self[TEST_RUN]}",
770                 f"Passed tests: {colorize(self[PASS], GREEN)}",
771                 f"Skipped tests: {colorize(self[SKIP], YELLOW)}"
772                 if self[SKIP]
773                 else None,
774                 f"Not Executed tests: {colorize(self.not_executed, RED)}"
775                 if self.not_executed
776                 else None,
777                 f"Failures: {colorize(self[FAIL], RED)}" if self[FAIL] else None,
778                 f"Errors: {colorize(self[ERROR], RED)}" if self[ERROR] else None,
779                 "Tests skipped due to lack of CPUS: "
780                 f"{colorize(self[SKIP_CPU_SHORTAGE], YELLOW)}"
781                 if self[SKIP_CPU_SHORTAGE]
782                 else None,
783             ]
784         )
785
786         if self.all_failed > 0:
787             print("FAILURES AND ERRORS IN TESTS:")
788             for result in self.results_per_suite:
789                 failed_testcase_ids = result[FAIL]
790                 errored_testcase_ids = result[ERROR]
791                 old_testcase_name = None
792                 if failed_testcase_ids:
793                     for failed_test_id in failed_testcase_ids:
794                         new_testcase_name, test_name = result.get_testcase_names(
795                             failed_test_id
796                         )
797                         if new_testcase_name != old_testcase_name:
798                             print(
799                                 "  Testcase name: {}".format(
800                                     colorize(new_testcase_name, RED)
801                                 )
802                             )
803                             old_testcase_name = new_testcase_name
804                         print(
805                             "    FAILURE: {} [{}]".format(
806                                 colorize(test_name, RED), failed_test_id
807                             )
808                         )
809                 if errored_testcase_ids:
810                     for errored_test_id in errored_testcase_ids:
811                         new_testcase_name, test_name = result.get_testcase_names(
812                             errored_test_id
813                         )
814                         if new_testcase_name != old_testcase_name:
815                             print(
816                                 "  Testcase name: {}".format(
817                                     colorize(new_testcase_name, RED)
818                                 )
819                             )
820                             old_testcase_name = new_testcase_name
821                         print(
822                             "      ERROR: {} [{}]".format(
823                                 colorize(test_name, RED), errored_test_id
824                             )
825                         )
826         if self.testsuites_no_tests_run:
827             print("TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:")
828             tc_classes = set()
829             for testsuite in self.testsuites_no_tests_run:
830                 for testcase in testsuite:
831                     tc_classes.add(get_testcase_doc_name(testcase))
832             for tc_class in tc_classes:
833                 print("  {}".format(colorize(tc_class, RED)))
834
835         if self[SKIP_CPU_SHORTAGE]:
836             print()
837             print(
838                 colorize(
839                     "     SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
840                     " ENOUGH CPUS AVAILABLE",
841                     YELLOW,
842                 )
843             )
844         print(double_line_delim)
845         print("")
846
847     @property
848     def not_executed(self):
849         return self.all_testcases - self[TEST_RUN]
850
851     @property
852     def all_failed(self):
853         return self[FAIL] + self[ERROR]
854
855
def parse_results(results):
    """
    Aggregate the per-suite results, print the summary and compute the
    overall return code.

    The printed summary contains the number of scheduled, executed, not
    executed, passed, failed, errored and skipped tests and details about
    failed and errored tests.

    :param results: iterable of results, each of them is handed over to
        AllResults.add_result()
    :return: tuple (return_code, rerun) where return_code is -1 if
        add_result() reported -1 for any result (crash), 1 if it reported
        1 for any result (failure) and 0 otherwise; rerun is the ``rerun``
        attribute of the aggregated results
    """

    aggregated = AllResults()
    result_codes = [aggregated.add_result(result) for result in results]

    aggregated.print_results()

    # a crash (-1) takes precedence over an ordinary failure (1)
    if -1 in result_codes:
        return_code = -1
    elif 1 in result_codes:
        return_code = 1
    else:
        return_code = 0
    return return_code, aggregated.rerun
886
887
if __name__ == "__main__":
    # Script entry point: discover the tests matching the configured filter,
    # then run them either in the foreground (interactive debugging) or in
    # forked background processes, retrying failed suites if configured.

    print(f"Config is: {config}")

    if config.sanity:
        print("Running sanity test case.")
        try:
            rc = sanity_run_vpp.main()
            if rc != 0:
                # SystemExit is not an Exception subclass, so it is not
                # swallowed by the handler below
                sys.exit(rc)
        except Exception:
            print(traceback.format_exc())
            print("Couldn't run sanity test case.")
            sys.exit(-1)

    # module-level setting, not referenced in this block
    # NOTE(review): presumably consumed by the forked-run code - confirm
    test_finished_join_timeout = 15

    debug_gdb = config.debug in ["gdb", "gdbserver", "attach"]
    debug_core = config.debug == "core"

    # any of these needs the controlling terminal, so tests cannot be forked
    run_interactive = debug_gdb or config.step or config.force_foreground

    max_concurrent_tests = 0
    print(f"OS reports {num_cpus} available cpu(s).")

    test_jobs = config.jobs
    if test_jobs == "auto":
        if run_interactive:
            max_concurrent_tests = 1
            print("Interactive mode required, running tests consecutively.")
        else:
            max_concurrent_tests = num_cpus
            print(
                f"Running at most {max_concurrent_tests} python test "
                "processes concurrently."
            )
    else:
        max_concurrent_tests = test_jobs
        print(
            f"Running at most {max_concurrent_tests} python test processes "
            "concurrently as set by 'TEST_JOBS'."
        )

    print(f"Using at most {max_vpp_cpus} cpus for VPP threads.")

    if run_interactive and max_concurrent_tests > 1:
        raise NotImplementedError(
            "Running tests interactively (DEBUG is gdb[server] or ATTACH or "
            "STEP is set) in parallel (TEST_JOBS is more than 1) is not "
            "supported"
        )

    descriptions = True

    print("Running tests using custom test runner.")
    filter_file, filter_class, filter_func = parse_test_filter(config.filter)

    print(
        "Selected filters: file=%s, class=%s, function=%s"
        % (filter_file, filter_class, filter_func)
    )

    filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)

    ignore_path = config.venv_dir
    cb = SplitToSuitesCallback(filter_cb)
    for d in config.test_src_dir:
        print("Adding tests from directory tree %s" % d)
        discover_tests(d, cb, ignore_path)

    # suites are not hashable, need to use list
    suites = []
    tests_amount = 0
    for testcase_suite in cb.suites.values():
        tests_amount += testcase_suite.countTestCases()
        if testcase_suite.cpus_used > max_vpp_cpus:
            # here we replace test functions with lambdas to just skip them
            # but we also replace setUp/tearDown functions to do nothing
            # so that the test can be "started" and "stopped", so that we can
            # still keep those prints (test description - SKIP), which are done
            # in stopTest() (for that to trigger, test function must run)
            for t in testcase_suite:
                for m in dir(t):
                    if m.startswith("test_"):
                        # bind the current test case as a default argument;
                        # a plain closure would look up the loop variable
                        # only when called, i.e. use the last iterated test
                        setattr(t, m, lambda t=t: t.skipTest("not enough cpus"))
                setattr(t.__class__, "setUpClass", lambda: None)
                setattr(t.__class__, "tearDownClass", lambda: None)
                setattr(t, "setUp", lambda: None)
                setattr(t, "tearDown", lambda: None)
                t.__class__.skipped_due_to_cpu_lack = True
        suites.append(testcase_suite)

    print(
        "%s out of %s tests match specified filters"
        % (tests_amount, tests_amount + cb.filtered.countTestCases())
    )

    if not config.extended:
        print("Not running extended tests (some tests will be skipped)")

    attempts = config.retries + 1
    if attempts > 1:
        print("Perform %s attempts to pass the suite..." % attempts)

    if run_interactive and suites:
        # don't fork if requiring interactive terminal
        print("Running tests in foreground in the current process")
        full_suite = unittest.TestSuite()
        free_cpus = list(available_cpus)
        cpu_shortage = False
        for suite in suites:
            if suite.cpus_used <= max_vpp_cpus:
                # every suite is given the leading slice of the same list;
                # nothing is removed from free_cpus here
                suite.assign_cpus(free_cpus[: suite.cpus_used])
            else:
                suite.assign_cpus([])
                cpu_shortage = True
        full_suite.addTests(suites)
        result = VppTestRunner(
            verbosity=config.verbose, failfast=config.failfast, print_summary=True
        ).run(full_suite)
        was_successful = result.wasSuccessful()
        if not was_successful:
            for test_case_info in result.failed_test_cases_info:
                handle_failed_suite(
                    test_case_info.logger,
                    test_case_info.tempdir,
                    test_case_info.vpp_pid,
                    config.vpp,
                )
                if test_case_info in result.core_crash_test_cases_info:
                    check_and_handle_core(
                        test_case_info.vpp_bin_path,
                        test_case_info.tempdir,
                        test_case_info.core_crash_test,
                    )

        if cpu_shortage:
            print()
            print(
                colorize(
                    "SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
                    " ENOUGH CPUS AVAILABLE",
                    YELLOW,
                )
            )
            print()
        # exit status 0 on success, 1 otherwise (bool is passed to sys.exit)
        sys.exit(not was_successful)
    else:
        print(
            "Running each VPPTestCase in a separate background process"
            f" with at most {max_concurrent_tests} parallel python test "
            "process(es)"
        )
        exit_code = 0
        # parse_results() hands back the suites for the next attempt; the
        # loop ends when that list is empty or the attempts are used up
        while suites and attempts > 0:
            results = run_forked(suites)
            exit_code, suites = parse_results(results)
            attempts -= 1
            if exit_code == 0:
                print("Test run was successful")
            else:
                print("%s attempt(s) left." % attempts)
        sys.exit(exit_code)