crypto crypto-openssl: support hashing operations
[vpp.git] / test / run_tests.py
1 #!/usr/bin/env python3
2
3 import sys
4 import shutil
5 import os
6 import fnmatch
7 import unittest
8 import argparse
9 import time
10 import threading
11 import signal
12 import re
13 from multiprocessing import Process, Pipe, get_context
14 from multiprocessing.queues import Queue
15 from multiprocessing.managers import BaseManager
16 import framework
17 from framework import VppTestRunner, running_extended_tests, VppTestCase, \
18     get_testcase_doc_name, get_test_description, PASS, FAIL, ERROR, SKIP, \
19     TEST_RUN, SKIP_CPU_SHORTAGE
20 from debug import spawn_gdb, start_vpp_in_gdb
21 from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
22     colorize, single_line_delim
23 from discover_tests import discover_tests
24 from subprocess import check_output, CalledProcessError
25 from util import check_core_path, get_core_path, is_core_present
26 from cpu_config import num_cpus, max_vpp_cpus, available_cpus
27
28 # timeout (in seconds) which controls how long the child has to finish after
29 # a core dump is seen in the test temporary directory. If this is exceeded,
30 # the parent assumes that the child process is stuck (e.g. waiting for an
31 # event from vpp) and kills the child
32 core_timeout = 3
33
34
class StreamQueue(Queue):
    """Multiprocessing queue exposing a minimal stream-like interface.

    Adds ``write``, ``flush`` and ``fileno`` on top of ``Queue`` so that an
    instance can be used where a text output stream is expected.
    """

    def write(self, msg):
        """Put *msg* on the queue instead of writing it to a stream."""
        self.put(msg)

    def flush(self):
        """Flush the interpreter's original stdout, then original stderr."""
        for original_stream in (sys.__stdout__, sys.__stderr__):
            original_stream.flush()

    def fileno(self):
        """Return the file descriptor of the queue's writer end."""
        return self._writer.fileno()
45
46
class StreamQueueManager(BaseManager):
    """Manager class on which the ``StreamQueue`` type gets registered."""
49
50
# make StreamQueue objects creatable through manager.StreamQueue(...)
StreamQueueManager.register(typeid='StreamQueue', callable=StreamQueue)
52
53
class TestResult(dict):
    """Results of one test suite, keyed by result type.

    Each of the keys PASS, FAIL, ERROR, SKIP, SKIP_CPU_SHORTAGE and TEST_RUN
    maps to the list of test ids reported with that result.
    """

    def __init__(self, testcase_suite, testcases_by_id=None):
        """
        :param testcase_suite: iterable suite of test cases the result is for
        :param testcases_by_id: optional mapping of test id -> test case,
            used to look up human readable names
        """
        super(TestResult, self).__init__()
        self[PASS] = []
        self[FAIL] = []
        self[ERROR] = []
        self[SKIP] = []
        self[SKIP_CPU_SHORTAGE] = []
        self[TEST_RUN] = []
        self.crashed = False
        self.testcase_suite = testcase_suite
        self.testcases = [testcase for testcase in testcase_suite]
        # fall back to an empty mapping so that the name lookup helpers
        # below work (instead of raising) when no mapping was supplied
        self.testcases_by_id = \
            {} if testcases_by_id is None else testcases_by_id

    def was_successful(self):
        """Return True if nothing failed/errored and every test case of the
        suite was either passed or skipped."""
        return 0 == len(self[FAIL]) == len(self[ERROR]) \
            and len(self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE]) \
            == self.testcase_suite.countTestCases()

    def no_tests_run(self):
        """Return True if no test was recorded under TEST_RUN."""
        return 0 == len(self[TEST_RUN])

    def process_result(self, test_id, result):
        """Record *test_id* under the result type *result*."""
        self[result].append(test_id)

    def suite_from_failed(self):
        """Build a suite from the tests which neither passed nor were skipped.

        :return: the suite produced by the module level suite_from_failed(),
            or None if there is nothing to rerun
        """
        # build the lookup set once instead of concatenating the three
        # lists again for every test case
        done_ids = set(self[PASS])
        done_ids.update(self[SKIP], self[SKIP_CPU_SHORTAGE])
        rerun_ids = set()
        for testcase in self.testcase_suite:
            tc_id = testcase.id()
            if tc_id not in done_ids:
                rerun_ids.add(tc_id)
        if rerun_ids:
            return suite_from_failed(self.testcase_suite, rerun_ids)
        return None

    def get_testcase_names(self, test_id):
        """Return the (testcase name, test name) pair to print for *test_id*.
        """
        # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
        setup_teardown_match = re.match(
            r'((tearDownClass)|(setUpClass)) \((.+\..+)\)', test_id)
        if setup_teardown_match:
            test_name, _, _, testcase_name = setup_teardown_match.groups()
            if len(testcase_name.split('.')) == 2:
                # only "module.Class" is known - use the first known test id
                # starting with it to look up the doc name
                for key in self.testcases_by_id.keys():
                    if key.startswith(testcase_name):
                        testcase_name = key
                        break
            testcase_name = self._get_testcase_doc_name(testcase_name)
        else:
            test_name = self._get_test_description(test_id)
            testcase_name = self._get_testcase_doc_name(test_id)

        return testcase_name, test_name

    def _get_test_description(self, test_id):
        """Return the description of a known test, else *test_id* itself."""
        if test_id in self.testcases_by_id:
            desc = get_test_description(descriptions,
                                        self.testcases_by_id[test_id])
        else:
            desc = test_id
        return desc

    def _get_testcase_doc_name(self, test_id):
        """Return the doc name of a known test's test case, else *test_id*.
        """
        if test_id in self.testcases_by_id:
            doc_name = get_testcase_doc_name(self.testcases_by_id[test_id])
        else:
            doc_name = test_id
        return doc_name
