tests: fix parallel runs skipping some tests
[vpp.git] / test / run_tests.py
1 #!/usr/bin/env python3
2
3 import sys
4 import shutil
5 import os
6 import fnmatch
7 import unittest
8 import time
9 import threading
10 import traceback
11 import signal
12 import re
13 from multiprocessing import Process, Pipe, get_context
14 from multiprocessing.queues import Queue
15 from multiprocessing.managers import BaseManager
16 from config import config, num_cpus, available_cpus, max_vpp_cpus
17 from framework import (
18     VppTestRunner,
19     VppTestCase,
20     get_testcase_doc_name,
21     get_test_description,
22     PASS,
23     FAIL,
24     ERROR,
25     SKIP,
26     TEST_RUN,
27     SKIP_CPU_SHORTAGE,
28 )
29 from debug import spawn_gdb
30 from log import (
31     get_parallel_logger,
32     double_line_delim,
33     RED,
34     YELLOW,
35     GREEN,
36     colorize,
37     single_line_delim,
38 )
39 from discover_tests import discover_tests
40 import sanity_run_vpp
41 from subprocess import check_output, CalledProcessError
42 from util import check_core_path, get_core_path, is_core_present
43
# Timeout (in seconds) which controls how long the child has to finish after
# the parent sees a core dump in the test temporary directory. If this is
# exceeded, the parent assumes that the child process is stuck (e.g. waiting
# for an event from vpp) and kills the child.
core_timeout = 3
49
50
class StreamQueue(Queue):
    """Multiprocessing queue offering the small file-like API needed to
    stand in for ``sys.stdout`` / ``sys.stderr``."""

    def write(self, msg):
        """Enqueue *msg* instead of writing it to a real stream."""
        self.put(msg)

    def flush(self):
        """Flush the interpreter's original stdout and stderr streams."""
        for original_stream in (sys.__stdout__, sys.__stderr__):
            original_stream.flush()

    def fileno(self):
        """Return the file descriptor of the queue's writer end."""
        writer_end = self._writer
        return writer_end.fileno()
61
62
class StreamQueueManager(BaseManager):
    """Manager subclass that exists only so StreamQueue can be registered
    on it (see the ``register`` call right below the class)."""

    pass


# Make ``manager.StreamQueue(...)`` available on started managers; this is the
# call TestCaseWrapper uses to create its per-suite stdout/stderr queue.
StreamQueueManager.register("StreamQueue", StreamQueue)
68
69
class TestResult(dict):
    """Result record of one test suite, keyed by result type.

    Each of the keys PASS, FAIL, ERROR, SKIP, SKIP_CPU_SHORTAGE and TEST_RUN
    maps to the list of test ids reported with that result.

    :param testcase_suite: the suite these results belong to
    :param testcases_by_id: optional mapping of test id -> test case object,
        used to turn ids into human readable names; may be None
    """

    def __init__(self, testcase_suite, testcases_by_id=None):
        super().__init__()
        for result_type in (PASS, FAIL, ERROR, SKIP, SKIP_CPU_SHORTAGE, TEST_RUN):
            self[result_type] = []
        self.crashed = False
        self.testcase_suite = testcase_suite
        self.testcases = list(testcase_suite)
        self.testcases_by_id = testcases_by_id

    def was_successful(self):
        """Return True if nothing failed or errored and every test case of the
        suite was reported as passed or skipped."""
        return (
            0 == len(self[FAIL]) == len(self[ERROR])
            and len(self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE])
            == self.testcase_suite.countTestCases()
        )

    def no_tests_run(self):
        """Return True if no test was recorded under TEST_RUN."""
        return 0 == len(self[TEST_RUN])

    def process_result(self, test_id, result):
        """Record *test_id* under the result type *result*."""
        self[result].append(test_id)

    def suite_from_failed(self):
        """Build a suite for re-running whatever did not pass or get skipped.

        :returns: the filtered suite, or None when there is nothing to rerun
        """
        # Build the lookup set once instead of concatenating three lists for
        # every test case in the suite.
        finished_ok = set(self[PASS])
        finished_ok.update(self[SKIP])
        finished_ok.update(self[SKIP_CPU_SHORTAGE])
        rerun_ids = set()
        for testcase in self.testcase_suite:
            tc_id = testcase.id()
            if tc_id not in finished_ok:
                rerun_ids.add(tc_id)
        if rerun_ids:
            return suite_from_failed(self.testcase_suite, rerun_ids)
        return None

    def get_testcase_names(self, test_id):
        """Return ``(testcase_name, test_name)`` to display for *test_id*.

        *test_id* is either a regular test id or a class-level fixture id
        such as ``tearDownClass (test_ipsec_esp.TestIpsecEsp1)``.
        """
        # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
        setup_teardown_match = re.match(
            r"((tearDownClass)|(setUpClass)) \((.+\..+)\)", test_id
        )
        if setup_teardown_match:
            test_name, _, _, testcase_name = setup_teardown_match.groups()
            if len(testcase_name.split(".")) == 2:
                # Only "<file>.<class>" is known here; borrow any test id of
                # that class so its doc name can be looked up. The trailing
                # dot keeps e.g. "x.TestA" from matching "x.TestAB.test".
                prefix = testcase_name + "."
                for key in self.testcases_by_id or ():
                    if key.startswith(prefix):
                        testcase_name = key
                        break
            testcase_name = self._get_testcase_doc_name(testcase_name)
        else:
            test_name = self._get_test_description(test_id)
            testcase_name = self._get_testcase_doc_name(test_id)

        return testcase_name, test_name

    def _find_testcase(self, test_id):
        """Return the test case registered for *test_id*, or None.

        Tolerates ``testcases_by_id`` being None (the constructor default).
        """
        by_id = self.testcases_by_id
        if by_id is not None and test_id in by_id:
            return by_id[test_id]
        return None

    def _get_test_description(self, test_id):
        """Return the description of the test, or *test_id* if it is unknown."""
        testcase = self._find_testcase(test_id)
        if testcase is None:
            return test_id
        return get_test_description(descriptions, testcase)

    def _get_testcase_doc_name(self, test_id):
        """Return the doc name of the test's class, or *test_id* if unknown."""
        testcase = self._find_testcase(test_id)
        if testcase is None:
            return test_id
        return get_testcase_doc_name(testcase)
