ipsec: allow receiving encrypted IP packets with TFC padding
[vpp.git] / test / run_tests.py
1 #!/usr/bin/env python3
2
3 import sys
4 import shutil
5 import os
6 import fnmatch
7 import unittest
8 import time
9 import threading
10 import traceback
11 import signal
12 import re
13 from multiprocessing import Process, Pipe, get_context
14 from multiprocessing.queues import Queue
15 from multiprocessing.managers import BaseManager
16 from config import config, num_cpus, available_cpus, max_vpp_cpus
17 from asfframework import (
18     VppTestRunner,
19     get_testcase_doc_name,
20     get_test_description,
21     get_failed_testcase_linkname,
22     get_testcase_dirname,
23 )
24 from framework import VppTestCase
25 from test_result_code import TestResultCode
26 from debug import spawn_gdb
27 from log import (
28     get_parallel_logger,
29     double_line_delim,
30     RED,
31     YELLOW,
32     GREEN,
33     colorize,
34     single_line_delim,
35 )
36 from discover_tests import discover_tests
37 import sanity_run_vpp
38 from subprocess import check_output, CalledProcessError
39 from util import check_core_path, get_core_path, is_core_present
40
41 # timeout which controls how long the child has to finish after seeing
42 # a core dump in test temporary directory. If this is exceeded, parent assumes
43 # that child process is stuck (e.g. waiting for event from vpp) and kill
44 # the child
45 core_timeout = 3
46
47
48 class StreamQueue(Queue):
49     def write(self, msg):
50         self.put(msg)
51
52     def flush(self):
53         sys.__stdout__.flush()
54         sys.__stderr__.flush()
55
56     def fileno(self):
57         return self._writer.fileno()
58
59
60 class StreamQueueManager(BaseManager):
61     pass
62
63
64 StreamQueueManager.register("StreamQueue", StreamQueue)
65
66
67 class TestResult(dict):
68     def __init__(self, testcase_suite, testcases_by_id=None):
69         super(TestResult, self).__init__()
70         for trc in list(TestResultCode):
71             self[trc] = []
72         self.crashed = False
73         self.testcase_suite = testcase_suite
74         self.testcases = [testcase for testcase in testcase_suite]
75         self.testcases_by_id = testcases_by_id
76
77     def was_successful(self):
78         return (
79             0
80             == len(self[TestResultCode.FAIL])
81             == len(self[TestResultCode.ERROR])
82             == len(self[TestResultCode.UNEXPECTED_PASS])
83             and len(self[TestResultCode.PASS])
84             + len(self[TestResultCode.SKIP])
85             + len(self[TestResultCode.SKIP_CPU_SHORTAGE])
86             + len(self[TestResultCode.EXPECTED_FAIL])
87             == self.testcase_suite.countTestCases()
88         )
89
90     def no_tests_run(self):
91         return 0 == len(self[TestResultCode.TEST_RUN])
92
93     def process_result(self, test_id, result):
94         self[result].append(test_id)
95
96     def suite_from_failed(self):
97         rerun_ids = set([])
98         for testcase in self.testcase_suite:
99             tc_id = testcase.id()
100             if (
101                 tc_id
102                 not in self[TestResultCode.PASS]
103                 + self[TestResultCode.SKIP]
104                 + self[TestResultCode.SKIP_CPU_SHORTAGE]
105                 + self[TestResultCode.EXPECTED_FAIL]
106             ):
107                 rerun_ids.add(tc_id)
108         if rerun_ids:
109             return suite_from_failed(self.testcase_suite, rerun_ids)
110
111     def get_testcase_names(self, test_id):
112         # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
113         setup_teardown_match = re.match(
114             r"((tearDownClass)|(setUpClass)) \((.+\..+)\)", test_id
115         )
116         if setup_teardown_match:
117             test_name, _, _, testcase_name = setup_teardown_match.groups()
118             if len(testcase_name.split(".")) == 2:
119                 for key in self.testcases_by_id.keys():
120                     if key.startswith(testcase_name):
121                         testcase_name = key
122                         break
123             testcase_name = self._get_testcase_doc_name(testcase_name)
124         else:
125             test_name = self._get_test_description(test_id)
126             testcase_name = self._get_testcase_doc_name(test_id)
127
128         return testcase_name, test_name
129
130     def _get_test_description(self, test_id):
131         if test_id in self.testcases_by_id:
132             desc = get_test_description(descriptions, self.testcases_by_id[test_id])
133         else:
134             desc = test_id
135         return desc
136
137     def _get_testcase_doc_name(self, test_id):
138         if test_id in self.testcases_by_id:
139             doc_name = get_testcase_doc_name(self.testcases_by_id[test_id])
140         else:
141             doc_name = test_id
142         return doc_name
143
144
145 def test_runner_wrapper(
146     suite, keep_alive_pipe, stdouterr_queue, finished_pipe, result_pipe, logger
147 ):
148     sys.stdout = stdouterr_queue
149     sys.stderr = stdouterr_queue
150     VppTestCase.parallel_handler = logger.handlers[0]
151     result = VppTestRunner(
152         keep_alive_pipe=keep_alive_pipe,
153         descriptions=descriptions,
154         verbosity=config.verbose,
155         result_pipe=result_pipe,
156         failfast=config.failfast,
157         print_summary=False,
158     ).run(suite)
159     finished_pipe.send(result.wasSuccessful())
160     finished_pipe.close()
161     keep_alive_pipe.close()
162
163
164 class TestCaseWrapper(object):
165     def __init__(self, testcase_suite, manager):
166         self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(duplex=False)
167         self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
168         self.result_parent_end, self.result_child_end = Pipe(duplex=False)
169         self.testcase_suite = testcase_suite
170         self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
171         self.logger = get_parallel_logger(self.stdouterr_queue)
172         self.child = Process(
173             target=test_runner_wrapper,
174             args=(
175                 testcase_suite,
176                 self.keep_alive_child_end,
177                 self.stdouterr_queue,
178                 self.finished_child_end,
179                 self.result_child_end,
180                 self.logger,
181             ),
182         )
183         self.child.start()
184         self.last_test_temp_dir = None
185         self.last_test_vpp_binary = None
186         self._last_test = None
187         self.last_test_id = None
188         self.vpp_pid = None
189         self.last_heard = time.time()
190         self.core_detected_at = None
191         self.testcases_by_id = {}
192         self.testclasess_with_core = {}
193         for testcase in self.testcase_suite:
194             self.testcases_by_id[testcase.id()] = testcase
195         self.result = TestResult(testcase_suite, self.testcases_by_id)
196
197     @property
198     def last_test(self):
199         return self._last_test
200
201     @last_test.setter
202     def last_test(self, test_id):
203         self.last_test_id = test_id
204         if test_id in self.testcases_by_id:
205             testcase = self.testcases_by_id[test_id]
206             self._last_test = testcase.shortDescription()
207             if not self._last_test:
208                 self._last_test = str(testcase)
209         else:
210             self._last_test = test_id
211
212     def add_testclass_with_core(self):
213         if self.last_test_id in self.testcases_by_id:
214             test = self.testcases_by_id[self.last_test_id]
215             class_name = unittest.util.strclass(test.__class__)
216             test_name = "'{}' ({})".format(
217                 get_test_description(descriptions, test), self.last_test_id
218             )
219         else:
220             test_name = self.last_test_id
221             class_name = re.match(
222                 r"((tearDownClass)|(setUpClass)) " r"\((.+\..+)\)", test_name
223             ).groups()[3]
224         if class_name not in self.testclasess_with_core:
225             self.testclasess_with_core[class_name] = (
226                 test_name,
227                 self.last_test_vpp_binary,
228                 self.last_test_temp_dir,
229             )
230
231     def close_pipes(self):
232         self.keep_alive_child_end.close()
233         self.finished_child_end.close()
234         self.result_child_end.close()
235         self.keep_alive_parent_end.close()
236         self.finished_parent_end.close()
237         self.result_parent_end.close()
238
239     def was_successful(self):
240         return self.result.was_successful()
241
242     @property
243     def cpus_used(self):
244         return self.testcase_suite.cpus_used
245
246     def get_assigned_cpus(self):
247         return self.testcase_suite.get_assigned_cpus()
248
249
250 def stdouterr_reader_wrapper(
251     unread_testcases, finished_unread_testcases, read_testcases
252 ):
253     read_testcase = None
254     while read_testcases.is_set() or unread_testcases:
255         if finished_unread_testcases:
256             read_testcase = finished_unread_testcases.pop()
257             unread_testcases.remove(read_testcase)
258         elif unread_testcases:
259             read_testcase = unread_testcases.pop()
260         if read_testcase:
261             data = ""
262             while data is not None:
263                 sys.stdout.write(data)
264                 data = read_testcase.stdouterr_queue.get()
265
266             read_testcase.stdouterr_queue.close()
267             finished_unread_testcases.discard(read_testcase)
268             read_testcase = None
269
270
271 def handle_failed_suite(logger, last_test_temp_dir, vpp_pid, vpp_binary):
272     if last_test_temp_dir:
273         # Need to create link in case of a timeout or core dump without failure
274         lttd = os.path.basename(last_test_temp_dir)
275         link_path = os.path.join(config.failed_dir, f"{lttd}-FAILED")
276         if not os.path.exists(link_path):
277             os.symlink(last_test_temp_dir, link_path)
278         logger.error(
279             "Symlink to failed testcase directory: %s -> %s" % (link_path, lttd)
280         )
281
282         # Report core existence
283         core_path = get_core_path(last_test_temp_dir)
284         if os.path.exists(core_path):
285             logger.error(
286                 "Core-file exists in test temporary directory: %s!" % core_path
287             )
288             check_core_path(logger, core_path)
289             logger.debug("Running 'file %s':" % core_path)
290             try:
291                 info = check_output(["file", core_path])
292                 logger.debug(info)
293             except CalledProcessError as e:
294                 logger.error(
295                     "Subprocess returned with return code "
296                     "while running `file' utility on core-file "
297                     "returned: "
298                     "rc=%s",
299                     e.returncode,
300                 )
301             except OSError as e:
302                 logger.error(
303                     "Subprocess returned with OS error while "
304                     "running 'file' utility "
305                     "on core-file: "
306                     "(%s) %s",
307                     e.errno,
308                     e.strerror,
309                 )
310             except Exception as e:
311                 logger.exception("Unexpected error running `file' utility on core-file")
312             logger.error(f"gdb {vpp_binary} {core_path}")
313
314     if vpp_pid:
315         # Copy api post mortem
316         api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
317         if os.path.isfile(api_post_mortem_path):
318             logger.error(
319                 "Copying api_post_mortem.%d to %s" % (vpp_pid, last_test_temp_dir)
320             )
321             shutil.copy2(api_post_mortem_path, last_test_temp_dir)
322
323
324 def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
325     if is_core_present(tempdir):
326         if debug_core:
327             print(
328                 "VPP core detected in %s. Last test running was %s"
329                 % (tempdir, core_crash_test)
330             )
331             print(single_line_delim)
332             spawn_gdb(vpp_binary, get_core_path(tempdir))
333             print(single_line_delim)
334         elif config.compress_core:
335             print("Compressing core-file in test directory `%s'" % tempdir)
336             os.system("gzip %s" % get_core_path(tempdir))
337
338
339 def handle_cores(failed_testcases):
340     for failed_testcase in failed_testcases:
341         tcs_with_core = failed_testcase.testclasess_with_core
342         if tcs_with_core:
343             for test, vpp_binary, tempdir in tcs_with_core.values():
344                 check_and_handle_core(vpp_binary, tempdir, test)
345
346
347 def process_finished_testsuite(
348     wrapped_testcase_suite, finished_testcase_suites, failed_wrapped_testcases, results
349 ):
350     results.append(wrapped_testcase_suite.result)
351     finished_testcase_suites.add(wrapped_testcase_suite)
352     stop_run = False
353     if config.failfast and not wrapped_testcase_suite.was_successful():
354         stop_run = True
355
356     if not wrapped_testcase_suite.was_successful():
357         failed_wrapped_testcases.add(wrapped_testcase_suite)
358         handle_failed_suite(
359             wrapped_testcase_suite.logger,
360             wrapped_testcase_suite.last_test_temp_dir,
361             wrapped_testcase_suite.vpp_pid,
362             wrapped_testcase_suite.last_test_vpp_binary,
363         )
364
365     return stop_run
366
367
368 def run_forked(testcase_suites):
369     wrapped_testcase_suites = set()
370     solo_testcase_suites = []
371
372     # suites are unhashable, need to use list
373     results = []
374     unread_testcases = set()
375     finished_unread_testcases = set()
376     manager = StreamQueueManager()
377     manager.start()
378     tests_running = 0
379     free_cpus = list(available_cpus)
380
381     def on_suite_start(tc):
382         nonlocal tests_running
383         nonlocal free_cpus
384         tests_running = tests_running + 1
385
386     def on_suite_finish(tc):
387         nonlocal tests_running
388         nonlocal free_cpus
389         tests_running = tests_running - 1
390         assert tests_running >= 0
391         free_cpus.extend(tc.get_assigned_cpus())
392
393     def run_suite(suite):
394         nonlocal manager
395         nonlocal wrapped_testcase_suites
396         nonlocal unread_testcases
397         nonlocal free_cpus
398         suite.assign_cpus(free_cpus[: suite.cpus_used])
399         free_cpus = free_cpus[suite.cpus_used :]
400         wrapper = TestCaseWrapper(suite, manager)
401         wrapped_testcase_suites.add(wrapper)
402         unread_testcases.add(wrapper)
403         on_suite_start(suite)
404
405     def can_run_suite(suite):
406         return tests_running < max_concurrent_tests and (
407             suite.cpus_used <= len(free_cpus) or suite.cpus_used > max_vpp_cpus
408         )
409
410     while free_cpus and testcase_suites:
411         a_suite = testcase_suites[0]
412         if a_suite.is_tagged_run_solo:
413             a_suite = testcase_suites.pop(0)
414             solo_testcase_suites.append(a_suite)
415             continue
416         if can_run_suite(a_suite):
417             a_suite = testcase_suites.pop(0)
418             run_suite(a_suite)
419         else:
420             break
421
422     if tests_running == 0 and solo_testcase_suites:
423         a_suite = solo_testcase_suites.pop(0)
424         run_suite(a_suite)
425
426     read_from_testcases = threading.Event()
427     read_from_testcases.set()
428     stdouterr_thread = threading.Thread(
429         target=stdouterr_reader_wrapper,
430         args=(unread_testcases, finished_unread_testcases, read_from_testcases),
431     )
432     stdouterr_thread.start()
433
434     failed_wrapped_testcases = set()
435     stop_run = False
436
437     try:
438         while wrapped_testcase_suites or testcase_suites:
439             finished_testcase_suites = set()
440             for wrapped_testcase_suite in wrapped_testcase_suites:
441                 while wrapped_testcase_suite.result_parent_end.poll():
442                     wrapped_testcase_suite.result.process_result(
443                         *wrapped_testcase_suite.result_parent_end.recv()
444                     )
445                     wrapped_testcase_suite.last_heard = time.time()
446
447                 while wrapped_testcase_suite.keep_alive_parent_end.poll():
448                     (
449                         wrapped_testcase_suite.last_test,
450                         wrapped_testcase_suite.last_test_vpp_binary,
451                         wrapped_testcase_suite.last_test_temp_dir,
452                         wrapped_testcase_suite.vpp_pid,
453                     ) = wrapped_testcase_suite.keep_alive_parent_end.recv()
454                     wrapped_testcase_suite.last_heard = time.time()
455
456                 if wrapped_testcase_suite.finished_parent_end.poll():
457                     wrapped_testcase_suite.finished_parent_end.recv()
458                     wrapped_testcase_suite.last_heard = time.time()
459                     stop_run = (
460                         process_finished_testsuite(
461                             wrapped_testcase_suite,
462                             finished_testcase_suites,
463                             failed_wrapped_testcases,
464                             results,
465                         )
466                         or stop_run
467                     )
468                     continue
469
470                 fail = False
471                 if wrapped_testcase_suite.last_heard + config.timeout < time.time():
472                     fail = True
473                     wrapped_testcase_suite.logger.critical(
474                         "Child test runner process timed out "
475                         "(last test running was `%s' in `%s')!"
476                         % (
477                             wrapped_testcase_suite.last_test,
478                             wrapped_testcase_suite.last_test_temp_dir,
479                         )
480                     )
481                 elif not wrapped_testcase_suite.child.is_alive():
482                     fail = True
483                     wrapped_testcase_suite.logger.critical(
484                         "Child test runner process unexpectedly died "
485                         "(last test running was `%s' in `%s')!"
486                         % (
487                             wrapped_testcase_suite.last_test,
488                             wrapped_testcase_suite.last_test_temp_dir,
489                         )
490                     )
491                 elif (
492                     wrapped_testcase_suite.last_test_temp_dir
493                     and wrapped_testcase_suite.last_test_vpp_binary
494                 ):
495                     if is_core_present(wrapped_testcase_suite.last_test_temp_dir):
496                         wrapped_testcase_suite.add_testclass_with_core()
497                         if wrapped_testcase_suite.core_detected_at is None:
498                             wrapped_testcase_suite.core_detected_at = time.time()
499                         elif (
500                             wrapped_testcase_suite.core_detected_at + core_timeout
501                             < time.time()
502                         ):
503                             wrapped_testcase_suite.logger.critical(
504                                 "Child test runner process unresponsive and "
505                                 "core-file exists in test temporary directory "
506                                 "(last test running was `%s' in `%s')!"
507                                 % (
508                                     wrapped_testcase_suite.last_test,
509                                     wrapped_testcase_suite.last_test_temp_dir,
510                                 )
511                             )
512                             fail = True
513
514                 if fail:
515                     wrapped_testcase_suite.child.terminate()
516                     try:
517                         # terminating the child process tends to leave orphan
518                         # VPP process around
519                         if wrapped_testcase_suite.vpp_pid:
520                             os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
521                     except OSError:
522                         # already dead
523                         pass
524                     wrapped_testcase_suite.result.crashed = True
525                     wrapped_testcase_suite.result.process_result(
526                         wrapped_testcase_suite.last_test_id, TestResultCode.ERROR
527                     )
528                     stop_run = (
529                         process_finished_testsuite(
530                             wrapped_testcase_suite,
531                             finished_testcase_suites,
532                             failed_wrapped_testcases,
533                             results,
534                         )
535                         or stop_run
536                     )
537
538             for finished_testcase in finished_testcase_suites:
539                 # Somewhat surprisingly, the join below may
540                 # timeout, even if client signaled that
541                 # it finished - so we note it just in case.
542                 join_start = time.time()
543                 finished_testcase.child.join(test_finished_join_timeout)
544                 join_end = time.time()
545                 if join_end - join_start >= test_finished_join_timeout:
546                     finished_testcase.logger.error(
547                         "Timeout joining finished test: %s (pid %d)"
548                         % (finished_testcase.last_test, finished_testcase.child.pid)
549                     )
550                 finished_testcase.close_pipes()
551                 wrapped_testcase_suites.remove(finished_testcase)
552                 finished_unread_testcases.add(finished_testcase)
553                 finished_testcase.stdouterr_queue.put(None)
554                 on_suite_finish(finished_testcase)
555                 if stop_run:
556                     while testcase_suites:
557                         results.append(TestResult(testcase_suites.pop(0)))
558                 elif testcase_suites:
559                     a_suite = testcase_suites[0]
560                     while a_suite and a_suite.is_tagged_run_solo:
561                         testcase_suites.pop(0)
562                         solo_testcase_suites.append(a_suite)
563                         if testcase_suites:
564                             a_suite = testcase_suites[0]
565                         else:
566                             a_suite = None
567                     if a_suite and can_run_suite(a_suite):
568                         testcase_suites.pop(0)
569                         run_suite(a_suite)
570                 if solo_testcase_suites and tests_running == 0:
571                     a_suite = solo_testcase_suites.pop(0)
572                     run_suite(a_suite)
573             time.sleep(0.1)
574     except Exception:
575         for wrapped_testcase_suite in wrapped_testcase_suites:
576             wrapped_testcase_suite.child.terminate()
577             wrapped_testcase_suite.stdouterr_queue.put(None)
578         raise
579     finally:
580         read_from_testcases.clear()
581         stdouterr_thread.join(config.timeout)
582         manager.shutdown()
583
584     handle_cores(failed_wrapped_testcases)
585     return results
586
587
588 class TestSuiteWrapper(unittest.TestSuite):
589     cpus_used = 0
590
591     def __init__(self):
592         return super().__init__()
593
594     def addTest(self, test):
595         self.cpus_used = max(self.cpus_used, test.get_cpus_required())
596         super().addTest(test)
597
598     def assign_cpus(self, cpus):
599         self.cpus = cpus
600
601     def _handleClassSetUp(self, test, result):
602         if not test.__class__.skipped_due_to_cpu_lack:
603             test.assign_cpus(self.cpus)
604         super()._handleClassSetUp(test, result)
605
606     def get_assigned_cpus(self):
607         return self.cpus
608
609
610 class SplitToSuitesCallback:
611     def __init__(self, filter_callback):
612         self.suites = {}
613         self.suite_name = "default"
614         self.filter_callback = filter_callback
615         self.filtered = TestSuiteWrapper()
616
617     def __call__(self, file_name, cls, method):
618         test_method = cls(method)
619         if self.filter_callback(file_name, cls.__name__, method):
620             self.suite_name = file_name + cls.__name__
621             if self.suite_name not in self.suites:
622                 self.suites[self.suite_name] = TestSuiteWrapper()
623                 self.suites[self.suite_name].is_tagged_run_solo = False
624             self.suites[self.suite_name].addTest(test_method)
625             if test_method.is_tagged_run_solo():
626                 self.suites[self.suite_name].is_tagged_run_solo = True
627
628         else:
629             self.filtered.addTest(test_method)
630
631
632 def parse_test_filter(test_filter):
633     f = test_filter
634     filter_file_name = None
635     filter_class_name = None
636     filter_func_name = None
637     if f:
638         if "." in f:
639             parts = f.split(".")
640             if len(parts) > 3:
641                 raise Exception(f"Invalid test filter: {test_filter}")
642             if len(parts) > 2:
643                 if parts[2] not in ("*", ""):
644                     filter_func_name = parts[2]
645             if parts[1] not in ("*", ""):
646                 filter_class_name = parts[1]
647             if parts[0] not in ("*", ""):
648                 if parts[0].startswith("test_"):
649                     filter_file_name = parts[0]
650                 else:
651                     filter_file_name = "test_%s" % parts[0]
652         else:
653             if f.startswith("test_"):
654                 filter_file_name = f
655             else:
656                 filter_file_name = "test_%s" % f
657     if filter_file_name:
658         filter_file_name = "%s.py" % filter_file_name
659     return filter_file_name, filter_class_name, filter_func_name
660
661
662 def filter_tests(tests, filter_cb):
663     result = TestSuiteWrapper()
664     for t in tests:
665         if isinstance(t, unittest.suite.TestSuite):
666             # this is a bunch of tests, recursively filter...
667             x = filter_tests(t, filter_cb)
668             if x.countTestCases() > 0:
669                 result.addTest(x)
670         elif isinstance(t, unittest.TestCase):
671             # this is a single test
672             parts = t.id().split(".")
673             # t.id() for common cases like this:
674             # test_classifier.TestClassifier.test_acl_ip
675             # apply filtering only if it is so
676             if len(parts) == 3:
677                 if not filter_cb(parts[0], parts[1], parts[2]):
678                     continue
679             result.addTest(t)
680         else:
681             # unexpected object, don't touch it
682             result.addTest(t)
683     return result
684
685
686 class FilterByTestOption:
687     def __init__(self, filters):
688         self.filters = filters
689
690     def __call__(self, file_name, class_name, func_name):
691         def test_one(
692             filter_file_name,
693             filter_class_name,
694             filter_func_name,
695             file_name,
696             class_name,
697             func_name,
698         ):
699             if filter_file_name:
700                 fn_match = fnmatch.fnmatch(file_name, filter_file_name)
701                 if not fn_match:
702                     return False
703             if filter_class_name and class_name != filter_class_name:
704                 return False
705             if filter_func_name and func_name != filter_func_name:
706                 return False
707             return True
708
709         for filter_file_name, filter_class_name, filter_func_name in self.filters:
710             if test_one(
711                 filter_file_name,
712                 filter_class_name,
713                 filter_func_name,
714                 file_name,
715                 class_name,
716                 func_name,
717             ):
718                 return True
719
720         return False
721
722
723 class FilterByClassList:
724     def __init__(self, classes_with_filenames):
725         self.classes_with_filenames = classes_with_filenames
726
727     def __call__(self, file_name, class_name, func_name):
728         return ".".join([file_name, class_name]) in self.classes_with_filenames
729
730
731 def suite_from_failed(suite, failed):
732     failed = {x.rsplit(".", 1)[0] for x in failed}
733     filter_cb = FilterByClassList(failed)
734     suite = filter_tests(suite, filter_cb)
735     return suite
736
737
738 class AllResults(dict):
739     def __init__(self):
740         super(AllResults, self).__init__()
741         self.all_testcases = 0
742         self.results_per_suite = []
743         for trc in list(TestResultCode):
744             self[trc] = 0
745         self.rerun = []
746         self.testsuites_no_tests_run = []
747
748     def add_results(self, result):
749         self.results_per_suite.append(result)
750         for trc in list(TestResultCode):
751             self[trc] += len(result[trc])
752
753     def add_result(self, result):
754         retval = 0
755         self.all_testcases += result.testcase_suite.countTestCases()
756         self.add_results(result)
757
758         if result.no_tests_run():
759             self.testsuites_no_tests_run.append(result.testcase_suite)
760             if result.crashed:
761                 retval = -1
762             else:
763                 retval = 1
764         elif not result.was_successful():
765             retval = 1
766
767         if retval != 0:
768             self.rerun.append(result.testcase_suite)
769
770         return retval
771
772     def print_results(self):
773         print("")
774         print(double_line_delim)
775         print("TEST RESULTS:")
776
777         def indent_results(lines):
778             lines = list(filter(None, lines))
779             maximum = max(lines, key=lambda x: x.index(":"))
780             maximum = 4 + maximum.index(":")
781             for l in lines:
782                 padding = " " * (maximum - l.index(":"))
783                 print(f"{padding}{l}")
784
785         indent_results(
786             [
787                 f"Scheduled tests: {self.all_testcases}",
788                 f"Executed tests: {self[TestResultCode.TEST_RUN]}",
789                 f"Passed tests: {colorize(self[TestResultCode.PASS], GREEN)}",
790                 f"Expected failures: {colorize(self[TestResultCode.EXPECTED_FAIL], GREEN)}"
791                 if self[TestResultCode.EXPECTED_FAIL]
792                 else None,
793                 f"Skipped tests: {colorize(self[TestResultCode.SKIP], YELLOW)}"
794                 if self[TestResultCode.SKIP]
795                 else None,
796                 f"Not Executed tests: {colorize(self.not_executed, RED)}"
797                 if self.not_executed
798                 else None,
799                 f"Failures: {colorize(self[TestResultCode.FAIL], RED)}"
800                 if self[TestResultCode.FAIL]
801                 else None,
802                 f"Unexpected passes: {colorize(self[TestResultCode.UNEXPECTED_PASS], RED)}"
803                 if self[TestResultCode.UNEXPECTED_PASS]
804                 else None,
805                 f"Errors: {colorize(self[TestResultCode.ERROR], RED)}"
806                 if self[TestResultCode.ERROR]
807                 else None,
808                 "Tests skipped due to lack of CPUS: "
809                 f"{colorize(self[TestResultCode.SKIP_CPU_SHORTAGE], YELLOW)}"
810                 if self[TestResultCode.SKIP_CPU_SHORTAGE]
811                 else None,
812             ]
813         )
814
815         if self.all_failed > 0:
816             print("FAILURES AND ERRORS IN TESTS:")
817             for result in self.results_per_suite:
818                 old_testcase_name = None
819                 for tr_code, headline in (
820                     (TestResultCode.FAIL, "FAILURE"),
821                     (TestResultCode.ERROR, "ERROR"),
822                     (TestResultCode.UNEXPECTED_PASS, "UNEXPECTED PASS"),
823                 ):
824                     if not result[tr_code]:
825                         continue
826
827                     for failed_test_id in result[tr_code]:
828                         new_testcase_name, test_name = result.get_testcase_names(
829                             failed_test_id
830                         )
831                         if new_testcase_name != old_testcase_name:
832                             print(
833                                 f"  Testcase name: {colorize(new_testcase_name, RED)}"
834                             )
835                             old_testcase_name = new_testcase_name
836                         print(
837                             f"    {headline}: {colorize(test_name, RED)} [{failed_test_id}]"
838                         )
839
840         if self.testsuites_no_tests_run:
841             print("TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:")
842             tc_classes = set()
843             for testsuite in self.testsuites_no_tests_run:
844                 for testcase in testsuite:
845                     tc_classes.add(get_testcase_doc_name(testcase))
846             for tc_class in tc_classes:
847                 print("  {}".format(colorize(tc_class, RED)))
848
849         if self[TestResultCode.SKIP_CPU_SHORTAGE]:
850             print()
851             print(
852                 colorize(
853                     "     SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
854                     " ENOUGH CPUS AVAILABLE",
855                     YELLOW,
856                 )
857             )
858         print(double_line_delim)
859         print("")
860
861     @property
862     def not_executed(self):
863         return self.all_testcases - self[TestResultCode.TEST_RUN]
864
865     @property
866     def all_failed(self):
867         return (
868             self[TestResultCode.FAIL]
869             + self[TestResultCode.ERROR]
870             + self[TestResultCode.UNEXPECTED_PASS]
871         )
872
873
874 def parse_results(results):
875     """
876     Prints the number of scheduled, executed, not executed, passed, failed,
877     errored and skipped tests and details about failed and errored tests.
878
879     Also returns all suites where any test failed.
880
881     :param results:
882     :return:
883     """
884
885     results_per_suite = AllResults()
886     crashed = False
887     failed = False
888     for result in results:
889         result_code = results_per_suite.add_result(result)
890         if result_code == 1:
891             failed = True
892         elif result_code == -1:
893             crashed = True
894
895     results_per_suite.print_results()
896
897     if crashed:
898         return_code = -1
899     elif failed:
900         return_code = 1
901     else:
902         return_code = 0
903     return return_code, results_per_suite.rerun
904
905
906 if __name__ == "__main__":
907     print(f"Config is: {config}")
908
909     if config.sanity:
910         print("Running sanity test case.")
911         try:
912             rc = sanity_run_vpp.main()
913             if rc != 0:
914                 sys.exit(rc)
915         except Exception as e:
916             print(traceback.format_exc())
917             print("Couldn't run sanity test case.")
918             sys.exit(-1)
919
920     test_finished_join_timeout = 15
921
922     debug_gdb = config.debug in ["gdb", "gdbserver", "attach"]
923     debug_core = config.debug == "core"
924
925     run_interactive = debug_gdb or config.step or config.force_foreground
926
927     max_concurrent_tests = 0
928     print(f"OS reports {num_cpus} available cpu(s).")
929
930     test_jobs = config.jobs
931     if test_jobs == "auto":
932         if run_interactive:
933             max_concurrent_tests = 1
934             print("Interactive mode required, running tests consecutively.")
935         else:
936             max_concurrent_tests = num_cpus
937             print(
938                 f"Running at most {max_concurrent_tests} python test "
939                 "processes concurrently."
940             )
941     else:
942         max_concurrent_tests = test_jobs
943         print(
944             f"Running at most {max_concurrent_tests} python test processes "
945             "concurrently as set by 'TEST_JOBS'."
946         )
947
948     print(f"Using at most {max_vpp_cpus} cpus for VPP threads.")
949
950     if run_interactive and max_concurrent_tests > 1:
951         raise NotImplementedError(
952             "Running tests interactively (DEBUG is gdb[server] or ATTACH or "
953             "STEP is set) in parallel (TEST_JOBS is more than 1) is not "
954             "supported"
955         )
956
957     descriptions = True
958
959     print("Running tests using custom test runner.")
960     filters = [(parse_test_filter(f)) for f in config.filter.split(",")]
961
962     print(
963         "Selected filters: ",
964         "|".join(
965             f"file={filter_file}, class={filter_class}, function={filter_func}"
966             for filter_file, filter_class, filter_func in filters
967         ),
968     )
969
970     filter_cb = FilterByTestOption(filters)
971
972     cb = SplitToSuitesCallback(filter_cb)
973     for d in config.test_src_dir:
974         print("Adding tests from directory tree %s" % d)
975         discover_tests(d, cb)
976
977     # suites are not hashable, need to use list
978     suites = []
979     tests_amount = 0
980     for testcase_suite in cb.suites.values():
981         tests_amount += testcase_suite.countTestCases()
982         if testcase_suite.cpus_used > max_vpp_cpus:
983             # here we replace test functions with lambdas to just skip them
984             # but we also replace setUp/tearDown functions to do nothing
985             # so that the test can be "started" and "stopped", so that we can
986             # still keep those prints (test description - SKIP), which are done
987             # in stopTest() (for that to trigger, test function must run)
988             for t in testcase_suite:
989                 for m in dir(t):
990                     if m.startswith("test_"):
991                         setattr(t, m, lambda: t.skipTest("not enough cpus"))
992                 setattr(t.__class__, "setUpClass", lambda: None)
993                 setattr(t.__class__, "tearDownClass", lambda: None)
994                 setattr(t, "setUp", lambda: None)
995                 setattr(t, "tearDown", lambda: None)
996                 t.__class__.skipped_due_to_cpu_lack = True
997         suites.append(testcase_suite)
998
999     print(
1000         "%s out of %s tests match specified filters"
1001         % (tests_amount, tests_amount + cb.filtered.countTestCases())
1002     )
1003
1004     if not config.extended:
1005         print("Not running extended tests (some tests will be skipped)")
1006
1007     attempts = config.retries + 1
1008     if attempts > 1:
1009         print("Perform %s attempts to pass the suite..." % attempts)
1010
1011     if run_interactive and suites:
1012         # don't fork if requiring interactive terminal
1013         print("Running tests in foreground in the current process")
1014         full_suite = unittest.TestSuite()
1015         free_cpus = list(available_cpus)
1016         cpu_shortage = False
1017         for suite in suites:
1018             if suite.cpus_used <= max_vpp_cpus:
1019                 suite.assign_cpus(free_cpus[: suite.cpus_used])
1020             else:
1021                 suite.assign_cpus([])
1022                 cpu_shortage = True
1023         full_suite.addTests(suites)
1024         result = VppTestRunner(
1025             verbosity=config.verbose, failfast=config.failfast, print_summary=True
1026         ).run(full_suite)
1027         was_successful = result.wasSuccessful()
1028         if not was_successful:
1029             for test_case_info in result.failed_test_cases_info:
1030                 handle_failed_suite(
1031                     test_case_info.logger,
1032                     test_case_info.tempdir,
1033                     test_case_info.vpp_pid,
1034                     config.vpp,
1035                 )
1036                 if test_case_info in result.core_crash_test_cases_info:
1037                     check_and_handle_core(
1038                         test_case_info.vpp_bin_path,
1039                         test_case_info.tempdir,
1040                         test_case_info.core_crash_test,
1041                     )
1042
1043         if cpu_shortage:
1044             print()
1045             print(
1046                 colorize(
1047                     "SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
1048                     " ENOUGH CPUS AVAILABLE",
1049                     YELLOW,
1050                 )
1051             )
1052             print()
1053         sys.exit(not was_successful)
1054     else:
1055         print(
1056             "Running each VPPTestCase in a separate background process"
1057             f" with at most {max_concurrent_tests} parallel python test "
1058             "process(es)"
1059         )
1060         exit_code = 0
1061         while suites and attempts > 0:
1062             for suite in suites:
1063                 failed_link = get_failed_testcase_linkname(
1064                     config.failed_dir,
1065                     f"{get_testcase_dirname(suite._tests[0].__class__.__name__)}",
1066                 )
1067                 if os.path.islink(failed_link):
1068                     os.unlink(failed_link)
1069             results = run_forked(suites)
1070             exit_code, suites = parse_results(results)
1071             attempts -= 1
1072             if exit_code == 0:
1073                 print("Test run was successful")
1074             else:
1075                 print("%s attempt(s) left." % attempts)
1076         sys.exit(exit_code)