120
121
def test_runner_wrapper(suite, keep_alive_pipe, stdouterr_queue,
                        finished_pipe, result_pipe, logger):
    """Entry point of a child process: run *suite* and report the outcome.

    stdout and stderr are replaced by *stdouterr_queue*, the suite is run
    by a VppTestRunner and the overall success flag is sent over
    *finished_pipe*; *finished_pipe* and *keep_alive_pipe* are closed
    afterwards.
    """
    sys.stdout = sys.stderr = stdouterr_queue
    VppTestCase.parallel_handler = logger.handlers[0]
    runner = VppTestRunner(keep_alive_pipe=keep_alive_pipe,
                           descriptions=descriptions,
                           verbosity=verbose,
                           result_pipe=result_pipe,
                           failfast=failfast,
                           print_summary=False)
    outcome = runner.run(suite)
    finished_pipe.send(outcome.wasSuccessful())
    for pipe in (finished_pipe, keep_alive_pipe):
        pipe.close()
136
137
class TestCaseWrapper(object):
    """Parent-side handle of a test suite running in a child process.

    Creating an instance starts the child process right away. The instance
    keeps the parent/child pipe ends, the output queue, the accumulated
    TestResult and the information about the last test the child reported.
    """

    def __init__(self, testcase_suite, manager):
        """
        :param testcase_suite: suite to run in the child process
        :param manager: manager object used to create the StreamQueue
            which carries the child's stdout/stderr
        """
        self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(
            duplex=False)
        self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
        self.result_parent_end, self.result_child_end = Pipe(duplex=False)
        self.testcase_suite = testcase_suite
        self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
        self.logger = get_parallel_logger(self.stdouterr_queue)
        self.child = Process(target=test_runner_wrapper,
                             args=(testcase_suite,
                                   self.keep_alive_child_end,
                                   self.stdouterr_queue,
                                   self.finished_child_end,
                                   self.result_child_end,
                                   self.logger)
                             )
        self.child.start()
        self.last_test_temp_dir = None
        self.last_test_vpp_binary = None
        self._last_test = None
        self.last_test_id = None
        self.vpp_pid = None
        self.last_heard = time.time()
        self.core_detected_at = None
        self.testcases_by_id = {}
        # class name -> (test name, vpp binary, temp dir) of the first test
        # of that class for which a core was seen
        self.testclasess_with_core = {}
        for testcase in self.testcase_suite:
            self.testcases_by_id[testcase.id()] = testcase
        self.result = TestResult(testcase_suite, self.testcases_by_id)

    @property
    def last_test(self):
        """Human readable name of the last test reported by the child."""
        return self._last_test

    @last_test.setter
    def last_test(self, test_id):
        # remember the raw id and derive a printable name from it
        self.last_test_id = test_id
        if test_id in self.testcases_by_id:
            testcase = self.testcases_by_id[test_id]
            self._last_test = testcase.shortDescription()
            if not self._last_test:
                self._last_test = str(testcase)
        else:
            self._last_test = test_id

    def add_testclass_with_core(self):
        """Remember the last reported test as one that produced a core.

        Only the first such test per test class is kept.
        """
        if self.last_test_id in self.testcases_by_id:
            test = self.testcases_by_id[self.last_test_id]
            class_name = unittest.util.strclass(test.__class__)
            test_name = "'{}' ({})".format(get_test_description(descriptions,
                                                                test),
                                           self.last_test_id)
        else:
            test_name = self.last_test_id
            # the id is expected to look like
            # "setUpClass (module.Class)" / "tearDownClass (module.Class)";
            # re.match() returns None for anything else, so fall back to the
            # raw id instead of crashing the parent's monitoring loop
            match = None
            if isinstance(test_name, str):
                match = re.match(r'((tearDownClass)|(setUpClass)) '
                                 r'\((.+\..+)\)', test_name)
            class_name = match.group(4) if match else test_name
        if class_name not in self.testclasess_with_core:
            self.testclasess_with_core[class_name] = (
                test_name,
                self.last_test_vpp_binary,
                self.last_test_temp_dir)

    def close_pipes(self):
        """Close both ends of all three pipes."""
        self.keep_alive_child_end.close()
        self.finished_child_end.close()
        self.result_child_end.close()
        self.keep_alive_parent_end.close()
        self.finished_parent_end.close()
        self.result_parent_end.close()

    def was_successful(self):
        """Return whether the accumulated result is successful."""
        return self.result.was_successful()

    @property
    def cpus_used(self):
        """Number of cpus used by the wrapped suite."""
        return self.testcase_suite.cpus_used

    def get_assigned_cpus(self):
        """Return the cpus assigned to the wrapped suite."""
        return self.testcase_suite.get_assigned_cpus()