138
139
def test_runner_wrapper(
    suite, keep_alive_pipe, stdouterr_queue, finished_pipe, result_pipe, logger
):
    """Entry point of the child process: run *suite* and report the outcome.

    stdout and stderr are redirected into *stdouterr_queue*, the suite is run
    by a VppTestRunner, the overall success flag is sent over *finished_pipe*
    and finally the finished and keep-alive pipes are closed.
    """
    sys.stdout = sys.stderr = stdouterr_queue
    VppTestCase.parallel_handler = logger.handlers[0]
    runner = VppTestRunner(
        keep_alive_pipe=keep_alive_pipe,
        descriptions=descriptions,
        verbosity=config.verbose,
        result_pipe=result_pipe,
        failfast=config.failfast,
        print_summary=False,
    )
    outcome = runner.run(suite)
    finished_pipe.send(outcome.wasSuccessful())
    for pipe in (finished_pipe, keep_alive_pipe):
        pipe.close()
157
158
class TestCaseWrapper(object):
    """Parent-side handle of one test suite running in a child process.

    Creating the wrapper sets up three one-way pipes (keep-alive, finished,
    result), obtains a StreamQueue from *manager* for the child's
    stdout/stderr, and immediately starts the child process running
    ``test_runner_wrapper``. It also tracks what was last heard from the
    child and the test classes for which a core file was seen.

    :param testcase_suite: suite to run in the child process
    :param manager: started manager providing ``StreamQueue``
    """

    def __init__(self, testcase_suite, manager):
        self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(duplex=False)
        self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
        self.result_parent_end, self.result_child_end = Pipe(duplex=False)
        self.testcase_suite = testcase_suite
        self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
        self.logger = get_parallel_logger(self.stdouterr_queue)
        self.child = Process(
            target=test_runner_wrapper,
            args=(
                testcase_suite,
                self.keep_alive_child_end,
                self.stdouterr_queue,
                self.finished_child_end,
                self.result_child_end,
                self.logger,
            ),
        )
        self.child.start()
        self.last_test_temp_dir = None
        self.last_test_vpp_binary = None
        self._last_test = None
        self.last_test_id = None
        self.vpp_pid = None
        self.last_heard = time.time()
        self.core_detected_at = None
        self.testcases_by_id = {}
        # class name -> (test name, vpp binary, temp dir) of the first test
        # of that class seen with a core file
        self.testclasess_with_core = {}
        for testcase in self.testcase_suite:
            self.testcases_by_id[testcase.id()] = testcase
        self.result = TestResult(testcase_suite, self.testcases_by_id)

    @property
    def last_test(self):
        """Human readable name of the test last reported by the child."""
        return self._last_test

    @last_test.setter
    def last_test(self, test_id):
        # Keep the raw id for bookkeeping and derive a display name from it.
        self.last_test_id = test_id
        if test_id in self.testcases_by_id:
            testcase = self.testcases_by_id[test_id]
            self._last_test = testcase.shortDescription()
            if not self._last_test:
                self._last_test = str(testcase)
        else:
            self._last_test = test_id

    def add_testclass_with_core(self):
        """Remember the class of the last reported test as having a core file.

        Only the first test recorded for a class is kept. If the last test id
        is neither a known test nor a setUpClass/tearDownClass id, the id
        itself is used as the key rather than raising.
        """
        if self.last_test_id in self.testcases_by_id:
            test = self.testcases_by_id[self.last_test_id]
            class_name = unittest.util.strclass(test.__class__)
            test_name = "'{}' ({})".format(
                get_test_description(descriptions, test), self.last_test_id
            )
        else:
            test_name = self.last_test_id
            fixture_match = re.match(
                r"((tearDownClass)|(setUpClass)) " r"\((.+\..+)\)", test_name or ""
            )
            if fixture_match:
                class_name = fixture_match.groups()[3]
            else:
                # Unrecognised id: do not fail, file the core under the id.
                class_name = test_name
        if class_name not in self.testclasess_with_core:
            self.testclasess_with_core[class_name] = (
                test_name,
                self.last_test_vpp_binary,
                self.last_test_temp_dir,
            )

    def close_pipes(self):
        """Close both ends of the keep-alive, finished and result pipes."""
        self.keep_alive_child_end.close()
        self.finished_child_end.close()
        self.result_child_end.close()
        self.keep_alive_parent_end.close()
        self.finished_parent_end.close()
        self.result_parent_end.close()

    def was_successful(self):
        """Return whether the collected result counts as successful."""
        return self.result.was_successful()

    @property
    def cpus_used(self):
        """Number of CPUs used by the wrapped suite."""
        return self.testcase_suite.cpus_used

    def get_assigned_cpus(self):
        """Return the CPUs assigned to the wrapped suite."""
        return self.testcase_suite.get_assigned_cpus()
