hs-test: fixed timed out tests passing in the CI
[vpp.git] / test / run_tests.py
1 #!/usr/bin/env python3
2
3 import sys
4 import shutil
5 import os
6 import fnmatch
7 import unittest
8 import time
9 import threading
10 import traceback
11 import signal
12 import re
13 from multiprocessing import Process, Pipe, get_context
14 from multiprocessing.queues import Queue
15 from multiprocessing.managers import BaseManager
16 from config import config, num_cpus, available_cpus, max_vpp_cpus
17 from vpp_papi import VPPApiJSONFiles
18 from asfframework import (
19     VppTestRunner,
20     get_testcase_doc_name,
21     get_test_description,
22     get_failed_testcase_linkname,
23     get_testcase_dirname,
24 )
25 from framework import VppTestCase
26 from test_result_code import TestResultCode
27 from debug import spawn_gdb
28 from log import (
29     get_parallel_logger,
30     double_line_delim,
31     RED,
32     YELLOW,
33     GREEN,
34     colorize,
35     single_line_delim,
36 )
37 from discover_tests import discover_tests
38 import sanity_run_vpp
39 from subprocess import check_output, CalledProcessError
40 from util import check_core_path, get_core_path, is_core_present
41
42 # timeout which controls how long the child has to finish after seeing
43 # a core dump in test temporary directory. If this is exceeded, parent assumes
44 # that child process is stuck (e.g. waiting for event from vpp) and kill
45 # the child
46 core_timeout = 3
47
48
49 class StreamQueue(Queue):
50     def write(self, msg):
51         self.put(msg)
52
53     def flush(self):
54         sys.__stdout__.flush()
55         sys.__stderr__.flush()
56
57     def fileno(self):
58         return self._writer.fileno()
59
60
61 class StreamQueueManager(BaseManager):
62     pass
63
64
65 StreamQueueManager.register("StreamQueue", StreamQueue)
66
67
68 class TestResult(dict):
69     def __init__(self, testcase_suite, testcases_by_id=None):
70         super(TestResult, self).__init__()
71         for trc in list(TestResultCode):
72             self[trc] = []
73         self.crashed = False
74         self.testcase_suite = testcase_suite
75         self.testcases = [testcase for testcase in testcase_suite]
76         self.testcases_by_id = testcases_by_id
77
78     def was_successful(self):
79         return (
80             0
81             == len(self[TestResultCode.FAIL])
82             == len(self[TestResultCode.ERROR])
83             == len(self[TestResultCode.UNEXPECTED_PASS])
84             and len(self[TestResultCode.PASS])
85             + len(self[TestResultCode.SKIP])
86             + len(self[TestResultCode.SKIP_CPU_SHORTAGE])
87             + len(self[TestResultCode.EXPECTED_FAIL])
88             == self.testcase_suite.countTestCases()
89         )
90
91     def no_tests_run(self):
92         return 0 == len(self[TestResultCode.TEST_RUN])
93
94     def process_result(self, test_id, result):
95         self[result].append(test_id)
96
97     def suite_from_failed(self):
98         rerun_ids = set([])
99         for testcase in self.testcase_suite:
100             tc_id = testcase.id()
101             if (
102                 tc_id
103                 not in self[TestResultCode.PASS]
104                 + self[TestResultCode.SKIP]
105                 + self[TestResultCode.SKIP_CPU_SHORTAGE]
106                 + self[TestResultCode.EXPECTED_FAIL]
107             ):
108                 rerun_ids.add(tc_id)
109         if rerun_ids:
110             return suite_from_failed(self.testcase_suite, rerun_ids)
111
112     def get_testcase_names(self, test_id):
113         # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
114         setup_teardown_match = re.match(
115             r"((tearDownClass)|(setUpClass)) \((.+\..+)\)", test_id
116         )
117         if setup_teardown_match:
118             test_name, _, _, testcase_name = setup_teardown_match.groups()
119             if len(testcase_name.split(".")) == 2:
120                 for key in self.testcases_by_id.keys():
121                     if key.startswith(testcase_name):
122                         testcase_name = key
123                         break
124             testcase_name = self._get_testcase_doc_name(testcase_name)
125         else:
126             test_name = self._get_test_description(test_id)
127             testcase_name = self._get_testcase_doc_name(test_id)
128
129         return testcase_name, test_name
130
131     def _get_test_description(self, test_id):
132         if test_id in self.testcases_by_id:
133             desc = get_test_description(descriptions, self.testcases_by_id[test_id])
134         else:
135             desc = test_id
136         return desc
137
138     def _get_testcase_doc_name(self, test_id):
139         if test_id in self.testcases_by_id:
140             doc_name = get_testcase_doc_name(self.testcases_by_id[test_id])
141         else:
142             doc_name = test_id
143         return doc_name
144
145
146 def test_runner_wrapper(
147     suite, keep_alive_pipe, stdouterr_queue, finished_pipe, result_pipe, logger
148 ):
149     sys.stdout = stdouterr_queue
150     sys.stderr = stdouterr_queue
151     VppTestCase.parallel_handler = logger.handlers[0]
152     result = VppTestRunner(
153         keep_alive_pipe=keep_alive_pipe,
154         descriptions=descriptions,
155         verbosity=config.verbose,
156         result_pipe=result_pipe,
157         failfast=config.failfast,
158         print_summary=False,
159     ).run(suite)
160     finished_pipe.send(result.wasSuccessful())
161     finished_pipe.close()
162     keep_alive_pipe.close()
163
164
165 class TestCaseWrapper(object):
166     def __init__(self, testcase_suite, manager):
167         self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(duplex=False)
168         self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
169         self.result_parent_end, self.result_child_end = Pipe(duplex=False)
170         self.testcase_suite = testcase_suite
171         self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
172         self.logger = get_parallel_logger(self.stdouterr_queue)
173         self.child = Process(
174             target=test_runner_wrapper,
175             args=(
176                 testcase_suite,
177                 self.keep_alive_child_end,
178                 self.stdouterr_queue,
179                 self.finished_child_end,
180                 self.result_child_end,
181                 self.logger,
182             ),
183         )
184         self.child.start()
185         self.last_test_temp_dir = None
186         self.last_test_vpp_binary = None
187         self._last_test = None
188         self.last_test_id = None
189         self.vpp_pid = None
190         self.last_heard = time.time()
191         self.core_detected_at = None
192         self.testcases_by_id = {}
193         self.testclasess_with_core = {}
194         for testcase in self.testcase_suite:
195             self.testcases_by_id[testcase.id()] = testcase
196         self.result = TestResult(testcase_suite, self.testcases_by_id)
197
198     @property
199     def last_test(self):
200         return self._last_test
201
202     @last_test.setter
203     def last_test(self, test_id):
204         self.last_test_id = test_id
205         if test_id in self.testcases_by_id:
206             testcase = self.testcases_by_id[test_id]
207             self._last_test = testcase.shortDescription()
208             if not self._last_test:
209                 self._last_test = str(testcase)
210         else:
211             self._last_test = test_id
212
213     def add_testclass_with_core(self):
214         if self.last_test_id in self.testcases_by_id:
215             test = self.testcases_by_id[self.last_test_id]
216             class_name = unittest.util.strclass(test.__class__)
217             test_name = "'{}' ({})".format(
218                 get_test_description(descriptions, test), self.last_test_id
219             )
220         else:
221             test_name = self.last_test_id
222             class_name = re.match(
223                 r"((tearDownClass)|(setUpClass)) " r"\((.+\..+)\)", test_name
224             ).groups()[3]
225         if class_name not in self.testclasess_with_core:
226             self.testclasess_with_core[class_name] = (
227                 test_name,
228                 self.last_test_vpp_binary,
229                 self.last_test_temp_dir,
230             )
231
232     def close_pipes(self):
233         self.keep_alive_child_end.close()
234         self.finished_child_end.close()
235         self.result_child_end.close()
236         self.keep_alive_parent_end.close()
237         self.finished_parent_end.close()
238         self.result_parent_end.close()
239
240     def was_successful(self):
241         return self.result.was_successful()
242
243     @property
244     def cpus_used(self):
245         return self.testcase_suite.cpus_used
246
247     def get_assigned_cpus(self):
248         return self.testcase_suite.get_assigned_cpus()
249
250
251 def stdouterr_reader_wrapper(
252     unread_testcases, finished_unread_testcases, read_testcases
253 ):
254     read_testcase = None
255     while read_testcases.is_set() or unread_testcases:
256         if finished_unread_testcases:
257             read_testcase = finished_unread_testcases.pop()
258             unread_testcases.remove(read_testcase)
259         elif unread_testcases:
260             read_testcase = unread_testcases.pop()
261         if read_testcase:
262             data = ""
263             while data is not None:
264                 sys.stdout.write(data)
265                 data = read_testcase.stdouterr_queue.get()
266
267             read_testcase.stdouterr_queue.close()
268             finished_unread_testcases.discard(read_testcase)
269             read_testcase = None
270
271
272 def handle_failed_suite(logger, last_test_temp_dir, vpp_pid, vpp_binary):
273     if last_test_temp_dir:
274         # Need to create link in case of a timeout or core dump without failure
275         lttd = os.path.basename(last_test_temp_dir)
276         link_path = os.path.join(config.failed_dir, f"{lttd}-FAILED")
277         if not os.path.exists(link_path):
278             os.symlink(last_test_temp_dir, link_path)
279         logger.error(
280             "Symlink to failed testcase directory: %s -> %s" % (link_path, lttd)
281         )
282
283         # Report core existence
284         core_path = get_core_path(last_test_temp_dir)
285         if os.path.exists(core_path):
286             logger.error(
287                 "Core-file exists in test temporary directory: %s!" % core_path
288             )
289             check_core_path(logger, core_path)
290             logger.debug("Running 'file %s':" % core_path)
291             try:
292                 info = check_output(["file", core_path])
293                 logger.debug(info)
294             except CalledProcessError as e:
295                 logger.error(
296                     "Subprocess returned with return code "
297                     "while running `file' utility on core-file "
298                     "returned: "
299                     "rc=%s",
300                     e.returncode,
301                 )
302             except OSError as e:
303                 logger.error(
304                     "Subprocess returned with OS error while "
305                     "running 'file' utility "
306                     "on core-file: "
307                     "(%s) %s",
308                     e.errno,
309                     e.strerror,
310                 )
311             except Exception as e:
312                 logger.exception("Unexpected error running `file' utility on core-file")
313             logger.error(f"gdb {vpp_binary} {core_path}")
314
315     if vpp_pid:
316         # Copy api post mortem
317         api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
318         if os.path.isfile(api_post_mortem_path):
319             logger.error(
320                 "Copying api_post_mortem.%d to %s" % (vpp_pid, last_test_temp_dir)
321             )
322             shutil.copy2(api_post_mortem_path, last_test_temp_dir)
323
324
325 def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
326     if is_core_present(tempdir):
327         if debug_core:
328             print(
329                 "VPP core detected in %s. Last test running was %s"
330                 % (tempdir, core_crash_test)
331             )
332             print(single_line_delim)
333             spawn_gdb(vpp_binary, get_core_path(tempdir))
334             print(single_line_delim)
335         elif config.compress_core:
336             print("Compressing core-file in test directory `%s'" % tempdir)
337             os.system("gzip %s" % get_core_path(tempdir))
338
339
340 def handle_cores(failed_testcases):
341     for failed_testcase in failed_testcases:
342         tcs_with_core = failed_testcase.testclasess_with_core
343         if tcs_with_core:
344             for test, vpp_binary, tempdir in tcs_with_core.values():
345                 check_and_handle_core(vpp_binary, tempdir, test)
346
347
348 def process_finished_testsuite(
349     wrapped_testcase_suite, finished_testcase_suites, failed_wrapped_testcases, results
350 ):
351     results.append(wrapped_testcase_suite.result)
352     finished_testcase_suites.add(wrapped_testcase_suite)
353     stop_run = False
354     if config.failfast and not wrapped_testcase_suite.was_successful():
355         stop_run = True
356
357     if not wrapped_testcase_suite.was_successful():
358         failed_wrapped_testcases.add(wrapped_testcase_suite)
359         handle_failed_suite(
360             wrapped_testcase_suite.logger,
361             wrapped_testcase_suite.last_test_temp_dir,
362             wrapped_testcase_suite.vpp_pid,
363             wrapped_testcase_suite.last_test_vpp_binary,
364         )
365
366     return stop_run
367
368
369 def run_forked(testcase_suites):
370     wrapped_testcase_suites = set()
371     solo_testcase_suites = []
372
373     # suites are unhashable, need to use list
374     results = []
375     unread_testcases = set()
376     finished_unread_testcases = set()
377     manager = StreamQueueManager()
378     manager.start()
379     tests_running = 0
380     free_cpus = list(available_cpus)
381
382     def on_suite_start(tc):
383         nonlocal tests_running
384         nonlocal free_cpus
385         tests_running = tests_running + 1
386
387     def on_suite_finish(tc):
388         nonlocal tests_running
389         nonlocal free_cpus
390         tests_running = tests_running - 1
391         assert tests_running >= 0
392         free_cpus.extend(tc.get_assigned_cpus())
393
394     def run_suite(suite):
395         nonlocal manager
396         nonlocal wrapped_testcase_suites
397         nonlocal unread_testcases
398         nonlocal free_cpus
399         suite.assign_cpus(free_cpus[: suite.cpus_used])
400         free_cpus = free_cpus[suite.cpus_used :]
401         wrapper = TestCaseWrapper(suite, manager)
402         wrapped_testcase_suites.add(wrapper)
403         unread_testcases.add(wrapper)
404         on_suite_start(suite)
405
406     def can_run_suite(suite):
407         return tests_running < max_concurrent_tests and (
408             suite.cpus_used <= len(free_cpus) or suite.cpus_used > max_vpp_cpus
409         )
410
411     while free_cpus and testcase_suites:
412         a_suite = testcase_suites[0]
413         if a_suite.is_tagged_run_solo:
414             a_suite = testcase_suites.pop(0)
415             solo_testcase_suites.append(a_suite)
416             continue
417         if can_run_suite(a_suite):
418             a_suite = testcase_suites.pop(0)
419             run_suite(a_suite)
420         else:
421             break
422
423     if tests_running == 0 and solo_testcase_suites:
424         a_suite = solo_testcase_suites.pop(0)
425         run_suite(a_suite)
426
427     read_from_testcases = threading.Event()
428     read_from_testcases.set()
429     stdouterr_thread = threading.Thread(
430         target=stdouterr_reader_wrapper,
431         args=(unread_testcases, finished_unread_testcases, read_from_testcases),
432     )
433     stdouterr_thread.start()
434
435     failed_wrapped_testcases = set()
436     stop_run = False
437
438     try:
439         while wrapped_testcase_suites or testcase_suites:
440             finished_testcase_suites = set()
441             for wrapped_testcase_suite in wrapped_testcase_suites:
442                 while wrapped_testcase_suite.result_parent_end.poll():
443                     wrapped_testcase_suite.result.process_result(
444                         *wrapped_testcase_suite.result_parent_end.recv()
445                     )
446                     wrapped_testcase_suite.last_heard = time.time()
447
448                 while wrapped_testcase_suite.keep_alive_parent_end.poll():
449                     (
450                         wrapped_testcase_suite.last_test,
451                         wrapped_testcase_suite.last_test_vpp_binary,
452                         wrapped_testcase_suite.last_test_temp_dir,
453                         wrapped_testcase_suite.vpp_pid,
454                     ) = wrapped_testcase_suite.keep_alive_parent_end.recv()
455                     wrapped_testcase_suite.last_heard = time.time()
456
457                 if wrapped_testcase_suite.finished_parent_end.poll():
458                     wrapped_testcase_suite.finished_parent_end.recv()
459                     wrapped_testcase_suite.last_heard = time.time()
460                     stop_run = (
461                         process_finished_testsuite(
462                             wrapped_testcase_suite,
463                             finished_testcase_suites,
464                             failed_wrapped_testcases,
465                             results,
466                         )
467                         or stop_run
468                     )
469                     continue
470
471                 fail = False
472                 if wrapped_testcase_suite.last_heard + config.timeout < time.time():
473                     fail = True
474                     wrapped_testcase_suite.logger.critical(
475                         "Child test runner process timed out "
476                         "(last test running was `%s' in `%s')!"
477                         % (
478                             wrapped_testcase_suite.last_test,
479                             wrapped_testcase_suite.last_test_temp_dir,
480                         )
481                     )
482                 elif not wrapped_testcase_suite.child.is_alive():
483                     fail = True
484                     wrapped_testcase_suite.logger.critical(
485                         "Child test runner process unexpectedly died "
486                         "(last test running was `%s' in `%s')!"
487                         % (
488                             wrapped_testcase_suite.last_test,
489                             wrapped_testcase_suite.last_test_temp_dir,
490                         )
491                     )
492                 elif (
493                     wrapped_testcase_suite.last_test_temp_dir
494                     and wrapped_testcase_suite.last_test_vpp_binary
495                 ):
496                     if is_core_present(wrapped_testcase_suite.last_test_temp_dir):
497                         wrapped_testcase_suite.add_testclass_with_core()
498                         if wrapped_testcase_suite.core_detected_at is None:
499                             wrapped_testcase_suite.core_detected_at = time.time()
500                         elif (
501                             wrapped_testcase_suite.core_detected_at + core_timeout
502                             < time.time()
503                         ):
504                             wrapped_testcase_suite.logger.critical(
505                                 "Child test runner process unresponsive and "
506                                 "core-file exists in test temporary directory "
507                                 "(last test running was `%s' in `%s')!"
508                                 % (
509                                     wrapped_testcase_suite.last_test,
510                                     wrapped_testcase_suite.last_test_temp_dir,
511                                 )
512                             )
513                             fail = True
514
515                 if fail:
516                     wrapped_testcase_suite.child.terminate()
517                     try:
518                         # terminating the child process tends to leave orphan
519                         # VPP process around
520                         if wrapped_testcase_suite.vpp_pid:
521                             os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
522                     except OSError:
523                         # already dead
524                         pass
525                     wrapped_testcase_suite.result.crashed = True
526                     wrapped_testcase_suite.result.process_result(
527                         wrapped_testcase_suite.last_test_id, TestResultCode.ERROR
528                     )
529                     stop_run = (
530                         process_finished_testsuite(
531                             wrapped_testcase_suite,
532                             finished_testcase_suites,
533                             failed_wrapped_testcases,
534                             results,
535                         )
536                         or stop_run
537                     )
538
539             for finished_testcase in finished_testcase_suites:
540                 # Somewhat surprisingly, the join below may
541                 # timeout, even if client signaled that
542                 # it finished - so we note it just in case.
543                 join_start = time.time()
544                 finished_testcase.child.join(test_finished_join_timeout)
545                 join_end = time.time()
546                 if join_end - join_start >= test_finished_join_timeout:
547                     finished_testcase.logger.error(
548                         "Timeout joining finished test: %s (pid %d)"
549                         % (finished_testcase.last_test, finished_testcase.child.pid)
550                     )
551                 finished_testcase.close_pipes()
552                 wrapped_testcase_suites.remove(finished_testcase)
553                 finished_unread_testcases.add(finished_testcase)
554                 finished_testcase.stdouterr_queue.put(None)
555                 on_suite_finish(finished_testcase)
556                 if stop_run:
557                     while testcase_suites:
558                         results.append(TestResult(testcase_suites.pop(0)))
559                 elif testcase_suites:
560                     a_suite = testcase_suites[0]
561                     while a_suite and a_suite.is_tagged_run_solo:
562                         testcase_suites.pop(0)
563                         solo_testcase_suites.append(a_suite)
564                         if testcase_suites:
565                             a_suite = testcase_suites[0]
566                         else:
567                             a_suite = None
568                     if a_suite and can_run_suite(a_suite):
569                         testcase_suites.pop(0)
570                         run_suite(a_suite)
571                 if solo_testcase_suites and tests_running == 0:
572                     a_suite = solo_testcase_suites.pop(0)
573                     run_suite(a_suite)
574             time.sleep(0.1)
575     except Exception:
576         for wrapped_testcase_suite in wrapped_testcase_suites:
577             wrapped_testcase_suite.child.terminate()
578             wrapped_testcase_suite.stdouterr_queue.put(None)
579         raise
580     finally:
581         read_from_testcases.clear()
582         stdouterr_thread.join(config.timeout)
583         manager.shutdown()
584
585     handle_cores(failed_wrapped_testcases)
586     return results
587
588
589 class TestSuiteWrapper(unittest.TestSuite):
590     cpus_used = 0
591
592     def __init__(self):
593         return super().__init__()
594
595     def addTest(self, test):
596         self.cpus_used = max(self.cpus_used, test.get_cpus_required())
597         super().addTest(test)
598
599     def assign_cpus(self, cpus):
600         self.cpus = cpus
601
602     def _handleClassSetUp(self, test, result):
603         if not test.__class__.skipped_due_to_cpu_lack:
604             test.assign_cpus(self.cpus)
605         super()._handleClassSetUp(test, result)
606
607     def get_assigned_cpus(self):
608         return self.cpus
609
610
611 class SplitToSuitesCallback:
612     def __init__(self, filter_callback):
613         self.suites = {}
614         self.suite_name = "default"
615         self.filter_callback = filter_callback
616         self.filtered = TestSuiteWrapper()
617
618     def __call__(self, file_name, cls, method):
619         test_method = cls(method)
620         if self.filter_callback(file_name, cls.__name__, method):
621             self.suite_name = file_name + cls.__name__
622             if self.suite_name not in self.suites:
623                 self.suites[self.suite_name] = TestSuiteWrapper()
624                 self.suites[self.suite_name].is_tagged_run_solo = False
625             self.suites[self.suite_name].addTest(test_method)
626             if test_method.is_tagged_run_solo():
627                 self.suites[self.suite_name].is_tagged_run_solo = True
628
629         else:
630             self.filtered.addTest(test_method)
631
632
633 def parse_test_filter(test_filter):
634     f = test_filter
635     filter_file_name = None
636     filter_class_name = None
637     filter_func_name = None
638     if f:
639         if "." in f:
640             parts = f.split(".")
641             if len(parts) > 3:
642                 raise Exception(f"Invalid test filter: {test_filter}")
643             if len(parts) > 2:
644                 if parts[2] not in ("*", ""):
645                     filter_func_name = parts[2]
646             if parts[1] not in ("*", ""):
647                 filter_class_name = parts[1]
648             if parts[0] not in ("*", ""):
649                 if parts[0].startswith("test_"):
650                     filter_file_name = parts[0]
651                 else:
652                     filter_file_name = "test_%s" % parts[0]
653         else:
654             if f.startswith("test_"):
655                 filter_file_name = f
656             else:
657                 filter_file_name = "test_%s" % f
658     if filter_file_name:
659         filter_file_name = "%s.py" % filter_file_name
660     return filter_file_name, filter_class_name, filter_func_name
661
662
663 def filter_tests(tests, filter_cb):
664     result = TestSuiteWrapper()
665     for t in tests:
666         if isinstance(t, unittest.suite.TestSuite):
667             # this is a bunch of tests, recursively filter...
668             x = filter_tests(t, filter_cb)
669             if x.countTestCases() > 0:
670                 result.addTest(x)
671         elif isinstance(t, unittest.TestCase):
672             # this is a single test
673             parts = t.id().split(".")
674             # t.id() for common cases like this:
675             # test_classifier.TestClassifier.test_acl_ip
676             # apply filtering only if it is so
677             if len(parts) == 3:
678                 if not filter_cb(parts[0], parts[1], parts[2]):
679                     continue
680             result.addTest(t)
681         else:
682             # unexpected object, don't touch it
683             result.addTest(t)
684     return result
685
686
687 class FilterByTestOption:
688     def __init__(self, filters):
689         self.filters = filters
690
691     def __call__(self, file_name, class_name, func_name):
692         def test_one(
693             filter_file_name,
694             filter_class_name,
695             filter_func_name,
696             file_name,
697             class_name,
698             func_name,
699         ):
700             if filter_file_name:
701                 fn_match = fnmatch.fnmatch(file_name, filter_file_name)
702                 if not fn_match:
703                     return False
704             if filter_class_name and class_name != filter_class_name:
705                 return False
706             if filter_func_name and func_name != filter_func_name:
707                 return False
708             return True
709
710         for filter_file_name, filter_class_name, filter_func_name in self.filters:
711             if test_one(
712                 filter_file_name,
713                 filter_class_name,
714                 filter_func_name,
715                 file_name,
716                 class_name,
717                 func_name,
718             ):
719                 return True
720
721         return False
722
723
724 class FilterByClassList:
725     def __init__(self, classes_with_filenames):
726         self.classes_with_filenames = classes_with_filenames
727
728     def __call__(self, file_name, class_name, func_name):
729         return ".".join([file_name, class_name]) in self.classes_with_filenames
730
731
732 def suite_from_failed(suite, failed):
733     failed = {x.rsplit(".", 1)[0] for x in failed}
734     filter_cb = FilterByClassList(failed)
735     suite = filter_tests(suite, filter_cb)
736     return suite
737
738
739 class AllResults(dict):
740     def __init__(self):
741         super(AllResults, self).__init__()
742         self.all_testcases = 0
743         self.results_per_suite = []
744         for trc in list(TestResultCode):
745             self[trc] = 0
746         self.rerun = []
747         self.testsuites_no_tests_run = []
748
749     def add_results(self, result):
750         self.results_per_suite.append(result)
751         for trc in list(TestResultCode):
752             self[trc] += len(result[trc])
753
754     def add_result(self, result):
755         retval = 0
756         self.all_testcases += result.testcase_suite.countTestCases()
757         self.add_results(result)
758
759         if result.no_tests_run():
760             self.testsuites_no_tests_run.append(result.testcase_suite)
761             if result.crashed:
762                 retval = -1
763             else:
764                 retval = 1
765         elif not result.was_successful():
766             retval = 1
767
768         if retval != 0:
769             self.rerun.append(result.testcase_suite)
770
771         return retval
772
773     def print_results(self):
774         print("")
775         print(double_line_delim)
776         print("TEST RESULTS:")
777
778         def indent_results(lines):
779             lines = list(filter(None, lines))
780             maximum = max(lines, key=lambda x: x.index(":"))
781             maximum = 4 + maximum.index(":")
782             for l in lines:
783                 padding = " " * (maximum - l.index(":"))
784                 print(f"{padding}{l}")
785
786         indent_results(
787             [
788                 f"Scheduled tests: {self.all_testcases}",
789                 f"Executed tests: {self[TestResultCode.TEST_RUN]}",
790                 f"Passed tests: {colorize(self[TestResultCode.PASS], GREEN)}",
791                 (
792                     f"Expected failures: {colorize(self[TestResultCode.EXPECTED_FAIL], GREEN)}"
793                     if self[TestResultCode.EXPECTED_FAIL]
794                     else None
795                 ),
796                 (
797                     f"Skipped tests: {colorize(self[TestResultCode.SKIP], YELLOW)}"
798                     if self[TestResultCode.SKIP]
799                     else None
800                 ),
801                 (
802                     f"Not Executed tests: {colorize(self.not_executed, RED)}"
803                     if self.not_executed
804                     else None
805                 ),
806                 (
807                     f"Failures: {colorize(self[TestResultCode.FAIL], RED)}"
808                     if self[TestResultCode.FAIL]
809                     else None
810                 ),
811                 (
812                     f"Unexpected passes: {colorize(self[TestResultCode.UNEXPECTED_PASS], RED)}"
813                     if self[TestResultCode.UNEXPECTED_PASS]
814                     else None
815                 ),
816                 (
817                     f"Errors: {colorize(self[TestResultCode.ERROR], RED)}"
818                     if self[TestResultCode.ERROR]
819                     else None
820                 ),
821                 (
822                     "Tests skipped due to lack of CPUS: "
823                     f"{colorize(self[TestResultCode.SKIP_CPU_SHORTAGE], YELLOW)}"
824                     if self[TestResultCode.SKIP_CPU_SHORTAGE]
825                     else None
826                 ),
827             ]
828         )
829
830         if self.all_failed > 0:
831             print("FAILURES AND ERRORS IN TESTS:")
832             for result in self.results_per_suite:
833                 old_testcase_name = None
834                 for tr_code, headline in (
835                     (TestResultCode.FAIL, "FAILURE"),
836                     (TestResultCode.ERROR, "ERROR"),
837                     (TestResultCode.UNEXPECTED_PASS, "UNEXPECTED PASS"),
838                 ):
839                     if not result[tr_code]:
840                         continue
841
842                     for failed_test_id in result[tr_code]:
843                         new_testcase_name, test_name = result.get_testcase_names(
844                             failed_test_id
845                         )
846                         if new_testcase_name != old_testcase_name:
847                             print(
848                                 f"  Testcase name: {colorize(new_testcase_name, RED)}"
849                             )
850                             old_testcase_name = new_testcase_name
851                         print(
852                             f"    {headline}: {colorize(test_name, RED)} [{failed_test_id}]"
853                         )
854
855         if self.testsuites_no_tests_run:
856             print("TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:")
857             tc_classes = set()
858             for testsuite in self.testsuites_no_tests_run:
859                 for testcase in testsuite:
860                     tc_classes.add(get_testcase_doc_name(testcase))
861             for tc_class in tc_classes:
862                 print("  {}".format(colorize(tc_class, RED)))
863
864         if self[TestResultCode.SKIP_CPU_SHORTAGE]:
865             print()
866             print(
867                 colorize(
868                     "     SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
869                     " ENOUGH CPUS AVAILABLE",
870                     YELLOW,
871                 )
872             )
873         print(double_line_delim)
874         print("")
875
876     @property
877     def not_executed(self):
878         return self.all_testcases - self[TestResultCode.TEST_RUN]
879
880     @property
881     def all_failed(self):
882         return (
883             self[TestResultCode.FAIL]
884             + self[TestResultCode.ERROR]
885             + self[TestResultCode.UNEXPECTED_PASS]
886         )
887
888
889 def parse_results(results):
890     """
891     Prints the number of scheduled, executed, not executed, passed, failed,
892     errored and skipped tests and details about failed and errored tests.
893
894     Also returns all suites where any test failed.
895
896     :param results:
897     :return:
898     """
899
900     results_per_suite = AllResults()
901     crashed = False
902     failed = False
903     for result in results:
904         result_code = results_per_suite.add_result(result)
905         if result_code == 1:
906             failed = True
907         elif result_code == -1:
908             crashed = True
909
910     results_per_suite.print_results()
911
912     if crashed:
913         return_code = -1
914     elif failed:
915         return_code = 1
916     else:
917         return_code = 0
918     return return_code, results_per_suite.rerun
919
920
921 if __name__ == "__main__":
922     print(f"Config is: {config}")
923
924     if config.api_preload:
925         VPPApiJSONFiles.load_api(apidir=config.extern_apidir + [config.vpp_install_dir])
926
927     if config.sanity:
928         print("Running sanity test case.")
929         try:
930             rc = sanity_run_vpp.main()
931             if rc != 0:
932                 sys.exit(rc)
933         except Exception as e:
934             print(traceback.format_exc())
935             print("Couldn't run sanity test case.")
936             sys.exit(-1)
937
938     test_finished_join_timeout = 15
939
940     debug_gdb = config.debug in ["gdb", "gdbserver", "attach"]
941     debug_core = config.debug == "core"
942
943     run_interactive = debug_gdb or config.step or config.force_foreground
944
945     max_concurrent_tests = 0
946     print(f"OS reports {num_cpus} available cpu(s).")
947
948     test_jobs = config.jobs
949     if test_jobs == "auto":
950         if run_interactive:
951             max_concurrent_tests = 1
952             print("Interactive mode required, running tests consecutively.")
953         else:
954             max_concurrent_tests = num_cpus
955             print(
956                 f"Running at most {max_concurrent_tests} python test "
957                 "processes concurrently."
958             )
959     else:
960         max_concurrent_tests = test_jobs
961         print(
962             f"Running at most {max_concurrent_tests} python test processes "
963             "concurrently as set by 'TEST_JOBS'."
964         )
965
966     print(f"Using at most {max_vpp_cpus} cpus for VPP threads.")
967
968     if run_interactive and max_concurrent_tests > 1:
969         raise NotImplementedError(
970             "Running tests interactively (DEBUG is gdb[server] or ATTACH or "
971             "STEP is set) in parallel (TEST_JOBS is more than 1) is not "
972             "supported"
973         )
974
975     descriptions = True
976
977     print("Running tests using custom test runner.")
978     filters = [(parse_test_filter(f)) for f in config.filter.split(",")]
979
980     print(
981         "Selected filters: ",
982         "|".join(
983             f"file={filter_file}, class={filter_class}, function={filter_func}"
984             for filter_file, filter_class, filter_func in filters
985         ),
986     )
987
988     filter_cb = FilterByTestOption(filters)
989
990     cb = SplitToSuitesCallback(filter_cb)
991     for d in config.test_src_dir:
992         print("Adding tests from directory tree %s" % d)
993         discover_tests(d, cb)
994
995     # suites are not hashable, need to use list
996     suites = []
997     tests_amount = 0
998     for testcase_suite in cb.suites.values():
999         tests_amount += testcase_suite.countTestCases()
1000         if testcase_suite.cpus_used > max_vpp_cpus:
1001             # here we replace test functions with lambdas to just skip them
1002             # but we also replace setUp/tearDown functions to do nothing
1003             # so that the test can be "started" and "stopped", so that we can
1004             # still keep those prints (test description - SKIP), which are done
1005             # in stopTest() (for that to trigger, test function must run)
1006             for t in testcase_suite:
1007                 for m in dir(t):
1008                     if m.startswith("test_"):
1009                         setattr(t, m, lambda: t.skipTest("not enough cpus"))
1010                 setattr(t.__class__, "setUpClass", lambda: None)
1011                 setattr(t.__class__, "tearDownClass", lambda: None)
1012                 setattr(t, "setUp", lambda: None)
1013                 setattr(t, "tearDown", lambda: None)
1014                 t.__class__.skipped_due_to_cpu_lack = True
1015         suites.append(testcase_suite)
1016
1017     print(
1018         "%s out of %s tests match specified filters"
1019         % (tests_amount, tests_amount + cb.filtered.countTestCases())
1020     )
1021
1022     if not config.extended:
1023         print("Not running extended tests (some tests will be skipped)")
1024
1025     attempts = config.retries + 1
1026     if attempts > 1:
1027         print("Perform %s attempts to pass the suite..." % attempts)
1028
1029     if run_interactive and suites:
1030         # don't fork if requiring interactive terminal
1031         print("Running tests in foreground in the current process")
1032         full_suite = unittest.TestSuite()
1033         free_cpus = list(available_cpus)
1034         cpu_shortage = False
1035         for suite in suites:
1036             if suite.cpus_used <= max_vpp_cpus:
1037                 suite.assign_cpus(free_cpus[: suite.cpus_used])
1038             else:
1039                 suite.assign_cpus([])
1040                 cpu_shortage = True
1041         full_suite.addTests(suites)
1042         result = VppTestRunner(
1043             verbosity=config.verbose, failfast=config.failfast, print_summary=True
1044         ).run(full_suite)
1045         was_successful = result.wasSuccessful()
1046         if not was_successful:
1047             for test_case_info in result.failed_test_cases_info:
1048                 handle_failed_suite(
1049                     test_case_info.logger,
1050                     test_case_info.tempdir,
1051                     test_case_info.vpp_pid,
1052                     config.vpp,
1053                 )
1054                 if test_case_info in result.core_crash_test_cases_info:
1055                     check_and_handle_core(
1056                         test_case_info.vpp_bin_path,
1057                         test_case_info.tempdir,
1058                         test_case_info.core_crash_test,
1059                     )
1060
1061         if cpu_shortage:
1062             print()
1063             print(
1064                 colorize(
1065                     "SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
1066                     " ENOUGH CPUS AVAILABLE",
1067                     YELLOW,
1068                 )
1069             )
1070             print()
1071         sys.exit(not was_successful)
1072     else:
1073         print(
1074             "Running each VPPTestCase in a separate background process"
1075             f" with at most {max_concurrent_tests} parallel python test "
1076             "process(es)"
1077         )
1078         exit_code = 0
1079         while suites and attempts > 0:
1080             for suite in suites:
1081                 failed_link = get_failed_testcase_linkname(
1082                     config.failed_dir,
1083                     f"{get_testcase_dirname(suite._tests[0].__class__.__name__)}",
1084                 )
1085                 if os.path.islink(failed_link):
1086                     os.unlink(failed_link)
1087             results = run_forked(suites)
1088             exit_code, suites = parse_results(results)
1089             attempts -= 1
1090             if exit_code == 0:
1091                 print("Test run was successful")
1092             else:
1093                 print("%s attempt(s) left." % attempts)
1094         sys.exit(exit_code)