218
219
def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
                             read_testcases):
    """Copy the queued output of test cases to stdout, one test case at a time.

    Keeps going while the *read_testcases* event is set or there are still
    unread test cases. Finished test cases are picked first. The output of
    a test case is read until a None item is taken from its queue, then the
    queue is closed.
    """
    while read_testcases.is_set() or unread_testcases:
        current = None
        if finished_unread_testcases:
            current = finished_unread_testcases.pop()
            unread_testcases.remove(current)
        elif unread_testcases:
            current = unread_testcases.pop()
        if not current:
            continue

        output_queue = current.stdouterr_queue
        chunk = ''
        while chunk is not None:
            sys.stdout.write(chunk)
            chunk = output_queue.get()

        output_queue.close()
        finished_unread_testcases.discard(current)
238
239
def handle_failed_suite(logger, last_test_temp_dir, vpp_pid):
    """Collect post-mortem information for a failed suite.

    :param logger: logger used for reporting
    :param last_test_temp_dir: temporary directory of the last test which
        was running (may be None)
    :param vpp_pid: pid of the vpp process of the last test (may be None)
    """
    if last_test_temp_dir:
        # Need to create link in case of a timeout or core dump without failure
        lttd = os.path.basename(last_test_temp_dir)
        # NOTE(review): the path is built by plain concatenation, so
        # FAILED_DIR presumably has to be set and end with a path separator
        failed_dir = os.getenv('FAILED_DIR')
        link_path = '%s%s-FAILED' % (failed_dir, lttd)
        # lexists() is also true for a dangling symlink, for which exists()
        # is false and os.symlink() would raise FileExistsError
        if not os.path.lexists(link_path):
            os.symlink(last_test_temp_dir, link_path)
        logger.error("Symlink to failed testcase directory: %s -> %s"
                     % (link_path, lttd))

        # Report core existence
        core_path = get_core_path(last_test_temp_dir)
        if os.path.exists(core_path):
            logger.error(
                "Core-file exists in test temporary directory: %s!" %
                core_path)
            check_core_path(logger, core_path)
            logger.debug("Running 'file %s':" % core_path)
            try:
                info = check_output(["file", core_path])
                logger.debug(info)
            except CalledProcessError as e:
                logger.error("Subprocess returned with return code "
                             "while running `file' utility on core-file "
                             "returned: "
                             "rc=%s", e.returncode)
            except OSError as e:
                logger.error("Subprocess returned with OS error while "
                             "running 'file' utility "
                             "on core-file: "
                             "(%s) %s", e.errno, e.strerror)
            except Exception:
                logger.exception("Unexpected error running `file' utility "
                                 "on core-file")
            logger.error("gdb %s %s" %
                         (os.getenv('VPP_BIN', 'vpp'), core_path))

    if vpp_pid:
        # Copy api post mortem
        api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
        if os.path.isfile(api_post_mortem_path):
            if last_test_temp_dir:
                logger.error("Copying api_post_mortem.%d to %s" %
                             (vpp_pid, last_test_temp_dir))
                shutil.copy2(api_post_mortem_path, last_test_temp_dir)
            else:
                # without a destination directory the copy would raise
                logger.error("Not copying api_post_mortem.%d - test "
                             "temporary directory is not known" % vpp_pid)
285
286
def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
    """Debug or compress the core file found in *tempdir*, if there is one.

    With the debug_core setting enabled gdb is spawned on the core, otherwise
    with the compress_core setting enabled the core file is gzipped.
    """
    if not is_core_present(tempdir):
        return

    if debug_core:
        print('VPP core detected in %s. Last test running was %s' %
              (tempdir, core_crash_test))
        print(single_line_delim)
        spawn_gdb(vpp_binary, get_core_path(tempdir))
        print(single_line_delim)
        return

    if compress_core:
        print("Compressing core-file in test directory `%s'" % tempdir)
        os.system("gzip %s" % get_core_path(tempdir))
298
299
def handle_cores(failed_testcases):
    """Run core handling for every recorded core of the failed test cases."""
    for wrapper in failed_testcases:
        cores_by_class = wrapper.testclasess_with_core
        if not cores_by_class:
            continue
        for entry in cores_by_class.values():
            test, vpp_binary, tempdir = entry
            check_and_handle_core(vpp_binary, tempdir, test)
306
307
def process_finished_testsuite(wrapped_testcase_suite,
                               finished_testcase_suites,
                               failed_wrapped_testcases,
                               results):
    """Book-keep a suite which has finished running.

    The suite's result is appended to *results* and the suite is added to
    *finished_testcase_suites*; an unsuccessful suite is also added to
    *failed_wrapped_testcases* and passed to failure handling.

    :return: True if the whole run should stop (failfast is enabled and the
        suite was not successful), False otherwise
    """
    results.append(wrapped_testcase_suite.result)
    finished_testcase_suites.add(wrapped_testcase_suite)

    unsuccessful = not wrapped_testcase_suite.was_successful()
    if unsuccessful:
        failed_wrapped_testcases.add(wrapped_testcase_suite)
        handle_failed_suite(wrapped_testcase_suite.logger,
                            wrapped_testcase_suite.last_test_temp_dir,
                            wrapped_testcase_suite.vpp_pid)

    return bool(failfast and unsuccessful)