243
244
def stdouterr_reader_wrapper(
    unread_testcases, finished_unread_testcases, read_testcases
):
    """Copy the queued output of test cases to ``sys.stdout``.

    Loops while the *read_testcases* event is set or *unread_testcases* is
    not empty. Test cases that already finished are served first. Each
    selected test case has its ``stdouterr_queue`` drained until a ``None``
    item arrives, after which the queue is closed.
    """
    while read_testcases.is_set() or unread_testcases:
        if finished_unread_testcases:
            current = finished_unread_testcases.pop()
            unread_testcases.remove(current)
        elif unread_testcases:
            current = unread_testcases.pop()
        else:
            current = None

        if not current:
            continue

        queue = current.stdouterr_queue
        chunk = ""
        while chunk is not None:
            sys.stdout.write(chunk)
            chunk = queue.get()

        queue.close()
        finished_unread_testcases.discard(current)
264
265
def handle_failed_suite(logger, last_test_temp_dir, vpp_pid, vpp_binary):
    """Collect post-mortem artifacts for a suite that did not succeed.

    :param logger: logger used for all reporting
    :param last_test_temp_dir: temporary directory of the last test, or a
        false value when none was reported
    :param vpp_pid: pid of the vpp process of the last test, or a false value
    :param vpp_binary: path of the vpp binary, used in the gdb hint

    When a temp dir is known, a ``<dir>-FAILED`` symlink is created in
    ``config.failed_dir`` and a core file in that directory, if present, is
    reported. When the vpp pid is also known, ``/tmp/api_post_mortem.<pid>``
    is copied into the temp dir if that file exists.
    """
    if last_test_temp_dir:
        # Need to create link in case of a timeout or core dump without failure
        lttd = os.path.basename(last_test_temp_dir)
        link_path = os.path.join(config.failed_dir, f"{lttd}-FAILED")
        # lexists() also reports a dangling symlink, which exists() treats as
        # missing and which would make os.symlink() raise FileExistsError.
        if not os.path.lexists(link_path):
            os.symlink(last_test_temp_dir, link_path)
        logger.error(
            "Symlink to failed testcase directory: %s -> %s" % (link_path, lttd)
        )

        # Report core existence
        core_path = get_core_path(last_test_temp_dir)
        if os.path.exists(core_path):
            logger.error(
                "Core-file exists in test temporary directory: %s!" % core_path
            )
            check_core_path(logger, core_path)
            logger.debug("Running 'file %s':" % core_path)
            try:
                info = check_output(["file", core_path])
                logger.debug(info)
            except CalledProcessError as e:
                logger.error(
                    "Subprocess returned with return code "
                    "while running `file' utility on core-file "
                    "returned: "
                    "rc=%s",
                    e.returncode,
                )
            except OSError as e:
                logger.error(
                    "Subprocess returned with OS error while "
                    "running 'file' utility "
                    "on core-file: "
                    "(%s) %s",
                    e.errno,
                    e.strerror,
                )
            except Exception:
                logger.exception("Unexpected error running `file' utility on core-file")
            logger.error(f"gdb {vpp_binary} {core_path}")

    # Without a temp dir there is nowhere to copy to; shutil.copy2() would
    # raise on a None destination, so the copy is skipped in that case.
    if vpp_pid and last_test_temp_dir:
        # Copy api post mortem
        api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
        if os.path.isfile(api_post_mortem_path):
            logger.error(
                "Copying api_post_mortem.%d to %s" % (vpp_pid, last_test_temp_dir)
            )
            shutil.copy2(api_post_mortem_path, last_test_temp_dir)
317
318
def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
    """Act on a core file found in *tempdir*, if there is one.

    :param vpp_binary: vpp binary handed to gdb together with the core file
    :param tempdir: test temporary directory to inspect
    :param core_crash_test: name of the last running test, used in the message

    With ``debug_core`` set, gdb is spawned on the core file; otherwise, with
    ``config.compress_core`` set, the core file is compressed with gzip.
    """
    if not is_core_present(tempdir):
        return

    if debug_core:
        print(
            "VPP core detected in %s. Last test running was %s"
            % (tempdir, core_crash_test)
        )
        print(single_line_delim)
        spawn_gdb(vpp_binary, get_core_path(tempdir))
        print(single_line_delim)
    elif config.compress_core:
        # Local import: the file only imports check_output/CalledProcessError
        # from subprocess at module level.
        from subprocess import call

        print("Compressing core-file in test directory `%s'" % tempdir)
        # Pass the path as an argument instead of interpolating it into a
        # shell command, so whitespace or shell metacharacters in the path
        # cannot break (or alter) the command.
        try:
            call(["gzip", get_core_path(tempdir)])
        except OSError as e:
            # Best effort, as before: a missing gzip must not abort the run.
            print("Failed to run gzip: %s" % e)
332
333
def handle_cores(failed_testcases):
    """Run core handling for every test class recorded with a core file in
    any of the *failed_testcases* wrappers."""
    for wrapper in failed_testcases:
        cores_by_class = wrapper.testclasess_with_core
        if not cores_by_class:
            continue
        for entry in cores_by_class.values():
            test, vpp_binary, tempdir = entry
            check_and_handle_core(vpp_binary, tempdir, test)
340
341
def process_finished_testsuite(
    wrapped_testcase_suite, finished_testcase_suites, failed_wrapped_testcases, results
):
    """Book-keep a suite that finished (or was declared failed).

    The suite's result is appended to *results* and the wrapper is added to
    *finished_testcase_suites*. An unsuccessful suite is additionally added
    to *failed_wrapped_testcases* and passed to ``handle_failed_suite``.

    :returns: True when the whole run should stop, i.e. the suite was not
        successful and ``config.failfast`` is set; False otherwise
    """
    results.append(wrapped_testcase_suite.result)
    finished_testcase_suites.add(wrapped_testcase_suite)

    abort_on_failure = bool(config.failfast)
    if wrapped_testcase_suite.was_successful():
        return False

    failed_wrapped_testcases.add(wrapped_testcase_suite)
    handle_failed_suite(
        wrapped_testcase_suite.logger,
        wrapped_testcase_suite.last_test_temp_dir,
        wrapped_testcase_suite.vpp_pid,
        wrapped_testcase_suite.last_test_vpp_binary,
    )
    return abort_on_failure
