1a29d148d9e311fc7efcebf8eea7ddc781d5ed53
[vpp.git] / test / run_tests.py
1 #!/usr/bin/env python3
2
3 import sys
4 import shutil
5 import os
6 import fnmatch
7 import unittest
8 import argparse
9 import time
10 import threading
11 import signal
12 import psutil
13 import re
14 import multiprocessing
15 from multiprocessing import Process, Pipe, cpu_count
16 from multiprocessing.queues import Queue
17 from multiprocessing.managers import BaseManager
18 import framework
19 from framework import VppTestRunner, running_extended_tests, VppTestCase, \
20     get_testcase_doc_name, get_test_description, PASS, FAIL, ERROR, SKIP, \
21     TEST_RUN
22 from debug import spawn_gdb
23 from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
24     colorize, single_line_delim
25 from discover_tests import discover_tests
26 from subprocess import check_output, CalledProcessError
27 from util import check_core_path, get_core_path, is_core_present
28
29 # timeout which controls how long the child has to finish after seeing
30 # a core dump in test temporary directory. If this is exceeded, parent assumes
31 # that child process is stuck (e.g. waiting for shm mutex, which will never
32 # get unlocked) and kill the child
33 core_timeout = 3
34 min_req_shm = 536870912  # min 512MB shm required
35 # 128MB per extra process
36 shm_per_process = 134217728
37
38
39 class StreamQueue(Queue):
40     def write(self, msg):
41         self.put(msg)
42
43     def flush(self):
44         sys.__stdout__.flush()
45         sys.__stderr__.flush()
46
47     def fileno(self):
48         return self._writer.fileno()
49
50
51 class StreamQueueManager(BaseManager):
52     pass
53
54
55 StreamQueueManager.register('StreamQueue', StreamQueue)
56
57
58 class TestResult(dict):
59     def __init__(self, testcase_suite, testcases_by_id=None):
60         super(TestResult, self).__init__()
61         self[PASS] = []
62         self[FAIL] = []
63         self[ERROR] = []
64         self[SKIP] = []
65         self[TEST_RUN] = []
66         self.crashed = False
67         self.testcase_suite = testcase_suite
68         self.testcases = [testcase for testcase in testcase_suite]
69         self.testcases_by_id = testcases_by_id
70
71     def was_successful(self):
72         return 0 == len(self[FAIL]) == len(self[ERROR]) \
73             and len(self[PASS] + self[SKIP]) \
74             == self.testcase_suite.countTestCases() == len(self[TEST_RUN])
75
76     def no_tests_run(self):
77         return 0 == len(self[TEST_RUN])
78
79     def process_result(self, test_id, result):
80         self[result].append(test_id)
81
82     def suite_from_failed(self):
83         rerun_ids = set([])
84         for testcase in self.testcase_suite:
85             tc_id = testcase.id()
86             if tc_id not in self[PASS] and tc_id not in self[SKIP]:
87                 rerun_ids.add(tc_id)
88         if rerun_ids:
89             return suite_from_failed(self.testcase_suite, rerun_ids)
90
91     def get_testcase_names(self, test_id):
92         # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
93         setup_teardown_match = re.match(
94             r'((tearDownClass)|(setUpClass)) \((.+\..+)\)', test_id)
95         if setup_teardown_match:
96             test_name, _, _, testcase_name = setup_teardown_match.groups()
97             if len(testcase_name.split('.')) == 2:
98                 for key in self.testcases_by_id.keys():
99                     if key.startswith(testcase_name):
100                         testcase_name = key
101                         break
102             testcase_name = self._get_testcase_doc_name(testcase_name)
103         else:
104             test_name = self._get_test_description(test_id)
105             testcase_name = self._get_testcase_doc_name(test_id)
106
107         return testcase_name, test_name
108
109     def _get_test_description(self, test_id):
110         if test_id in self.testcases_by_id:
111             desc = get_test_description(descriptions,
112                                         self.testcases_by_id[test_id])
113         else:
114             desc = test_id
115         return desc
116
117     def _get_testcase_doc_name(self, test_id):
118         if test_id in self.testcases_by_id:
119             doc_name = get_testcase_doc_name(self.testcases_by_id[test_id])
120         else:
121             doc_name = test_id
122         return doc_name
123
124
125 def test_runner_wrapper(suite, keep_alive_pipe, stdouterr_queue,
126                         finished_pipe, result_pipe, logger):
127     sys.stdout = stdouterr_queue
128     sys.stderr = stdouterr_queue
129     VppTestCase.parallel_handler = logger.handlers[0]
130     result = VppTestRunner(keep_alive_pipe=keep_alive_pipe,
131                            descriptions=descriptions,
132                            verbosity=verbose,
133                            result_pipe=result_pipe,
134                            failfast=failfast,
135                            print_summary=False).run(suite)
136     finished_pipe.send(result.wasSuccessful())
137     finished_pipe.close()
138     keep_alive_pipe.close()
139
140
141 class TestCaseWrapper(object):
142     def __init__(self, testcase_suite, manager):
143         self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(
144             duplex=False)
145         self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
146         self.result_parent_end, self.result_child_end = Pipe(duplex=False)
147         self.testcase_suite = testcase_suite
148         if sys.version[0] == '2':
149             self.stdouterr_queue = manager.StreamQueue()
150         else:
151             from multiprocessing import get_context
152             self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
153         self.logger = get_parallel_logger(self.stdouterr_queue)
154         self.child = Process(target=test_runner_wrapper,
155                              args=(testcase_suite,
156                                    self.keep_alive_child_end,
157                                    self.stdouterr_queue,
158                                    self.finished_child_end,
159                                    self.result_child_end,
160                                    self.logger)
161                              )
162         self.child.start()
163         self.last_test_temp_dir = None
164         self.last_test_vpp_binary = None
165         self._last_test = None
166         self.last_test_id = None
167         self.vpp_pid = None
168         self.last_heard = time.time()
169         self.core_detected_at = None
170         self.testcases_by_id = {}
171         self.testclasess_with_core = {}
172         for testcase in self.testcase_suite:
173             self.testcases_by_id[testcase.id()] = testcase
174         self.result = TestResult(testcase_suite, self.testcases_by_id)
175
176     @property
177     def last_test(self):
178         return self._last_test
179
180     @last_test.setter
181     def last_test(self, test_id):
182         self.last_test_id = test_id
183         if test_id in self.testcases_by_id:
184             testcase = self.testcases_by_id[test_id]
185             self._last_test = testcase.shortDescription()
186             if not self._last_test:
187                 self._last_test = str(testcase)
188         else:
189             self._last_test = test_id
190
191     def add_testclass_with_core(self):
192         if self.last_test_id in self.testcases_by_id:
193             test = self.testcases_by_id[self.last_test_id]
194             class_name = unittest.util.strclass(test.__class__)
195             test_name = "'{}' ({})".format(get_test_description(descriptions,
196                                                                 test),
197                                            self.last_test_id)
198         else:
199             test_name = self.last_test_id
200             class_name = re.match(r'((tearDownClass)|(setUpClass)) '
201                                   r'\((.+\..+)\)', test_name).groups()[3]
202         if class_name not in self.testclasess_with_core:
203             self.testclasess_with_core[class_name] = (
204                 test_name,
205                 self.last_test_vpp_binary,
206                 self.last_test_temp_dir)
207
208     def close_pipes(self):
209         self.keep_alive_child_end.close()
210         self.finished_child_end.close()
211         self.result_child_end.close()
212         self.keep_alive_parent_end.close()
213         self.finished_parent_end.close()
214         self.result_parent_end.close()
215
216     def was_successful(self):
217         return self.result.was_successful()
218
219
220 def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
221                              read_testcases):
222     read_testcase = None
223     while read_testcases.is_set() or unread_testcases:
224         if finished_unread_testcases:
225             read_testcase = finished_unread_testcases.pop()
226             unread_testcases.remove(read_testcase)
227         elif unread_testcases:
228             read_testcase = unread_testcases.pop()
229         if read_testcase:
230             data = ''
231             while data is not None:
232                 sys.stdout.write(data)
233                 data = read_testcase.stdouterr_queue.get()
234
235             read_testcase.stdouterr_queue.close()
236             finished_unread_testcases.discard(read_testcase)
237             read_testcase = None
238
239
240 def handle_failed_suite(logger, last_test_temp_dir, vpp_pid):
241     if last_test_temp_dir:
242         # Need to create link in case of a timeout or core dump without failure
243         lttd = os.path.basename(last_test_temp_dir)
244         failed_dir = os.getenv('FAILED_DIR')
245         link_path = '%s%s-FAILED' % (failed_dir, lttd)
246         if not os.path.exists(link_path):
247             os.symlink(last_test_temp_dir, link_path)
248         logger.error("Symlink to failed testcase directory: %s -> %s"
249                      % (link_path, lttd))
250
251         # Report core existence
252         core_path = get_core_path(last_test_temp_dir)
253         if os.path.exists(core_path):
254             logger.error(
255                 "Core-file exists in test temporary directory: %s!" %
256                 core_path)
257             check_core_path(logger, core_path)
258             logger.debug("Running 'file %s':" % core_path)
259             try:
260                 info = check_output(["file", core_path])
261                 logger.debug(info)
262             except CalledProcessError as e:
263                 logger.error("Subprocess returned with return code "
264                              "while running `file' utility on core-file "
265                              "returned: "
266                              "rc=%s", e.returncode)
267             except OSError as e:
268                 logger.error("Subprocess returned with OS error while "
269                              "running 'file' utility "
270                              "on core-file: "
271                              "(%s) %s", e.errno, e.strerror)
272             except Exception as e:
273                 logger.exception("Unexpected error running `file' utility "
274                                  "on core-file")
275             logger.error("gdb %s %s" %
276                          (os.getenv('VPP_BIN', 'vpp'), core_path))
277
278     if vpp_pid:
279         # Copy api post mortem
280         api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
281         if os.path.isfile(api_post_mortem_path):
282             logger.error("Copying api_post_mortem.%d to %s" %
283                          (vpp_pid, last_test_temp_dir))
284             shutil.copy2(api_post_mortem_path, last_test_temp_dir)
285
286
287 def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
288     if is_core_present(tempdir):
289         if debug_core:
290             print('VPP core detected in %s. Last test running was %s' %
291                   (tempdir, core_crash_test))
292             print(single_line_delim)
293             spawn_gdb(vpp_binary, get_core_path(tempdir))
294             print(single_line_delim)
295         elif compress_core:
296             print("Compressing core-file in test directory `%s'" % tempdir)
297             os.system("gzip %s" % get_core_path(tempdir))
298
299
300 def handle_cores(failed_testcases):
301     for failed_testcase in failed_testcases:
302         tcs_with_core = failed_testcase.testclasess_with_core
303         if tcs_with_core:
304             for test, vpp_binary, tempdir in tcs_with_core.values():
305                 check_and_handle_core(vpp_binary, tempdir, test)
306
307
308 def process_finished_testsuite(wrapped_testcase_suite,
309                                finished_testcase_suites,
310                                failed_wrapped_testcases,
311                                results):
312     results.append(wrapped_testcase_suite.result)
313     finished_testcase_suites.add(wrapped_testcase_suite)
314     stop_run = False
315     if failfast and not wrapped_testcase_suite.was_successful():
316         stop_run = True
317
318     if not wrapped_testcase_suite.was_successful():
319         failed_wrapped_testcases.add(wrapped_testcase_suite)
320         handle_failed_suite(wrapped_testcase_suite.logger,
321                             wrapped_testcase_suite.last_test_temp_dir,
322                             wrapped_testcase_suite.vpp_pid)
323
324     return stop_run
325
326
327 def run_forked(testcase_suites):
328     wrapped_testcase_suites = set()
329     solo_testcase_suites = []
330     total_test_runners = 0
331
332     # suites are unhashable, need to use list
333     results = []
334     unread_testcases = set()
335     finished_unread_testcases = set()
336     manager = StreamQueueManager()
337     manager.start()
338     total_test_runners = 0
339     while total_test_runners < concurrent_tests:
340         if testcase_suites:
341             a_suite = testcase_suites.pop(0)
342             if a_suite.is_tagged_run_solo:
343                 solo_testcase_suites.append(a_suite)
344                 continue
345             wrapped_testcase_suite = TestCaseWrapper(a_suite,
346                                                      manager)
347             wrapped_testcase_suites.add(wrapped_testcase_suite)
348             unread_testcases.add(wrapped_testcase_suite)
349             total_test_runners = total_test_runners + 1
350         else:
351             break
352
353     while total_test_runners < 1 and solo_testcase_suites:
354         if solo_testcase_suites:
355             a_suite = solo_testcase_suites.pop(0)
356             wrapped_testcase_suite = TestCaseWrapper(a_suite,
357                                                      manager)
358             wrapped_testcase_suites.add(wrapped_testcase_suite)
359             unread_testcases.add(wrapped_testcase_suite)
360             total_test_runners = total_test_runners + 1
361         else:
362             break
363
364     read_from_testcases = threading.Event()
365     read_from_testcases.set()
366     stdouterr_thread = threading.Thread(target=stdouterr_reader_wrapper,
367                                         args=(unread_testcases,
368                                               finished_unread_testcases,
369                                               read_from_testcases))
370     stdouterr_thread.start()
371
372     failed_wrapped_testcases = set()
373     stop_run = False
374
375     try:
376         while wrapped_testcase_suites:
377             finished_testcase_suites = set()
378             for wrapped_testcase_suite in wrapped_testcase_suites:
379                 while wrapped_testcase_suite.result_parent_end.poll():
380                     wrapped_testcase_suite.result.process_result(
381                         *wrapped_testcase_suite.result_parent_end.recv())
382                     wrapped_testcase_suite.last_heard = time.time()
383
384                 while wrapped_testcase_suite.keep_alive_parent_end.poll():
385                     wrapped_testcase_suite.last_test, \
386                         wrapped_testcase_suite.last_test_vpp_binary, \
387                         wrapped_testcase_suite.last_test_temp_dir, \
388                         wrapped_testcase_suite.vpp_pid = \
389                         wrapped_testcase_suite.keep_alive_parent_end.recv()
390                     wrapped_testcase_suite.last_heard = time.time()
391
392                 if wrapped_testcase_suite.finished_parent_end.poll():
393                     wrapped_testcase_suite.finished_parent_end.recv()
394                     wrapped_testcase_suite.last_heard = time.time()
395                     stop_run = process_finished_testsuite(
396                         wrapped_testcase_suite,
397                         finished_testcase_suites,
398                         failed_wrapped_testcases,
399                         results) or stop_run
400                     continue
401
402                 fail = False
403                 if wrapped_testcase_suite.last_heard + test_timeout < \
404                         time.time():
405                     fail = True
406                     wrapped_testcase_suite.logger.critical(
407                         "Child test runner process timed out "
408                         "(last test running was `%s' in `%s')!" %
409                         (wrapped_testcase_suite.last_test,
410                          wrapped_testcase_suite.last_test_temp_dir))
411                 elif not wrapped_testcase_suite.child.is_alive():
412                     fail = True
413                     wrapped_testcase_suite.logger.critical(
414                         "Child test runner process unexpectedly died "
415                         "(last test running was `%s' in `%s')!" %
416                         (wrapped_testcase_suite.last_test,
417                          wrapped_testcase_suite.last_test_temp_dir))
418                 elif wrapped_testcase_suite.last_test_temp_dir and \
419                         wrapped_testcase_suite.last_test_vpp_binary:
420                     if is_core_present(
421                             wrapped_testcase_suite.last_test_temp_dir):
422                         wrapped_testcase_suite.add_testclass_with_core()
423                         if wrapped_testcase_suite.core_detected_at is None:
424                             wrapped_testcase_suite.core_detected_at = \
425                                 time.time()
426                         elif wrapped_testcase_suite.core_detected_at + \
427                                 core_timeout < time.time():
428                             wrapped_testcase_suite.logger.critical(
429                                 "Child test runner process unresponsive and "
430                                 "core-file exists in test temporary directory "
431                                 "(last test running was `%s' in `%s')!" %
432                                 (wrapped_testcase_suite.last_test,
433                                  wrapped_testcase_suite.last_test_temp_dir))
434                             fail = True
435
436                 if fail:
437                     wrapped_testcase_suite.child.terminate()
438                     try:
439                         # terminating the child process tends to leave orphan
440                         # VPP process around
441                         if wrapped_testcase_suite.vpp_pid:
442                             os.kill(wrapped_testcase_suite.vpp_pid,
443                                     signal.SIGTERM)
444                     except OSError:
445                         # already dead
446                         pass
447                     wrapped_testcase_suite.result.crashed = True
448                     wrapped_testcase_suite.result.process_result(
449                         wrapped_testcase_suite.last_test_id, ERROR)
450                     stop_run = process_finished_testsuite(
451                         wrapped_testcase_suite,
452                         finished_testcase_suites,
453                         failed_wrapped_testcases,
454                         results) or stop_run
455
456             for finished_testcase in finished_testcase_suites:
457                 # Somewhat surprisingly, the join below may
458                 # timeout, even if client signaled that
459                 # it finished - so we note it just in case.
460                 join_start = time.time()
461                 finished_testcase.child.join(test_finished_join_timeout)
462                 join_end = time.time()
463                 if join_end - join_start >= test_finished_join_timeout:
464                     finished_testcase.logger.error(
465                         "Timeout joining finished test: %s (pid %d)" %
466                         (finished_testcase.last_test,
467                          finished_testcase.child.pid))
468                 finished_testcase.close_pipes()
469                 wrapped_testcase_suites.remove(finished_testcase)
470                 finished_unread_testcases.add(finished_testcase)
471                 finished_testcase.stdouterr_queue.put(None)
472                 total_test_runners = total_test_runners - 1
473                 if stop_run:
474                     while testcase_suites:
475                         results.append(TestResult(testcase_suites.pop(0)))
476                 elif testcase_suites:
477                     a_testcase = testcase_suites.pop(0)
478                     while a_testcase and a_testcase.is_tagged_run_solo:
479                         solo_testcase_suites.append(a_testcase)
480                         if testcase_suites:
481                             a_testcase = testcase_suites.pop(0)
482                         else:
483                             a_testcase = None
484                     if a_testcase:
485                         new_testcase = TestCaseWrapper(a_testcase,
486                                                        manager)
487                         wrapped_testcase_suites.add(new_testcase)
488                         total_test_runners = total_test_runners + 1
489                         unread_testcases.add(new_testcase)
490                 if solo_testcase_suites and total_test_runners == 0:
491                     a_testcase = solo_testcase_suites.pop(0)
492                     new_testcase = TestCaseWrapper(a_testcase,
493                                                    manager)
494                     wrapped_testcase_suites.add(new_testcase)
495                     total_test_runners = total_test_runners + 1
496                     unread_testcases.add(new_testcase)
497             time.sleep(0.1)
498     except Exception:
499         for wrapped_testcase_suite in wrapped_testcase_suites:
500             wrapped_testcase_suite.child.terminate()
501             wrapped_testcase_suite.stdouterr_queue.put(None)
502         raise
503     finally:
504         read_from_testcases.clear()
505         stdouterr_thread.join(test_timeout)
506         manager.shutdown()
507
508     handle_cores(failed_wrapped_testcases)
509     return results
510
511
512 class SplitToSuitesCallback:
513     def __init__(self, filter_callback):
514         self.suites = {}
515         self.suite_name = 'default'
516         self.filter_callback = filter_callback
517         self.filtered = unittest.TestSuite()
518
519     def __call__(self, file_name, cls, method):
520         test_method = cls(method)
521         if self.filter_callback(file_name, cls.__name__, method):
522             self.suite_name = file_name + cls.__name__
523             if self.suite_name not in self.suites:
524                 self.suites[self.suite_name] = unittest.TestSuite()
525                 self.suites[self.suite_name].is_tagged_run_solo = False
526             self.suites[self.suite_name].addTest(test_method)
527             if test_method.is_tagged_run_solo():
528                 self.suites[self.suite_name].is_tagged_run_solo = True
529
530         else:
531             self.filtered.addTest(test_method)
532
533
534 test_option = "TEST"
535
536
537 def parse_test_option():
538     f = os.getenv(test_option, None)
539     filter_file_name = None
540     filter_class_name = None
541     filter_func_name = None
542     if f:
543         if '.' in f:
544             parts = f.split('.')
545             if len(parts) > 3:
546                 raise Exception("Unrecognized %s option: %s" %
547                                 (test_option, f))
548             if len(parts) > 2:
549                 if parts[2] not in ('*', ''):
550                     filter_func_name = parts[2]
551             if parts[1] not in ('*', ''):
552                 filter_class_name = parts[1]
553             if parts[0] not in ('*', ''):
554                 if parts[0].startswith('test_'):
555                     filter_file_name = parts[0]
556                 else:
557                     filter_file_name = 'test_%s' % parts[0]
558         else:
559             if f.startswith('test_'):
560                 filter_file_name = f
561             else:
562                 filter_file_name = 'test_%s' % f
563     if filter_file_name:
564         filter_file_name = '%s.py' % filter_file_name
565     return filter_file_name, filter_class_name, filter_func_name
566
567
568 def filter_tests(tests, filter_cb):
569     result = unittest.suite.TestSuite()
570     for t in tests:
571         if isinstance(t, unittest.suite.TestSuite):
572             # this is a bunch of tests, recursively filter...
573             x = filter_tests(t, filter_cb)
574             if x.countTestCases() > 0:
575                 result.addTest(x)
576         elif isinstance(t, unittest.TestCase):
577             # this is a single test
578             parts = t.id().split('.')
579             # t.id() for common cases like this:
580             # test_classifier.TestClassifier.test_acl_ip
581             # apply filtering only if it is so
582             if len(parts) == 3:
583                 if not filter_cb(parts[0], parts[1], parts[2]):
584                     continue
585             result.addTest(t)
586         else:
587             # unexpected object, don't touch it
588             result.addTest(t)
589     return result
590
591
592 class FilterByTestOption:
593     def __init__(self, filter_file_name, filter_class_name, filter_func_name):
594         self.filter_file_name = filter_file_name
595         self.filter_class_name = filter_class_name
596         self.filter_func_name = filter_func_name
597
598     def __call__(self, file_name, class_name, func_name):
599         if self.filter_file_name:
600             fn_match = fnmatch.fnmatch(file_name, self.filter_file_name)
601             if not fn_match:
602                 return False
603         if self.filter_class_name and class_name != self.filter_class_name:
604             return False
605         if self.filter_func_name and func_name != self.filter_func_name:
606             return False
607         return True
608
609
610 class FilterByClassList:
611     def __init__(self, classes_with_filenames):
612         self.classes_with_filenames = classes_with_filenames
613
614     def __call__(self, file_name, class_name, func_name):
615         return '.'.join([file_name, class_name]) in self.classes_with_filenames
616
617
618 def suite_from_failed(suite, failed):
619     failed = {x.rsplit('.', 1)[0] for x in failed}
620     filter_cb = FilterByClassList(failed)
621     suite = filter_tests(suite, filter_cb)
622     return suite
623
624
625 class AllResults(dict):
626     def __init__(self):
627         super(AllResults, self).__init__()
628         self.all_testcases = 0
629         self.results_per_suite = []
630         self[PASS] = 0
631         self[FAIL] = 0
632         self[ERROR] = 0
633         self[SKIP] = 0
634         self[TEST_RUN] = 0
635         self.rerun = []
636         self.testsuites_no_tests_run = []
637
638     def add_results(self, result):
639         self.results_per_suite.append(result)
640         result_types = [PASS, FAIL, ERROR, SKIP, TEST_RUN]
641         for result_type in result_types:
642             self[result_type] += len(result[result_type])
643
644     def add_result(self, result):
645         retval = 0
646         self.all_testcases += result.testcase_suite.countTestCases()
647         self.add_results(result)
648
649         if result.no_tests_run():
650             self.testsuites_no_tests_run.append(result.testcase_suite)
651             if result.crashed:
652                 retval = -1
653             else:
654                 retval = 1
655         elif not result.was_successful():
656             retval = 1
657
658         if retval != 0:
659             self.rerun.append(result.testcase_suite)
660
661         return retval
662
663     def print_results(self):
664         print('')
665         print(double_line_delim)
666         print('TEST RESULTS:')
667         print('     Scheduled tests: {}'.format(self.all_testcases))
668         print('      Executed tests: {}'.format(self[TEST_RUN]))
669         print('        Passed tests: {}'.format(
670             colorize(str(self[PASS]), GREEN)))
671         if self[SKIP] > 0:
672             print('       Skipped tests: {}'.format(
673                 colorize(str(self[SKIP]), YELLOW)))
674         if self.not_executed > 0:
675             print('  Not Executed tests: {}'.format(
676                 colorize(str(self.not_executed), RED)))
677         if self[FAIL] > 0:
678             print('            Failures: {}'.format(
679                 colorize(str(self[FAIL]), RED)))
680         if self[ERROR] > 0:
681             print('              Errors: {}'.format(
682                 colorize(str(self[ERROR]), RED)))
683
684         if self.all_failed > 0:
685             print('FAILURES AND ERRORS IN TESTS:')
686             for result in self.results_per_suite:
687                 failed_testcase_ids = result[FAIL]
688                 errored_testcase_ids = result[ERROR]
689                 old_testcase_name = None
690                 if failed_testcase_ids:
691                     for failed_test_id in failed_testcase_ids:
692                         new_testcase_name, test_name = \
693                             result.get_testcase_names(failed_test_id)
694                         if new_testcase_name != old_testcase_name:
695                             print('  Testcase name: {}'.format(
696                                 colorize(new_testcase_name, RED)))
697                             old_testcase_name = new_testcase_name
698                         print('    FAILURE: {} [{}]'.format(
699                             colorize(test_name, RED), failed_test_id))
700                 if errored_testcase_ids:
701                     for errored_test_id in errored_testcase_ids:
702                         new_testcase_name, test_name = \
703                             result.get_testcase_names(errored_test_id)
704                         if new_testcase_name != old_testcase_name:
705                             print('  Testcase name: {}'.format(
706                                 colorize(new_testcase_name, RED)))
707                             old_testcase_name = new_testcase_name
708                         print('      ERROR: {} [{}]'.format(
709                             colorize(test_name, RED), errored_test_id))
710         if self.testsuites_no_tests_run:
711             print('TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:')
712             tc_classes = set()
713             for testsuite in self.testsuites_no_tests_run:
714                 for testcase in testsuite:
715                     tc_classes.add(get_testcase_doc_name(testcase))
716             for tc_class in tc_classes:
717                 print('  {}'.format(colorize(tc_class, RED)))
718
719         print(double_line_delim)
720         print('')
721
722     @property
723     def not_executed(self):
724         return self.all_testcases - self[TEST_RUN]
725
726     @property
727     def all_failed(self):
728         return self[FAIL] + self[ERROR]
729
730
731 def parse_results(results):
732     """
733     Prints the number of scheduled, executed, not executed, passed, failed,
734     errored and skipped tests and details about failed and errored tests.
735
736     Also returns all suites where any test failed.
737
738     :param results:
739     :return:
740     """
741
742     results_per_suite = AllResults()
743     crashed = False
744     failed = False
745     for result in results:
746         result_code = results_per_suite.add_result(result)
747         if result_code == 1:
748             failed = True
749         elif result_code == -1:
750             crashed = True
751
752     results_per_suite.print_results()
753
754     if crashed:
755         return_code = -1
756     elif failed:
757         return_code = 1
758     else:
759         return_code = 0
760     return return_code, results_per_suite.rerun
761
762
763 def parse_digit_env(env_var, default):
764     value = os.getenv(env_var, default)
765     if value != default:
766         if value.isdigit():
767             value = int(value)
768         else:
769             print('WARNING: unsupported value "%s" for env var "%s",'
770                   'defaulting to %s' % (value, env_var, default))
771             value = default
772     return value
773
774
775 if __name__ == '__main__':
776
777     verbose = parse_digit_env("V", 0)
778
779     test_timeout = parse_digit_env("TIMEOUT", 600)  # default = 10 minutes
780
781     test_finished_join_timeout = 15
782
783     retries = parse_digit_env("RETRIES", 0)
784
785     debug = os.getenv("DEBUG", "n").lower() in ["gdb", "gdbserver"]
786
787     debug_core = os.getenv("DEBUG", "").lower() == "core"
788     compress_core = framework.BoolEnvironmentVariable("CORE_COMPRESS")
789
790     step = framework.BoolEnvironmentVariable("STEP")
791     force_foreground = framework.BoolEnvironmentVariable("FORCE_FOREGROUND")
792
793     run_interactive = debug or step or force_foreground
794
795     try:
796         num_cpus = len(os.sched_getaffinity(0))
797     except AttributeError:
798         num_cpus = multiprocessing.cpu_count()
799     shm_free = psutil.disk_usage('/dev/shm').free
800
801     print('OS reports %s available cpu(s). Free shm: %s' % (
802         num_cpus, "{:,}MB".format(shm_free / (1024 * 1024))))
803
804     test_jobs = os.getenv("TEST_JOBS", "1").lower()  # default = 1 process
805     if test_jobs == 'auto':
806         if run_interactive:
807             concurrent_tests = 1
808             print('Interactive mode required, running on one core')
809         else:
810             shm_max_processes = 1
811             if shm_free < min_req_shm:
812                 raise Exception('Not enough free space in /dev/shm. Required '
813                                 'free space is at least %sM.'
814                                 % (min_req_shm >> 20))
815             else:
816                 extra_shm = shm_free - min_req_shm
817                 shm_max_processes += extra_shm // shm_per_process
818             concurrent_tests = min(cpu_count(), shm_max_processes)
819             print('Found enough resources to run tests with %s cores'
820                   % concurrent_tests)
821     elif test_jobs.isdigit():
822         concurrent_tests = int(test_jobs)
823         print("Running on %s core(s) as set by 'TEST_JOBS'." %
824               concurrent_tests)
825     else:
826         concurrent_tests = 1
827         print('Running on one core.')
828
829     if run_interactive and concurrent_tests > 1:
830         raise NotImplementedError(
831             'Running tests interactively (DEBUG is gdb or gdbserver or STEP '
832             'is set) in parallel (TEST_JOBS is more than 1) is not supported')
833
834     parser = argparse.ArgumentParser(description="VPP unit tests")
835     parser.add_argument("-f", "--failfast", action='store_true',
836                         help="fast failure flag")
837     parser.add_argument("-d", "--dir", action='append', type=str,
838                         help="directory containing test files "
839                              "(may be specified multiple times)")
840     args = parser.parse_args()
841     failfast = args.failfast
842     descriptions = True
843
844     print("Running tests using custom test runner")  # debug message
845     filter_file, filter_class, filter_func = parse_test_option()
846
847     print("Active filters: file=%s, class=%s, function=%s" % (
848         filter_file, filter_class, filter_func))
849
850     filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)
851
852     ignore_path = os.getenv("VENV_PATH", None)
853     cb = SplitToSuitesCallback(filter_cb)
854     for d in args.dir:
855         print("Adding tests from directory tree %s" % d)
856         discover_tests(d, cb, ignore_path)
857
858     # suites are not hashable, need to use list
859     suites = []
860     tests_amount = 0
861     for testcase_suite in cb.suites.values():
862         tests_amount += testcase_suite.countTestCases()
863         suites.append(testcase_suite)
864
865     print("%s out of %s tests match specified filters" % (
866         tests_amount, tests_amount + cb.filtered.countTestCases()))
867
868     if not running_extended_tests:
869         print("Not running extended tests (some tests will be skipped)")
870
871     attempts = retries + 1
872     if attempts > 1:
873         print("Perform %s attempts to pass the suite..." % attempts)
874
875     if run_interactive and suites:
876         # don't fork if requiring interactive terminal
877         print('Running tests in foreground in the current process')
878         full_suite = unittest.TestSuite()
879         full_suite.addTests(suites)
880         result = VppTestRunner(verbosity=verbose,
881                                failfast=failfast,
882                                print_summary=True).run(full_suite)
883         was_successful = result.wasSuccessful()
884         if not was_successful:
885             for test_case_info in result.failed_test_cases_info:
886                 handle_failed_suite(test_case_info.logger,
887                                     test_case_info.tempdir,
888                                     test_case_info.vpp_pid)
889                 if test_case_info in result.core_crash_test_cases_info:
890                     check_and_handle_core(test_case_info.vpp_bin_path,
891                                           test_case_info.tempdir,
892                                           test_case_info.core_crash_test)
893
894         sys.exit(not was_successful)
895     else:
896         print('Running each VPPTestCase in a separate background process'
897               ' with {} parallel process(es)'.format(concurrent_tests))
898         exit_code = 0
899         while suites and attempts > 0:
900             results = run_forked(suites)
901             exit_code, suites = parse_results(results)
902             attempts -= 1
903             if exit_code == 0:
904                 print('Test run was successful')
905             else:
906                 print('%s attempt(s) left.' % attempts)
907         sys.exit(exit_code)