325
326
def run_forked(testcase_suites):
    """Run the test suites in child processes and collect their results.

    Suites are started while there is capacity (concurrency limit and free
    cpus); suites tagged run-solo are set aside and only started when no
    other suite is running. Running children are monitored for results,
    keep-alive messages, timeouts, unexpected death and core files.

    :param testcase_suites: list of suites to run, consumed from the front
    :return: list of TestResult objects
    """
    wrapped_testcase_suites = set()
    solo_testcase_suites = []

    # suites are unhashable, need to use list
    results = []
    unread_testcases = set()
    finished_unread_testcases = set()
    manager = StreamQueueManager()
    manager.start()
    tests_running = 0
    free_cpus = list(available_cpus)

    def on_suite_start(tc):
        nonlocal tests_running
        tests_running = tests_running + 1

    def on_suite_finish(tc):
        nonlocal tests_running
        tests_running = tests_running - 1
        assert tests_running >= 0
        # give the cpus of the finished suite back to the pool
        free_cpus.extend(tc.get_assigned_cpus())

    def run_suite(suite):
        nonlocal free_cpus
        suite.assign_cpus(free_cpus[:suite.cpus_used])
        free_cpus = free_cpus[suite.cpus_used:]
        wrapper = TestCaseWrapper(suite, manager)
        wrapped_testcase_suites.add(wrapper)
        unread_testcases.add(wrapper)
        on_suite_start(suite)

    def can_run_suite(suite):
        return (tests_running < max_concurrent_tests and
                (suite.cpus_used <= len(free_cpus) or
                 suite.cpus_used > max_vpp_cpus))

    while free_cpus and testcase_suites:
        a_suite = testcase_suites[0]
        if a_suite.is_tagged_run_solo:
            a_suite = testcase_suites.pop(0)
            solo_testcase_suites.append(a_suite)
            continue
        if can_run_suite(a_suite):
            a_suite = testcase_suites.pop(0)
            run_suite(a_suite)
        else:
            break

    if tests_running == 0 and solo_testcase_suites:
        a_suite = solo_testcase_suites.pop(0)
        run_suite(a_suite)

    read_from_testcases = threading.Event()
    read_from_testcases.set()
    stdouterr_thread = threading.Thread(target=stdouterr_reader_wrapper,
                                        args=(unread_testcases,
                                              finished_unread_testcases,
                                              read_from_testcases))
    stdouterr_thread.start()

    failed_wrapped_testcases = set()
    stop_run = False

    try:
        while wrapped_testcase_suites:
            finished_testcase_suites = set()
            for wrapped_testcase_suite in wrapped_testcase_suites:
                # drain the per-test results sent by the child
                while wrapped_testcase_suite.result_parent_end.poll():
                    wrapped_testcase_suite.result.process_result(
                        *wrapped_testcase_suite.result_parent_end.recv())
                    wrapped_testcase_suite.last_heard = time.time()

                # drain the keep-alive messages describing the running test
                while wrapped_testcase_suite.keep_alive_parent_end.poll():
                    wrapped_testcase_suite.last_test, \
                        wrapped_testcase_suite.last_test_vpp_binary, \
                        wrapped_testcase_suite.last_test_temp_dir, \
                        wrapped_testcase_suite.vpp_pid = \
                        wrapped_testcase_suite.keep_alive_parent_end.recv()
                    wrapped_testcase_suite.last_heard = time.time()

                if wrapped_testcase_suite.finished_parent_end.poll():
                    wrapped_testcase_suite.finished_parent_end.recv()
                    wrapped_testcase_suite.last_heard = time.time()
                    stop_run = process_finished_testsuite(
                        wrapped_testcase_suite,
                        finished_testcase_suites,
                        failed_wrapped_testcases,
                        results) or stop_run
                    continue

                fail = False
                if wrapped_testcase_suite.last_heard + test_timeout < \
                        time.time():
                    fail = True
                    wrapped_testcase_suite.logger.critical(
                        "Child test runner process timed out "
                        "(last test running was `%s' in `%s')!" %
                        (wrapped_testcase_suite.last_test,
                         wrapped_testcase_suite.last_test_temp_dir))
                elif not wrapped_testcase_suite.child.is_alive():
                    fail = True
                    wrapped_testcase_suite.logger.critical(
                        "Child test runner process unexpectedly died "
                        "(last test running was `%s' in `%s')!" %
                        (wrapped_testcase_suite.last_test,
                         wrapped_testcase_suite.last_test_temp_dir))
                elif wrapped_testcase_suite.last_test_temp_dir and \
                        wrapped_testcase_suite.last_test_vpp_binary:
                    if is_core_present(
                            wrapped_testcase_suite.last_test_temp_dir):
                        wrapped_testcase_suite.add_testclass_with_core()
                        if wrapped_testcase_suite.core_detected_at is None:
                            wrapped_testcase_suite.core_detected_at = \
                                time.time()
                        elif wrapped_testcase_suite.core_detected_at + \
                                core_timeout < time.time():
                            wrapped_testcase_suite.logger.critical(
                                "Child test runner process unresponsive and "
                                "core-file exists in test temporary directory "
                                "(last test running was `%s' in `%s')!" %
                                (wrapped_testcase_suite.last_test,
                                 wrapped_testcase_suite.last_test_temp_dir))
                            fail = True

                if fail:
                    wrapped_testcase_suite.child.terminate()
                    try:
                        # terminating the child process tends to leave orphan
                        # VPP process around
                        if wrapped_testcase_suite.vpp_pid:
                            os.kill(wrapped_testcase_suite.vpp_pid,
                                    signal.SIGTERM)
                    except OSError:
                        # already dead
                        pass
                    wrapped_testcase_suite.result.crashed = True
                    wrapped_testcase_suite.result.process_result(
                        wrapped_testcase_suite.last_test_id, ERROR)
                    stop_run = process_finished_testsuite(
                        wrapped_testcase_suite,
                        finished_testcase_suites,
                        failed_wrapped_testcases,
                        results) or stop_run

            for finished_testcase in finished_testcase_suites:
                # Somewhat surprisingly, the join below may
                # timeout, even if client signaled that
                # it finished - so we note it just in case.
                join_start = time.time()
                finished_testcase.child.join(test_finished_join_timeout)
                join_end = time.time()
                if join_end - join_start >= test_finished_join_timeout:
                    finished_testcase.logger.error(
                        "Timeout joining finished test: %s (pid %d)" %
                        (finished_testcase.last_test,
                         finished_testcase.child.pid))
                finished_testcase.close_pipes()
                wrapped_testcase_suites.remove(finished_testcase)
                finished_unread_testcases.add(finished_testcase)
                finished_testcase.stdouterr_queue.put(None)
                on_suite_finish(finished_testcase)
                if stop_run:
                    # record the suites which will not be run any more
                    while testcase_suites:
                        results.append(TestResult(testcase_suites.pop(0)))
                elif testcase_suites:
                    # set aside the run-solo suites at the head of the queue
                    while testcase_suites and \
                            testcase_suites[0].is_tagged_run_solo:
                        solo_testcase_suites.append(testcase_suites.pop(0))
                    # take the next suite off the queue only if it can be
                    # started right now - otherwise it has to stay queued
                    # until a later suite finishes, or it would never run
                    if testcase_suites and \
                            can_run_suite(testcase_suites[0]):
                        run_suite(testcase_suites.pop(0))
                if solo_testcase_suites and tests_running == 0:
                    a_suite = solo_testcase_suites.pop(0)
                    run_suite(a_suite)
            time.sleep(0.1)
    except Exception:
        for wrapped_testcase_suite in wrapped_testcase_suites:
            wrapped_testcase_suite.child.terminate()
            # unblock the reader thread waiting on this suite's output
            wrapped_testcase_suite.stdouterr_queue.put(None)
        raise
    finally:
        read_from_testcases.clear()
        stdouterr_thread.join(test_timeout)
        manager.shutdown()

    handle_cores(failed_wrapped_testcases)
    return results