361
362
def run_forked(testcase_suites):
    """Run the given suites in child processes and collect their results.

    Each started suite is wrapped in a TestCaseWrapper (which starts the
    child process). The main loop polls every running wrapper's result,
    keep-alive and finished pipes, declares a suite failed on timeout,
    unexpected child death or an unresponsive child with a core file, and
    starts further suites as running ones finish. Suites tagged run-solo are
    set aside and only started when no other suite is running. Child output
    is printed by a separate thread running ``stdouterr_reader_wrapper``.

    :param testcase_suites: list of suites to run; consumed (popped) here
    :returns: list of TestResult objects, one per suite
    """
    wrapped_testcase_suites = set()
    solo_testcase_suites = []

    # suites are unhashable, need to use list
    results = []
    # wrappers whose output queue has not been picked up by the reader thread
    unread_testcases = set()
    # wrappers that already finished; the reader thread serves these first
    finished_unread_testcases = set()
    manager = StreamQueueManager()
    manager.start()
    tests_running = 0
    free_cpus = list(available_cpus)

    def on_suite_start(tc):
        """Account for one more running suite."""
        nonlocal tests_running
        nonlocal free_cpus
        tests_running = tests_running + 1

    def on_suite_finish(tc):
        """Account for a finished suite and return its CPUs to the pool."""
        nonlocal tests_running
        nonlocal free_cpus
        tests_running = tests_running - 1
        assert tests_running >= 0
        free_cpus.extend(tc.get_assigned_cpus())

    def run_suite(suite):
        """Take CPUs from the free pool for *suite* and start it in a child."""
        nonlocal manager
        nonlocal wrapped_testcase_suites
        nonlocal unread_testcases
        nonlocal free_cpus
        # the first cpus_used free CPUs go to the suite, the rest stay free
        suite.assign_cpus(free_cpus[: suite.cpus_used])
        free_cpus = free_cpus[suite.cpus_used :]
        wrapper = TestCaseWrapper(suite, manager)
        wrapped_testcase_suites.add(wrapper)
        unread_testcases.add(wrapper)
        on_suite_start(suite)

    def can_run_suite(suite):
        """Return True if *suite* may be started right now.

        Requires a free concurrency slot and either enough free CPUs or a
        CPU demand above max_vpp_cpus (presumably such suites end up skipped
        for CPU shortage instead of really using the CPUs -- confirm).
        """
        return tests_running < max_concurrent_tests and (
            suite.cpus_used <= len(free_cpus) or suite.cpus_used > max_vpp_cpus
        )

    # Initial fill: start suites from the head of the list while they fit,
    # moving run-solo suites aside.
    while free_cpus and testcase_suites:
        a_suite = testcase_suites[0]
        if a_suite.is_tagged_run_solo:
            a_suite = testcase_suites.pop(0)
            solo_testcase_suites.append(a_suite)
            continue
        if can_run_suite(a_suite):
            a_suite = testcase_suites.pop(0)
            run_suite(a_suite)
        else:
            break

    # Nothing regular could be started: begin with a solo suite.
    if tests_running == 0 and solo_testcase_suites:
        a_suite = solo_testcase_suites.pop(0)
        run_suite(a_suite)

    # The reader thread keeps going while this event is set or unread
    # wrappers remain.
    read_from_testcases = threading.Event()
    read_from_testcases.set()
    stdouterr_thread = threading.Thread(
        target=stdouterr_reader_wrapper,
        args=(unread_testcases, finished_unread_testcases, read_from_testcases),
    )
    stdouterr_thread.start()

    failed_wrapped_testcases = set()
    stop_run = False

    try:
        while wrapped_testcase_suites or testcase_suites:
            finished_testcase_suites = set()
            for wrapped_testcase_suite in wrapped_testcase_suites:
                # drain per-test results sent by the child
                while wrapped_testcase_suite.result_parent_end.poll():
                    wrapped_testcase_suite.result.process_result(
                        *wrapped_testcase_suite.result_parent_end.recv()
                    )
                    wrapped_testcase_suite.last_heard = time.time()

                # drain keep-alive messages: each carries the current test,
                # vpp binary, temp dir and vpp pid
                while wrapped_testcase_suite.keep_alive_parent_end.poll():
                    (
                        wrapped_testcase_suite.last_test,
                        wrapped_testcase_suite.last_test_vpp_binary,
                        wrapped_testcase_suite.last_test_temp_dir,
                        wrapped_testcase_suite.vpp_pid,
                    ) = wrapped_testcase_suite.keep_alive_parent_end.recv()
                    wrapped_testcase_suite.last_heard = time.time()

                # child reported that it finished the suite
                if wrapped_testcase_suite.finished_parent_end.poll():
                    wrapped_testcase_suite.finished_parent_end.recv()
                    wrapped_testcase_suite.last_heard = time.time()
                    stop_run = (
                        process_finished_testsuite(
                            wrapped_testcase_suite,
                            finished_testcase_suites,
                            failed_wrapped_testcases,
                            results,
                        )
                        or stop_run
                    )
                    continue

                # Not finished yet: check for timeout, dead child, or a core
                # file with an unresponsive child.
                fail = False
                if wrapped_testcase_suite.last_heard + config.timeout < time.time():
                    fail = True
                    wrapped_testcase_suite.logger.critical(
                        "Child test runner process timed out "
                        "(last test running was `%s' in `%s')!"
                        % (
                            wrapped_testcase_suite.last_test,
                            wrapped_testcase_suite.last_test_temp_dir,
                        )
                    )
                elif not wrapped_testcase_suite.child.is_alive():
                    fail = True
                    wrapped_testcase_suite.logger.critical(
                        "Child test runner process unexpectedly died "
                        "(last test running was `%s' in `%s')!"
                        % (
                            wrapped_testcase_suite.last_test,
                            wrapped_testcase_suite.last_test_temp_dir,
                        )
                    )
                elif (
                    wrapped_testcase_suite.last_test_temp_dir
                    and wrapped_testcase_suite.last_test_vpp_binary
                ):
                    if is_core_present(wrapped_testcase_suite.last_test_temp_dir):
                        wrapped_testcase_suite.add_testclass_with_core()
                        # first sighting starts the clock; the child gets
                        # core_timeout seconds to finish on its own
                        if wrapped_testcase_suite.core_detected_at is None:
                            wrapped_testcase_suite.core_detected_at = time.time()
                        elif (
                            wrapped_testcase_suite.core_detected_at + core_timeout
                            < time.time()
                        ):
                            wrapped_testcase_suite.logger.critical(
                                "Child test runner process unresponsive and "
                                "core-file exists in test temporary directory "
                                "(last test running was `%s' in `%s')!"
                                % (
                                    wrapped_testcase_suite.last_test,
                                    wrapped_testcase_suite.last_test_temp_dir,
                                )
                            )
                            fail = True

                if fail:
                    wrapped_testcase_suite.child.terminate()
                    try:
                        # terminating the child process tends to leave orphan
                        # VPP process around
                        if wrapped_testcase_suite.vpp_pid:
                            os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
                    except OSError:
                        # already dead
                        pass
                    # record the last known test as an ERROR of a crashed run
                    wrapped_testcase_suite.result.crashed = True
                    wrapped_testcase_suite.result.process_result(
                        wrapped_testcase_suite.last_test_id, ERROR
                    )
                    stop_run = (
                        process_finished_testsuite(
                            wrapped_testcase_suite,
                            finished_testcase_suites,
                            failed_wrapped_testcases,
                            results,
                        )
                        or stop_run
                    )

            for finished_testcase in finished_testcase_suites:
                # Somewhat surprisingly, the join below may
                # timeout, even if client signaled that
                # it finished - so we note it just in case.
                join_start = time.time()
                finished_testcase.child.join(test_finished_join_timeout)
                join_end = time.time()
                if join_end - join_start >= test_finished_join_timeout:
                    finished_testcase.logger.error(
                        "Timeout joining finished test: %s (pid %d)"
                        % (finished_testcase.last_test, finished_testcase.child.pid)
                    )
                finished_testcase.close_pipes()
                wrapped_testcase_suites.remove(finished_testcase)
                finished_unread_testcases.add(finished_testcase)
                # None tells the reader thread that this queue is complete
                finished_testcase.stdouterr_queue.put(None)
                on_suite_finish(finished_testcase)
                if stop_run:
                    # record an empty result for every suite that will not
                    # be started any more
                    while testcase_suites:
                        results.append(TestResult(testcase_suites.pop(0)))
                elif testcase_suites:
                    # move leading run-solo suites aside, then try to start
                    # the first regular suite
                    a_suite = testcase_suites[0]
                    while a_suite and a_suite.is_tagged_run_solo:
                        testcase_suites.pop(0)
                        solo_testcase_suites.append(a_suite)
                        if testcase_suites:
                            a_suite = testcase_suites[0]
                        else:
                            a_suite = None
                    if a_suite and can_run_suite(a_suite):
                        testcase_suites.pop(0)
                        run_suite(a_suite)
                # a solo suite is started only when nothing else is running
                if solo_testcase_suites and tests_running == 0:
                    a_suite = solo_testcase_suites.pop(0)
                    run_suite(a_suite)
            time.sleep(0.1)
    except Exception:
        # stop all children and unblock the reader thread before re-raising
        for wrapped_testcase_suite in wrapped_testcase_suites:
            wrapped_testcase_suite.child.terminate()
            wrapped_testcase_suite.stdouterr_queue.put(None)
        raise
    finally:
        # let the reader thread exit once its queues are drained
        read_from_testcases.clear()
        stdouterr_thread.join(config.timeout)
        manager.shutdown()

    handle_cores(failed_wrapped_testcases)
    return results