523
524
class TestSuiteWrapper(unittest.TestSuite):
    """Test suite which tracks how many cpus its tests need and which cpus
    were assigned to it."""

    # highest cpu requirement among the tests added so far
    cpus_used = 0

    def __init__(self):
        super().__init__()

    def addTest(self, test):
        """Add *test* and update the cpu requirement of the suite.

        *test* may also be a nested suite (filter_tests() nests wrappers);
        a nested suite has no get_cpus_required(), so its cpus_used is used
        (0 for a suite which does not track it).
        """
        if isinstance(test, unittest.TestSuite):
            required = getattr(test, 'cpus_used', 0)
        else:
            required = test.get_cpus_required()
        self.cpus_used = max(self.cpus_used, required)
        super().addTest(test)

    def assign_cpus(self, cpus):
        """Remember *cpus* for this suite and for nested wrapper suites."""
        self.cpus = cpus
        for test in self:
            # nested wrappers run their own class set-up, so they need the
            # assignment as well
            if isinstance(test, TestSuiteWrapper):
                test.assign_cpus(cpus)

    def _handleClassSetUp(self, test, result):
        # hand the cpus over to the test before its class set-up runs,
        # unless the test class is skipped because of lack of cpus
        if not test.__class__.skipped_due_to_cpu_lack:
            test.assign_cpus(self.cpus)
        super()._handleClassSetUp(test, result)

    def get_assigned_cpus(self):
        """Return the cpus given to assign_cpus()."""
        return self.cpus
545
546
class SplitToSuitesCallback:
    """Callback which sorts test methods into one suite per test class.

    Tests rejected by the filter callback are collected in ``filtered``.
    """

    def __init__(self, filter_callback):
        self.suites = {}
        self.suite_name = 'default'
        self.filter_callback = filter_callback
        self.filtered = TestSuiteWrapper()

    def __call__(self, file_name, cls, method):
        test_method = cls(method)
        if not self.filter_callback(file_name, cls.__name__, method):
            self.filtered.addTest(test_method)
            return

        self.suite_name = file_name + cls.__name__
        suite = self.suites.get(self.suite_name)
        if suite is None:
            # first test of this class - create its suite
            suite = TestSuiteWrapper()
            self.suites[self.suite_name] = suite
            suite.is_tagged_run_solo = False
        suite.addTest(test_method)
        # a single run-solo test makes the whole suite run solo
        if test_method.is_tagged_run_solo():
            suite.is_tagged_run_solo = True
567
568
test_option = "TEST"