581
582
class TestSuiteWrapper(unittest.TestSuite):
    """Test suite that tracks how many CPUs its tests need.

    ``cpus_used`` is the maximum CPU requirement over all tests added so far.
    CPUs handed over with ``assign_cpus`` are passed on to each test during
    class set-up, unless the test's class is marked as skipped due to lack
    of CPUs.
    """

    cpus_used = 0

    def __init__(self):
        super().__init__()

    @staticmethod
    def _cpus_required(test):
        """Return the CPU requirement of a test case or of a nested suite.

        Suites do not provide ``get_cpus_required()``, so a nested
        TestSuiteWrapper contributes its ``cpus_used`` and any other suite
        the maximum over its members.
        """
        if isinstance(test, TestSuiteWrapper):
            return test.cpus_used
        if isinstance(test, unittest.TestSuite):
            return max(
                (
                    TestSuiteWrapper._cpus_required(member)
                    for member in test
                    if member is not None
                ),
                default=0,
            )
        return test.get_cpus_required()

    def addTest(self, test):
        """Add *test* (a test case or a nested suite) and update ``cpus_used``."""
        self.cpus_used = max(self.cpus_used, self._cpus_required(test))
        super().addTest(test)

    def assign_cpus(self, cpus):
        """Remember the CPUs this suite is allowed to use."""
        self.cpus = cpus

    def _handleClassSetUp(self, test, result):
        # Hand the assigned CPUs to the test before the stock class set-up
        # runs, except for classes flagged as skipped for lack of CPUs.
        if not test.__class__.skipped_due_to_cpu_lack:
            test.assign_cpus(self.cpus)
        super()._handleClassSetUp(test, result)

    def get_assigned_cpus(self):
        """Return the CPUs previously stored by ``assign_cpus``."""
        return self.cpus
603
604
class SplitToSuitesCallback:
    """Discovery callback that sorts test methods into per-class suites.

    Tests accepted by *filter_callback* are collected in ``suites`` under the
    key ``file_name + class name``; rejected tests go to ``filtered``.
    """

    def __init__(self, filter_callback):
        self.suites = {}
        self.suite_name = "default"
        self.filter_callback = filter_callback
        self.filtered = TestSuiteWrapper()

    def __call__(self, file_name, cls, method):
        test_method = cls(method)

        if not self.filter_callback(file_name, cls.__name__, method):
            self.filtered.addTest(test_method)
            return

        self.suite_name = file_name + cls.__name__
        if self.suite_name in self.suites:
            target_suite = self.suites[self.suite_name]
        else:
            target_suite = TestSuiteWrapper()
            self.suites[self.suite_name] = target_suite
            target_suite.is_tagged_run_solo = False

        target_suite.addTest(test_method)
        # a single run-solo test marks the whole suite as run-solo
        if test_method.is_tagged_run_solo():
            target_suite.is_tagged_run_solo = True
625
626
def parse_test_filter(test_filter):
    """Split a ``file[.class[.function]]`` filter into its three parts.

    A part that is empty or ``*`` in the dotted form means "no restriction"
    and is returned as None. The file part gets a ``test_`` prefix if it
    lacks one and always a ``.py`` suffix.

    :returns: tuple ``(file_name, class_name, function_name)``
    :raises Exception: if the filter has more than three dotted parts
    """

    def restriction(part):
        # "*" and "" both stand for "match anything"
        return None if part in ("*", "") else part

    file_part = class_part = func_part = None

    if test_filter:
        if "." in test_filter:
            pieces = test_filter.split(".")
            if len(pieces) > 3:
                raise Exception(f"Invalid test filter: {test_filter}")
            file_part = restriction(pieces[0])
            class_part = restriction(pieces[1])
            if len(pieces) > 2:
                func_part = restriction(pieces[2])
        else:
            # a bare word is always taken as the file name, verbatim
            file_part = test_filter

    if file_part is not None:
        if not file_part.startswith("test_"):
            file_part = "test_%s" % file_part
        file_part = "%s.py" % file_part

    return file_part, class_part, func_part
655
656
def filter_tests(tests, filter_cb):
    """Return a new suite with the tests from *tests* accepted by *filter_cb*.

    Nested suites are filtered recursively and kept only if non-empty. A test
    case is checked against *filter_cb* only when its id has exactly three
    dot-separated parts; anything else is kept untouched.
    """
    kept = TestSuiteWrapper()
    for item in tests:
        if isinstance(item, unittest.suite.TestSuite):
            # a bunch of tests: filter recursively, drop it if nothing is left
            nested = filter_tests(item, filter_cb)
            if nested.countTestCases() > 0:
                kept.addTest(nested)
            continue

        if isinstance(item, unittest.TestCase):
            # ids normally look like test_classifier.TestClassifier.test_acl_ip;
            # the filter is applied only to ids of that shape
            id_parts = item.id().split(".")
            if len(id_parts) == 3 and not filter_cb(
                id_parts[0], id_parts[1], id_parts[2]
            ):
                continue

        # accepted test case, or an unexpected object we do not touch
        kept.addTest(item)
    return kept
679
680
class FilterByTestOption:
    """Filter callback accepting a test if it matches any of the filters.

    Each filter is a ``(file_name, class_name, func_name)`` tuple; a false
    element places no restriction. The file name is matched as an fnmatch
    pattern, class and function names must be equal.
    """

    def __init__(self, filters):
        self.filters = filters

    def __call__(self, file_name, class_name, func_name):
        for wanted_file, wanted_class, wanted_func in self.filters:
            if wanted_file and not fnmatch.fnmatch(file_name, wanted_file):
                continue
            if wanted_class and wanted_class != class_name:
                continue
            if wanted_func and wanted_func != func_name:
                continue
            return True
        return False
716
717
class FilterByClassList:
    """Filter callback accepting tests whose ``<file>.<class>`` name is in
    the given collection."""

    def __init__(self, classes_with_filenames):
        self.classes_with_filenames = classes_with_filenames

    def __call__(self, file_name, class_name, func_name):
        qualified_class = file_name + "." + class_name
        return qualified_class in self.classes_with_filenames
724
725
def suite_from_failed(suite, failed):
    """Return the part of *suite* belonging to the classes of *failed* ids.

    Each id in *failed* is cut at its last dot; every test of the resulting
    classes is kept, not just the failed ones.
    """
    failed_classes = set()
    for test_id in failed:
        class_path, *_ = test_id.rsplit(".", 1)
        failed_classes.add(class_path)
    return filter_tests(suite, FilterByClassList(failed_classes))