def parse_test_option():
    """Parse the test filter given in the environment variable test_option.

    The value has the form ``file[.class[.function]]``; a part which is
    empty or ``*`` means no filtering on that part. The file part gets the
    ``test_`` prefix (unless already present) and the ``.py`` suffix.

    :return: tuple (file name, class name, function name), None for each
        part which is not filtered
    :raises Exception: if the value has more than three parts
    """
    def with_test_prefix(stem):
        return stem if stem.startswith('test_') else 'test_%s' % stem

    match_all = ('*', '')
    spec = os.getenv(test_option, None)
    file_part = class_part = func_part = None

    if spec:
        if '.' not in spec:
            file_part = with_test_prefix(spec)
        else:
            pieces = spec.split('.')
            if len(pieces) > 3:
                raise Exception("Unrecognized %s option: %s" %
                                (test_option, spec))
            if len(pieces) > 2 and pieces[2] not in match_all:
                func_part = pieces[2]
            if pieces[1] not in match_all:
                class_part = pieces[1]
            if pieces[0] not in match_all:
                file_part = with_test_prefix(pieces[0])

    if file_part:
        file_part = '%s.py' % file_part
    return file_part, class_part, func_part
601
602
def filter_tests(tests, filter_cb):
    """Return a new suite with the tests of *tests* accepted by *filter_cb*.

    Nested suites are filtered recursively and kept only if not empty.
    Test cases whose id has the three parts ``file.class.function`` are
    kept if ``filter_cb(file, class, function)`` is true; anything else is
    kept untouched.
    """
    kept = TestSuiteWrapper()
    for item in tests:
        if isinstance(item, unittest.suite.TestSuite):
            # this is a bunch of tests, recursively filter...
            nested = filter_tests(item, filter_cb)
            if nested.countTestCases() > 0:
                kept.addTest(nested)
            continue

        if isinstance(item, unittest.TestCase):
            # t.id() for common cases looks like this:
            # test_classifier.TestClassifier.test_acl_ip
            # apply filtering only if it is so
            id_parts = item.id().split('.')
            if len(id_parts) == 3 and \
                    not filter_cb(id_parts[0], id_parts[1], id_parts[2]):
                continue

        # an accepted test case or an unexpected object, which is not touched
        kept.addTest(item)
    return kept
625
626
class FilterByTestOption:
    """Filter callback built from the (file, class, function) test option.

    A filter part which is empty or None matches anything. The file name is
    matched as an fnmatch pattern, class and function names must be equal.
    """

    def __init__(self, filter_file_name, filter_class_name, filter_func_name):
        self.filter_file_name = filter_file_name
        self.filter_class_name = filter_class_name
        self.filter_func_name = filter_func_name

    def __call__(self, file_name, class_name, func_name):
        file_pattern = self.filter_file_name
        if file_pattern and not fnmatch.fnmatch(file_name, file_pattern):
            return False
        exact_filters = (
            (self.filter_class_name, class_name),
            (self.filter_func_name, func_name),
        )
        return all(not wanted or wanted == actual
                   for wanted, actual in exact_filters)
643
644
class FilterByClassList:
    """Filter callback accepting tests whose ``file.class`` name is listed."""

    def __init__(self, classes_with_filenames):
        self.classes_with_filenames = classes_with_filenames

    def __call__(self, file_name, class_name, func_name):
        # the function name plays no role, only "file.class" is looked up
        qualified_class = file_name + '.' + class_name
        return qualified_class in self.classes_with_filenames
651
652
def suite_from_failed(suite, failed):
    """Return the tests of *suite* which belong to the classes of the test
    ids in *failed*."""
    failed_classes = set()
    for test_id in failed:
        # strip the last dotted component to get "file.class"
        failed_classes.add(test_id.rsplit('.', 1)[0])
    return filter_tests(suite, FilterByClassList(failed_classes))
658
659
class AllResults(dict):
    """Counters of the results of all suites, keyed by result type."""

    def __init__(self):
        super(AllResults, self).__init__()
        self.all_testcases = 0
        self.results_per_suite = []
        for result_type in (PASS, FAIL, ERROR, SKIP, SKIP_CPU_SHORTAGE,
                            TEST_RUN):
            self[result_type] = 0
        self.rerun = []
        self.testsuites_no_tests_run = []

    def add_results(self, result):
        """Store *result* and add its per-type counts to the totals."""
        self.results_per_suite.append(result)
        for result_type in (PASS, FAIL, ERROR, SKIP, TEST_RUN,
                            SKIP_CPU_SHORTAGE):
            self[result_type] += len(result[result_type])

    def add_result(self, result):
        """Account for the result of one suite.

        :return: 0 if the suite was successful, -1 if no tests were run and
            the suite crashed, 1 otherwise; suites with a non-zero value are
            also added to the rerun list
        """
        self.all_testcases += result.testcase_suite.countTestCases()
        self.add_results(result)

        if result.no_tests_run():
            self.testsuites_no_tests_run.append(result.testcase_suite)
            retval = -1 if result.crashed else 1
        elif not result.was_successful():
            retval = 1
        else:
            retval = 0

        if retval != 0:
            self.rerun.append(result.testcase_suite)

        return retval

    def print_results(self):
        """Print the summary and the details of failed and errored tests."""
        print('')
        print(double_line_delim)
        print('TEST RESULTS:')

        def indent_results(lines):
            # right-align the lines on their first colon
            shown = [line for line in lines if line]
            width = 4 + max(line.index(":") for line in shown)
            for line in shown:
                print(" " * (width - line.index(":")) + line)

        summary = [
            f'Scheduled tests: {self.all_testcases}',
            f'Executed tests: {self[TEST_RUN]}',
            f'Passed tests: {colorize(self[PASS], GREEN)}',
        ]
        # the remaining counters are shown only when they are not zero
        if self[SKIP]:
            summary.append(f'Skipped tests: {colorize(self[SKIP], YELLOW)}')
        if self.not_executed:
            summary.append(
                f'Not Executed tests: {colorize(self.not_executed, RED)}')
        if self[FAIL]:
            summary.append(f'Failures: {colorize(self[FAIL], RED)}')
        if self[ERROR]:
            summary.append(f'Errors: {colorize(self[ERROR], RED)}')
        if self[SKIP_CPU_SHORTAGE]:
            summary.append(
                'Tests skipped due to lack of CPUS: '
                f'{colorize(self[SKIP_CPU_SHORTAGE], YELLOW)}')
        indent_results(summary)

        if self.all_failed > 0:
            print('FAILURES AND ERRORS IN TESTS:')
            for result in self.results_per_suite:
                # failures are listed before errors; the testcase name is
                # printed only when it differs from the previous one
                groups = (
                    ('    FAILURE: {} [{}]', result[FAIL]),
                    ('      ERROR: {} [{}]', result[ERROR]),
                )
                shown_testcase_name = None
                for line_format, test_ids in groups:
                    for test_id in test_ids:
                        testcase_name, test_name = \
                            result.get_testcase_names(test_id)
                        if testcase_name != shown_testcase_name:
                            print('  Testcase name: {}'.format(
                                colorize(testcase_name, RED)))
                            shown_testcase_name = testcase_name
                        print(line_format.format(
                            colorize(test_name, RED), test_id))
        if self.testsuites_no_tests_run:
            print('TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:')
            tc_classes = {get_testcase_doc_name(testcase)
                          for testsuite in self.testsuites_no_tests_run
                          for testcase in testsuite}
            for tc_class in tc_classes:
                print('  {}'.format(colorize(tc_class, RED)))

        if self[SKIP_CPU_SHORTAGE]:
            print()
            print(colorize('     SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT'
                           ' ENOUGH CPUS AVAILABLE', YELLOW))
        print(double_line_delim)
        print('')

    @property
    def not_executed(self):
        """Number of scheduled tests which were not executed."""
        return self.all_testcases - self[TEST_RUN]

    @property
    def all_failed(self):
        """Number of failures and errors together."""
        return self[FAIL] + self[ERROR]
776
777
def parse_results(results):
    """
    Aggregate the given results, print the summary and derive a return code.

    Every item of ``results`` is handed to ``AllResults.add_result()`` and
    the aggregated summary is then printed with
    ``AllResults.print_results()``.

    :param results: iterable of results, each one passed to ``add_result()``
    :return: tuple ``(return_code, rerun)``; ``return_code`` is -1 if any
        ``add_result()`` call returned -1, else 1 if any call returned 1,
        else 0; ``rerun`` is the ``rerun`` attribute of the aggregated
        results
    """

    summary = AllResults()
    # collect every per-result code first, classify them afterwards
    outcomes = [summary.add_result(item) for item in results]

    summary.print_results()

    saw_crash = any(code == -1 for code in outcomes)
    saw_failure = any(code == 1 for code in outcomes)

    # a crash takes precedence over an ordinary failure
    if saw_crash:
        return -1, summary.rerun
    if saw_failure:
        return 1, summary.rerun
    return 0, summary.rerun
808
809
def parse_digit_env(env_var, default):
    """
    Read a non-negative integer from an environment variable.

    :param env_var: name of the environment variable to read
    :param default: value returned when the variable is unset or when its
        value is not a plain string of decimal digits
    :return: the variable's value converted to ``int``, or ``default``
    """
    value = os.getenv(env_var, default)
    if value == default:
        # variable not set (or set to exactly the default), nothing to parse
        return value
    # isdecimal() rather than isdigit(): the latter also accepts characters
    # such as superscript digits, which int() rejects with a ValueError
    if value.isdecimal():
        return int(value)
    print('WARNING: unsupported value "%s" for env var "%s", '
          'defaulting to %s' % (value, env_var, default))
    return default