731
732
733 class AllResults(dict):
734     def __init__(self):
735         super(AllResults, self).__init__()
736         self.all_testcases = 0
737         self.results_per_suite = []
738         self[PASS] = 0
739         self[FAIL] = 0
740         self[ERROR] = 0
741         self[SKIP] = 0
742         self[SKIP_CPU_SHORTAGE] = 0
743         self[TEST_RUN] = 0
744         self.rerun = []
745         self.testsuites_no_tests_run = []
746
747     def add_results(self, result):
748         self.results_per_suite.append(result)
749         result_types = [PASS, FAIL, ERROR, SKIP, TEST_RUN, SKIP_CPU_SHORTAGE]
750         for result_type in result_types:
751             self[result_type] += len(result[result_type])
752
753     def add_result(self, result):
754         retval = 0
755         self.all_testcases += result.testcase_suite.countTestCases()
756         self.add_results(result)
757
758         if result.no_tests_run():
759             self.testsuites_no_tests_run.append(result.testcase_suite)
760             if result.crashed:
761                 retval = -1
762             else:
763                 retval = 1
764         elif not result.was_successful():
765             retval = 1
766
767         if retval != 0:
768             self.rerun.append(result.testcase_suite)
769
770         return retval
771
772     def print_results(self):
773         print("")
774         print(double_line_delim)
775         print("TEST RESULTS:")
776
777         def indent_results(lines):
778             lines = list(filter(None, lines))
779             maximum = max(lines, key=lambda x: x.index(":"))
780             maximum = 4 + maximum.index(":")
781             for l in lines:
782                 padding = " " * (maximum - l.index(":"))
783                 print(f"{padding}{l}")
784
785         indent_results(
786             [
787                 f"Scheduled tests: {self.all_testcases}",
788                 f"Executed tests: {self[TEST_RUN]}",
789                 f"Passed tests: {colorize(self[PASS], GREEN)}",
790                 f"Skipped tests: {colorize(self[SKIP], YELLOW)}"
791                 if self[SKIP]
792                 else None,
793                 f"Not Executed tests: {colorize(self.not_executed, RED)}"
794                 if self.not_executed
795                 else None,
796                 f"Failures: {colorize(self[FAIL], RED)}" if self[FAIL] else None,
797                 f"Errors: {colorize(self[ERROR], RED)}" if self[ERROR] else None,
798                 "Tests skipped due to lack of CPUS: "
799                 f"{colorize(self[SKIP_CPU_SHORTAGE], YELLOW)}"
800                 if self[SKIP_CPU_SHORTAGE]
801                 else None,
802             ]
803         )
804
805         if self.all_failed > 0:
806             print("FAILURES AND ERRORS IN TESTS:")
807             for result in self.results_per_suite:
808                 failed_testcase_ids = result[FAIL]
809                 errored_testcase_ids = result[ERROR]
810                 old_testcase_name = None
811                 if failed_testcase_ids:
812                     for failed_test_id in failed_testcase_ids:
813                         new_testcase_name, test_name = result.get_testcase_names(
814                             failed_test_id
815                         )
816                         if new_testcase_name != old_testcase_name:
817                             print(
818                                 "  Testcase name: {}".format(
819                                     colorize(new_testcase_name, RED)
820                                 )
821                             )
822                             old_testcase_name = new_testcase_name
823                         print(
824                             "    FAILURE: {} [{}]".format(
825                                 colorize(test_name, RED), failed_test_id
826                             )
827                         )
828                 if errored_testcase_ids:
829                     for errored_test_id in errored_testcase_ids:
830                         new_testcase_name, test_name = result.get_testcase_names(
831                             errored_test_id
832                         )
833                         if new_testcase_name != old_testcase_name:
834                             print(
835                                 "  Testcase name: {}".format(
836                                     colorize(new_testcase_name, RED)
837                                 )
838                             )
839                             old_testcase_name = new_testcase_name
840                         print(
841                             "      ERROR: {} [{}]".format(
842                                 colorize(test_name, RED), errored_test_id
843                             )
844                         )
845         if self.testsuites_no_tests_run:
846             print("TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:")
847             tc_classes = set()
848             for testsuite in self.testsuites_no_tests_run:
849                 for testcase in testsuite:
850                     tc_classes.add(get_testcase_doc_name(testcase))
851             for tc_class in tc_classes:
852                 print("  {}".format(colorize(tc_class, RED)))
853
854         if self[SKIP_CPU_SHORTAGE]:
855             print()
856             print(
857                 colorize(
858                     "     SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
859                     " ENOUGH CPUS AVAILABLE",
860                     YELLOW,
861                 )
862             )
863         print(double_line_delim)
864         print("")
865
866     @property
867     def not_executed(self):
868         return self.all_testcases - self[TEST_RUN]
869
870     @property
871     def all_failed(self):
872         return self[FAIL] + self[ERROR]
873
874
def parse_results(results):
    """
    Aggregate the given results, print the summary report (scheduled,
    executed, not executed, passed, failed, errored and skipped tests plus
    details about failed and errored tests) and compute the return code.

    :param results: iterable of per-suite results, each one is passed to
        AllResults.add_result()
    :return: tuple (return_code, rerun) where return_code is -1 if
        add_result() returned -1 for any result, 1 if it returned 1 for any
        result and 0 otherwise; rerun is the list of suites which
        AllResults collected for re-running
    """

    aggregated = AllResults()

    # remember every distinct code reported while adding the results
    seen_codes = set()
    for suite_result in results:
        seen_codes.add(aggregated.add_result(suite_result))

    aggregated.print_results()

    # -1 takes precedence over 1
    if -1 in seen_codes:
        return -1, aggregated.rerun
    if 1 in seen_codes:
        return 1, aggregated.rerun
    return 0, aggregated.rerun
905
906
if __name__ == "__main__":
    # Script entry point: optionally run the sanity test, work out how many
    # test processes may run concurrently, discover and filter the tests and
    # finally run them either in the foreground (interactive debugging) or in
    # forked background processes with optional retries.
    #
    # NOTE: names assigned here are module-level globals; they are
    # deliberately kept at module scope (not wrapped in a function), since
    # other code in this file may read them.

    print(f"Config is: {config}")

    if config.sanity:
        print("Running sanity test case.")
        try:
            rc = sanity_run_vpp.main()
            if rc != 0:
                sys.exit(rc)
        except Exception:
            # sys.exit() above raises SystemExit, which is not an Exception
            # subclass, so it is not swallowed by this handler
            print(traceback.format_exc())
            print("Couldn't run sanity test case.")
            sys.exit(-1)

    test_finished_join_timeout = 15

    debug_gdb = config.debug in ["gdb", "gdbserver", "attach"]
    debug_core = config.debug == "core"

    # any of these options requires the tests to own the terminal
    run_interactive = debug_gdb or config.step or config.force_foreground

    max_concurrent_tests = 0
    print(f"OS reports {num_cpus} available cpu(s).")

    test_jobs = config.jobs
    if test_jobs == "auto":
        if run_interactive:
            max_concurrent_tests = 1
            print("Interactive mode required, running tests consecutively.")
        else:
            max_concurrent_tests = num_cpus
            print(
                f"Running at most {max_concurrent_tests} python test "
                "processes concurrently."
            )
    else:
        max_concurrent_tests = test_jobs
        print(
            f"Running at most {max_concurrent_tests} python test processes "
            "concurrently as set by 'TEST_JOBS'."
        )

    print(f"Using at most {max_vpp_cpus} cpus for VPP threads.")

    if run_interactive and max_concurrent_tests > 1:
        raise NotImplementedError(
            "Running tests interactively (DEBUG is gdb[server] or ATTACH or "
            "STEP is set) in parallel (TEST_JOBS is more than 1) is not "
            "supported"
        )

    descriptions = True

    print("Running tests using custom test runner.")
    filters = [(parse_test_filter(f)) for f in config.filter.split(",")]

    print(
        "Selected filters: ",
        "|".join(
            f"file={filter_file}, class={filter_class}, function={filter_func}"
            for filter_file, filter_class, filter_func in filters
        ),
    )

    filter_cb = FilterByTestOption(filters)

    cb = SplitToSuitesCallback(filter_cb)
    for d in config.test_src_dir:
        print("Adding tests from directory tree %s" % d)
        discover_tests(d, cb)

    # suites are not hashable, need to use list
    suites = []
    tests_amount = 0
    for testcase_suite in cb.suites.values():
        tests_amount += testcase_suite.countTestCases()
        if testcase_suite.cpus_used > max_vpp_cpus:
            # here we replace test functions with lambdas to just skip them
            # but we also replace setUp/tearDown functions to do nothing
            # so that the test can be "started" and "stopped", so that we can
            # still keep those prints (test description - SKIP), which are done
            # in stopTest() (for that to trigger, test function must run)
            for t in testcase_suite:
                for m in dir(t):
                    if m.startswith("test_"):
                        # bind the current test case as a default argument;
                        # a plain closure would look up the loop variable
                        # `t` only when called, i.e. whatever test case was
                        # iterated last instead of the one being patched
                        setattr(t, m, lambda t=t: t.skipTest("not enough cpus"))
                setattr(t.__class__, "setUpClass", lambda: None)
                setattr(t.__class__, "tearDownClass", lambda: None)
                setattr(t, "setUp", lambda: None)
                setattr(t, "tearDown", lambda: None)
                t.__class__.skipped_due_to_cpu_lack = True
        suites.append(testcase_suite)

    print(
        "%s out of %s tests match specified filters"
        % (tests_amount, tests_amount + cb.filtered.countTestCases())
    )

    if not config.extended:
        print("Not running extended tests (some tests will be skipped)")

    attempts = config.retries + 1
    if attempts > 1:
        print("Perform %s attempts to pass the suite..." % attempts)

    if run_interactive and suites:
        # don't fork if requiring interactive terminal
        print("Running tests in foreground in the current process")
        full_suite = unittest.TestSuite()
        free_cpus = list(available_cpus)
        cpu_shortage = False
        for suite in suites:
            if suite.cpus_used <= max_vpp_cpus:
                suite.assign_cpus(free_cpus[: suite.cpus_used])
            else:
                # suite needs more cpus than allowed - its tests were
                # replaced by skipping stubs above, so no cpus are assigned
                suite.assign_cpus([])
                cpu_shortage = True
        full_suite.addTests(suites)
        result = VppTestRunner(
            verbosity=config.verbose, failfast=config.failfast, print_summary=True
        ).run(full_suite)
        was_successful = result.wasSuccessful()
        if not was_successful:
            for test_case_info in result.failed_test_cases_info:
                handle_failed_suite(
                    test_case_info.logger,
                    test_case_info.tempdir,
                    test_case_info.vpp_pid,
                    config.vpp,
                )
                if test_case_info in result.core_crash_test_cases_info:
                    check_and_handle_core(
                        test_case_info.vpp_bin_path,
                        test_case_info.tempdir,
                        test_case_info.core_crash_test,
                    )

        if cpu_shortage:
            print()
            print(
                colorize(
                    "SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
                    " ENOUGH CPUS AVAILABLE",
                    YELLOW,
                )
            )
            print()
        # bool exit status: False -> exit code 0, True -> exit code 1
        sys.exit(not was_successful)
    else:
        print(
            "Running each VPPTestCase in a separate background process"
            f" with at most {max_concurrent_tests} parallel python test "
            "process(es)"
        )
        exit_code = 0
        # every attempt re-runs only the suites which parse_results()
        # returned for re-running after the previous attempt
        while suites and attempts > 0:
            results = run_forked(suites)
            exit_code, suites = parse_results(results)
            attempts -= 1
            if exit_code == 0:
                print("Test run was successful")
            else:
                print("%s attempt(s) left." % attempts)
        sys.exit(exit_code)