820
821
if __name__ == '__main__':
    # Script entry point: read the configuration from environment variables
    # and command line arguments, discover the tests and run them either in
    # the current process (interactive mode) or through run_forked().

    verbose = parse_digit_env("V", 0)

    test_timeout = parse_digit_env("TIMEOUT", 600)  # default = 10 minutes

    test_finished_join_timeout = 15

    retries = parse_digit_env("RETRIES", 0)

    debug = os.getenv("DEBUG", "n").lower() in ["gdb", "gdbserver", "attach"]

    debug_core = os.getenv("DEBUG", "").lower() == "core"
    compress_core = framework.BoolEnvironmentVariable("CORE_COMPRESS")

    if os.getenv("VPP_IN_GDB", "n").lower() in ["1", "y", "yes"]:
        start_vpp_in_gdb()
        # sys.exit() instead of the site-provided exit() helper, which is
        # not guaranteed to exist in every interpreter setup
        sys.exit()

    step = framework.BoolEnvironmentVariable("STEP")
    force_foreground = framework.BoolEnvironmentVariable("FORCE_FOREGROUND")

    run_interactive = debug or step or force_foreground

    max_concurrent_tests = 0
    print(f"OS reports {num_cpus} available cpu(s).")

    test_jobs = os.getenv("TEST_JOBS", "1").lower()  # default = 1 process
    if test_jobs == 'auto':
        if run_interactive:
            max_concurrent_tests = 1
            print('Interactive mode required, running tests consecutively.')
        else:
            max_concurrent_tests = num_cpus
            print(f"Running at most {max_concurrent_tests} python test "
                  "processes concurrently.")
    else:
        try:
            test_jobs = int(test_jobs)
        except ValueError as e:
            raise ValueError("Invalid TEST_JOBS value specified, valid "
                             "values are a positive integer or 'auto'") from e
        if test_jobs <= 0:
            raise ValueError("Invalid TEST_JOBS value specified, valid "
                             "values are a positive integer or 'auto'")
        # test_jobs has already been converted to int above
        max_concurrent_tests = test_jobs
        print(f"Running at most {max_concurrent_tests} python test processes "
              "concurrently as set by 'TEST_JOBS'.")

    print(f"Using at most {max_vpp_cpus} cpus for VPP threads.")

    if run_interactive and max_concurrent_tests > 1:
        raise NotImplementedError(
            'Running tests interactively (DEBUG is gdb[server] or ATTACH or '
            'STEP is set) in parallel (TEST_JOBS is more than 1) is not '
            'supported')

    parser = argparse.ArgumentParser(description="VPP unit tests")
    parser.add_argument("-f", "--failfast", action='store_true',
                        help="fast failure flag")
    # -d is mandatory: with action='append' a missing option leaves args.dir
    # set to None, which the discovery loop below cannot iterate over
    parser.add_argument("-d", "--dir", action='append', type=str,
                        required=True,
                        help="directory containing test files "
                             "(may be specified multiple times)")
    args = parser.parse_args()
    failfast = args.failfast
    descriptions = True

    print("Running tests using custom test runner.")
    filter_file, filter_class, filter_func = parse_test_option()

    print("Active filters: file=%s, class=%s, function=%s" % (
        filter_file, filter_class, filter_func))

    filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)

    ignore_path = os.getenv("VENV_PATH", None)
    cb = SplitToSuitesCallback(filter_cb)
    for d in args.dir:
        print("Adding tests from directory tree %s" % d)
        discover_tests(d, cb, ignore_path)

    # suites are not hashable, need to use list
    suites = []
    tests_amount = 0
    for testcase_suite in cb.suites.values():
        tests_amount += testcase_suite.countTestCases()
        if testcase_suite.cpus_used > max_vpp_cpus:
            # here we replace test functions with lambdas to just skip them
            # but we also replace setUp/tearDown functions to do nothing
            # so that the test can be "started" and "stopped", so that we can
            # still keep those prints (test description - SKIP), which are done
            # in stopTest() (for that to trigger, test function must run)
            for t in testcase_suite:
                for m in dir(t):
                    if m.startswith('test_'):
                        # bind t as a default argument so that each lambda
                        # keeps its own test instead of whatever the loop
                        # variable refers to when the lambda is called
                        setattr(t, m,
                                lambda t=t: t.skipTest("not enough cpus"))
                setattr(t.__class__, 'setUpClass', lambda: None)
                setattr(t.__class__, 'tearDownClass', lambda: None)
                setattr(t, 'setUp', lambda: None)
                setattr(t, 'tearDown', lambda: None)
                t.__class__.skipped_due_to_cpu_lack = True
        suites.append(testcase_suite)

    print("%s out of %s tests match specified filters" % (
        tests_amount, tests_amount + cb.filtered.countTestCases()))

    if not running_extended_tests:
        print("Not running extended tests (some tests will be skipped)")

    attempts = retries + 1
    if attempts > 1:
        print("Perform %s attempts to pass the suite..." % attempts)

    if run_interactive and suites:
        # don't fork if requiring interactive terminal
        print('Running tests in foreground in the current process')
        full_suite = unittest.TestSuite()
        free_cpus = list(available_cpus)
        cpu_shortage = False
        for suite in suites:
            if suite.cpus_used <= max_vpp_cpus:
                suite.assign_cpus(free_cpus[:suite.cpus_used])
            else:
                # suite needs more cpus than allowed, its tests were turned
                # into skips above
                suite.assign_cpus([])
                cpu_shortage = True
        full_suite.addTests(suites)
        result = VppTestRunner(verbosity=verbose,
                               failfast=failfast,
                               print_summary=True).run(full_suite)
        was_successful = result.wasSuccessful()
        if not was_successful:
            for test_case_info in result.failed_test_cases_info:
                handle_failed_suite(test_case_info.logger,
                                    test_case_info.tempdir,
                                    test_case_info.vpp_pid)
                if test_case_info in result.core_crash_test_cases_info:
                    check_and_handle_core(test_case_info.vpp_bin_path,
                                          test_case_info.tempdir,
                                          test_case_info.core_crash_test)

        if cpu_shortage:
            print()
            print(colorize('SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT'
                           ' ENOUGH CPUS AVAILABLE', YELLOW))
            print()
        # exit status is 0 on success and 1 otherwise
        sys.exit(not was_successful)
    else:
        print('Running each VPPTestCase in a separate background process'
              f' with at most {max_concurrent_tests} parallel python test '
              'process(es)')
        exit_code = 0
        # parse_results() returns the suites to run again, so each further
        # attempt only repeats those
        while suites and attempts > 0:
            results = run_forked(suites)
            exit_code, suites = parse_results(results)
            attempts -= 1
            if exit_code == 0:
                print('Test run was successful')
            else:
                print('%s attempt(s) left.' % attempts)
        sys.exit(exit_code)