tests: "force solo" testcase support
[vpp.git] / test / run_tests.py
1 #!/usr/bin/env python3
2
3 import sys
4 import shutil
5 import os
6 import fnmatch
7 import unittest
8 import argparse
9 import time
10 import threading
11 import signal
12 import psutil
13 import re
14 import multiprocessing
15 from multiprocessing import Process, Pipe, cpu_count
16 from multiprocessing.queues import Queue
17 from multiprocessing.managers import BaseManager
18 import framework
19 from framework import VppTestRunner, running_extended_tests, VppTestCase, \
20     get_testcase_doc_name, get_test_description, PASS, FAIL, ERROR, SKIP, \
21     TEST_RUN
22 from debug import spawn_gdb
23 from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
24     colorize, single_line_delim
25 from discover_tests import discover_tests
26 from subprocess import check_output, CalledProcessError
27 from util import check_core_path, get_core_path, is_core_present
28
# Timeout (in seconds) which controls how long the child has to finish after
# a core dump is seen in the test temporary directory. If this is exceeded,
# the parent assumes that the child process is stuck (e.g. waiting for a shm
# mutex which will never get unlocked) and kills the child.
core_timeout = 3
min_req_shm = 536870912  # min 512MB shm required
# 128MB per extra process
shm_per_process = 134217728
37
38
class StreamQueue(Queue):
    """
    Multiprocessing queue offering the subset of the file interface
    (write/flush/fileno) needed to assign it to sys.stdout/sys.stderr.
    """

    def write(self, msg):
        """Enqueue `msg` instead of writing it to a stream."""
        self.put(msg)

    def flush(self):
        """Flush the interpreter's original stdout and stderr streams."""
        for stream in (sys.__stdout__, sys.__stderr__):
            stream.flush()

    def fileno(self):
        """Return the file descriptor of the queue's writer end."""
        return self._writer.fileno()
49
50
class StreamQueueManager(BaseManager):
    """Manager subclass on which the StreamQueue type gets registered."""
53
54
# Make StreamQueue creatable through a manager instance, i.e. via
# manager.StreamQueue(...) as done by TestCaseWrapper.
StreamQueueManager.register('StreamQueue', StreamQueue)
56
57
class TestResult(dict):
    """
    Result of running a single testcase suite.

    Maps each result type (PASS, FAIL, ERROR, SKIP, TEST_RUN) to the list of
    test ids which were reported with that result type.
    """

    def __init__(self, testcase_suite, testcases_by_id=None):
        """
        :param testcase_suite: iterable suite of testcases the result is for
        :param testcases_by_id: optional mapping of test id to testcase,
            used to look up human readable names
        """
        super(TestResult, self).__init__()
        self[PASS] = []
        self[FAIL] = []
        self[ERROR] = []
        self[SKIP] = []
        self[TEST_RUN] = []
        self.crashed = False
        self.testcase_suite = testcase_suite
        self.testcases = list(testcase_suite)
        # Fall back to an empty mapping so that the name lookup helpers work
        # (returning raw ids) for results created without a mapping instead
        # of failing on None.
        self.testcases_by_id = \
            {} if testcases_by_id is None else testcases_by_id

    def was_successful(self):
        """
        Return True if nothing failed or errored and every testcase of the
        suite was both run and reported as passed or skipped.
        """
        return 0 == len(self[FAIL]) == len(self[ERROR]) \
            and len(self[PASS] + self[SKIP]) \
            == self.testcase_suite.countTestCases() == len(self[TEST_RUN])

    def no_tests_run(self):
        """Return True if no test of the suite was run at all."""
        return 0 == len(self[TEST_RUN])

    def process_result(self, test_id, result):
        """Record `test_id` under the result type `result`."""
        self[result].append(test_id)

    def suite_from_failed(self):
        """
        Build a suite from the testcases which neither passed nor were
        skipped.

        :return: the filtered suite, or None if there is nothing to rerun
        """
        # one set lookup per testcase instead of scanning two lists
        finished_ok = set(self[PASS])
        finished_ok.update(self[SKIP])
        rerun_ids = set()
        for testcase in self.testcase_suite:
            tc_id = testcase.id()
            if tc_id not in finished_ok:
                rerun_ids.add(tc_id)
        if rerun_ids:
            return suite_from_failed(self.testcase_suite, rerun_ids)

    def get_testcase_names(self, test_id):
        """
        Translate a test id to names suitable for printing.

        :param test_id: id of a test, or an id of the form
            'tearDownClass (module.Class)' / 'setUpClass (module.Class)'
        :return: tuple (testcase name, test name)
        """
        # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
        setup_teardown_match = re.match(
            r'((tearDownClass)|(setUpClass)) \((.+\..+)\)', test_id)
        if setup_teardown_match:
            test_name, _, _, testcase_name = setup_teardown_match.groups()
            if len(testcase_name.split('.')) == 2:
                # 'module.Class' - use the id of any known test of that
                # class to look up the documented testcase name
                for key in self.testcases_by_id.keys():
                    if key.startswith(testcase_name):
                        testcase_name = key
                        break
            testcase_name = self._get_testcase_doc_name(testcase_name)
        else:
            test_name = self._get_test_description(test_id)
            testcase_name = self._get_testcase_doc_name(test_id)

        return testcase_name, test_name

    def _get_test_description(self, test_id):
        """Return the description of a known test, else `test_id` itself."""
        if test_id in self.testcases_by_id:
            desc = get_test_description(descriptions,
                                        self.testcases_by_id[test_id])
        else:
            desc = test_id
        return desc

    def _get_testcase_doc_name(self, test_id):
        """Return the doc name of a known testcase, else `test_id` itself."""
        if test_id in self.testcases_by_id:
            doc_name = get_testcase_doc_name(self.testcases_by_id[test_id])
        else:
            doc_name = test_id
        return doc_name
123
124
def test_runner_wrapper(suite, keep_alive_pipe, stdouterr_queue,
                        finished_pipe, result_pipe, logger):
    """
    Entry point of the child process: run `suite` and report the outcome.

    :param suite: suite of tests to run
    :param keep_alive_pipe: pipe end handed over to the test runner
    :param stdouterr_queue: queue which replaces stdout and stderr
    :param finished_pipe: pipe end used to send the overall success flag
    :param result_pipe: pipe end handed over to the test runner
    :param logger: logger whose first handler becomes the parallel handler
    """
    # everything printed from now on ends up in the queue
    sys.stdout = sys.stderr = stdouterr_queue
    VppTestCase.parallel_handler = logger.handlers[0]
    runner = VppTestRunner(keep_alive_pipe=keep_alive_pipe,
                           descriptions=descriptions,
                           verbosity=verbose,
                           result_pipe=result_pipe,
                           failfast=failfast,
                           print_summary=False)
    outcome = runner.run(suite)
    finished_pipe.send(outcome.wasSuccessful())
    for pipe_end in (finished_pipe, keep_alive_pipe):
        pipe_end.close()
139
140
class TestCaseWrapper(object):
    """
    Parent-side handle of a testcase suite running in a child process.

    Creating the wrapper starts the child. The wrapper owns the pipes and the
    output queue shared with the child and tracks what was last heard from it.
    """

    def __init__(self, testcase_suite, manager):
        """
        :param testcase_suite: suite to run in the child process
        :param manager: manager object providing the StreamQueue factory
        """
        self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(
            duplex=False)
        self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
        self.result_parent_end, self.result_child_end = Pipe(duplex=False)
        self.testcase_suite = testcase_suite
        if sys.version_info[0] == 2:
            self.stdouterr_queue = manager.StreamQueue()
        else:
            # python3 queues need an explicit multiprocessing context
            from multiprocessing import get_context
            self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
        self.logger = get_parallel_logger(self.stdouterr_queue)
        self.child = Process(target=test_runner_wrapper,
                             args=(testcase_suite,
                                   self.keep_alive_child_end,
                                   self.stdouterr_queue,
                                   self.finished_child_end,
                                   self.result_child_end,
                                   self.logger)
                             )
        self.child.start()
        self.last_test_temp_dir = None
        self.last_test_vpp_binary = None
        self._last_test = None
        self.last_test_id = None
        self.vpp_pid = None
        self.last_heard = time.time()
        self.core_detected_at = None
        self.testcases_by_id = {}
        # class name -> (test name, vpp binary, temp dir) of the first test
        # of that class for which a core was detected
        self.testclasess_with_core = {}
        for testcase in self.testcase_suite:
            self.testcases_by_id[testcase.id()] = testcase
        self.result = TestResult(testcase_suite, self.testcases_by_id)

    @property
    def last_test(self):
        """Printable name of the last test reported by the child."""
        return self._last_test

    @last_test.setter
    def last_test(self, test_id):
        """Store the raw id and derive a printable name from it."""
        self.last_test_id = test_id
        if test_id in self.testcases_by_id:
            testcase = self.testcases_by_id[test_id]
            self._last_test = testcase.shortDescription()
            if not self._last_test:
                self._last_test = str(testcase)
        else:
            self._last_test = test_id

    def add_testclass_with_core(self):
        """
        Remember the last test's class as one which produced a core.

        Only the first test of each class is recorded.
        """
        if self.last_test_id in self.testcases_by_id:
            test = self.testcases_by_id[self.last_test_id]
            class_name = unittest.util.strclass(test.__class__)
            test_name = "'{}' ({})".format(get_test_description(descriptions,
                                                                test),
                                           self.last_test_id)
        else:
            test_name = self.last_test_id
            fixture_match = re.match(r'((tearDownClass)|(setUpClass)) '
                                     r'\((.+\..+)\)', test_name or '')
            if fixture_match:
                class_name = fixture_match.groups()[3]
            else:
                # Neither a known test nor a class fixture id - key the
                # entry by the raw id instead of crashing the parent.
                class_name = test_name
        if class_name not in self.testclasess_with_core:
            self.testclasess_with_core[class_name] = (
                test_name,
                self.last_test_vpp_binary,
                self.last_test_temp_dir)

    def close_pipes(self):
        """Close both ends of all pipes shared with the child."""
        self.keep_alive_child_end.close()
        self.finished_child_end.close()
        self.result_child_end.close()
        self.keep_alive_parent_end.close()
        self.finished_parent_end.close()
        self.result_parent_end.close()

    def was_successful(self):
        """Return True if the collected result is a success."""
        return self.result.was_successful()
218
219
def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
                             read_testcases):
    """
    Copy the queued output of test runners to this process' stdout, one
    testcase wrapper at a time.

    :param unread_testcases: set of wrappers whose output was not picked up
        yet; run_forked adds to it, this function removes from it
    :param finished_unread_testcases: set of wrappers which run_forked
        already marked as finished; these are served first
    :param read_testcases: event object - the loop keeps going while it is
        set or while there still are unread wrappers
    """
    read_testcase = None
    while read_testcases.is_set() or unread_testcases:
        if finished_unread_testcases:
            # prefer wrappers which are already finished
            read_testcase = finished_unread_testcases.pop()
            unread_testcases.remove(read_testcase)
        elif unread_testcases:
            read_testcase = unread_testcases.pop()
        if read_testcase:
            data = ''
            # None marks the end of output - run_forked puts it into the
            # queue once it is done with the wrapper
            while data is not None:
                sys.stdout.write(data)
                data = read_testcase.stdouterr_queue.get()

            read_testcase.stdouterr_queue.close()
            # the wrapper may have been marked finished while it was read
            finished_unread_testcases.discard(read_testcase)
            read_testcase = None
238
239
def handle_failed_suite(logger, last_test_temp_dir, vpp_pid):
    """
    Collect diagnostics for a failed suite.

    Creates a '<FAILED_DIR><dirname>-FAILED' symlink to the test temporary
    directory, reports a core-file if one exists there and copies the api
    post mortem file of the vpp process into that directory.

    :param logger: logger used for reporting
    :param last_test_temp_dir: temporary directory of the last test run by
        the suite, may be None if unknown
    :param vpp_pid: pid of the vpp process used by the last test, may be
        None if unknown
    """
    if last_test_temp_dir:
        # Need to create link in case of a timeout or core dump without failure
        lttd = os.path.basename(last_test_temp_dir)
        failed_dir = os.getenv('FAILED_DIR')
        if failed_dir is None:
            # without this check a link named 'None<dir>-FAILED' would be
            # created in the current directory
            logger.error("FAILED_DIR is not set, not creating symlink to "
                         "failed testcase directory %s" % last_test_temp_dir)
        else:
            link_path = '%s%s-FAILED' % (failed_dir, lttd)
            # lexists() also sees a dangling symlink, on which os.symlink()
            # would raise
            if not os.path.lexists(link_path):
                os.symlink(last_test_temp_dir, link_path)
            logger.error("Symlink to failed testcase directory: %s -> %s"
                         % (link_path, lttd))

        # Report core existence
        core_path = get_core_path(last_test_temp_dir)
        if os.path.exists(core_path):
            logger.error(
                "Core-file exists in test temporary directory: %s!" %
                core_path)
            check_core_path(logger, core_path)
            logger.debug("Running 'file %s':" % core_path)
            try:
                info = check_output(["file", core_path])
                logger.debug(info)
            except CalledProcessError as e:
                logger.error("Subprocess returned with return code "
                             "while running `file' utility on core-file "
                             "returned: "
                             "rc=%s", e.returncode)
            except OSError as e:
                logger.error("Subprocess returned with OS error while "
                             "running 'file' utility "
                             "on core-file: "
                             "(%s) %s", e.errno, e.strerror)
            except Exception:
                logger.exception("Unexpected error running `file' utility "
                                 "on core-file")

    if vpp_pid:
        # Copy api post mortem
        api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
        if os.path.isfile(api_post_mortem_path):
            if last_test_temp_dir:
                logger.error("Copying api_post_mortem.%d to %s" %
                             (vpp_pid, last_test_temp_dir))
                shutil.copy2(api_post_mortem_path, last_test_temp_dir)
            else:
                # there is nowhere to copy to
                logger.error("Not copying api_post_mortem.%d - test "
                             "temporary directory is unknown" % vpp_pid)
283
284
def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
    """
    Debug or compress the core-file found in `tempdir`, if there is one.

    :param vpp_binary: vpp binary passed to the debugger
    :param tempdir: test temporary directory to look into
    :param core_crash_test: name of the test printed as the last one running
    """
    if not is_core_present(tempdir):
        return

    if debug_core:
        print('VPP core detected in %s. Last test running was %s' %
              (tempdir, core_crash_test))
        print(single_line_delim)
        spawn_gdb(vpp_binary, get_core_path(tempdir))
        print(single_line_delim)
        return

    if compress_core:
        print("Compressing core-file in test directory `%s'" % tempdir)
        os.system("gzip %s" % get_core_path(tempdir))
296
297
def handle_cores(failed_testcases):
    """
    Run core handling for every test class recorded with a core.

    :param failed_testcases: iterable of wrappers of failed suites
    """
    for wrapper in failed_testcases:
        recorded = wrapper.testclasess_with_core
        if not recorded:
            continue
        for test, vpp_binary, tempdir in recorded.values():
            check_and_handle_core(vpp_binary, tempdir, test)
304
305
def process_finished_testsuite(wrapped_testcase_suite,
                               finished_testcase_suites,
                               failed_wrapped_testcases,
                               results):
    """
    Book a finished suite and collect diagnostics if it was not successful.

    :param wrapped_testcase_suite: wrapper of the finished suite
    :param finished_testcase_suites: set to add the wrapper to
    :param failed_wrapped_testcases: set to add the wrapper to on failure
    :param results: list to append the result of the suite to
    :return: True if the whole run should stop (failfast and a failure)
    """
    results.append(wrapped_testcase_suite.result)
    finished_testcase_suites.add(wrapped_testcase_suite)

    succeeded = wrapped_testcase_suite.was_successful()
    stop_run = True if (failfast and not succeeded) else False

    if not succeeded:
        failed_wrapped_testcases.add(wrapped_testcase_suite)
        handle_failed_suite(wrapped_testcase_suite.logger,
                            wrapped_testcase_suite.last_test_temp_dir,
                            wrapped_testcase_suite.vpp_pid)

    return stop_run
323
324
def run_forked(testcase_suites):
    """
    Run the suites in child processes and collect their results.

    At most `concurrent_tests` suites run at the same time. Suites marked
    with `force_solo` are put aside and only started when no other runner
    is active. A runner which times out, dies or is unresponsive after
    a core was detected gets terminated and its last test recorded as ERROR.

    :param testcase_suites: list of suites to run, consumed by this function
    :return: list of results, one per suite which was run or skipped because
        the run was stopped
    """
    wrapped_testcase_suites = set()
    solo_testcase_suites = []

    # suites are unhashable, need to use list
    results = []
    unread_testcases = set()
    finished_unread_testcases = set()
    manager = StreamQueueManager()
    manager.start()

    def start_suite(a_suite):
        # start a runner and register it both for polling and for having
        # its output read by the reader thread
        new_testcase = TestCaseWrapper(a_suite, manager)
        wrapped_testcase_suites.add(new_testcase)
        unread_testcases.add(new_testcase)

    total_test_runners = 0
    while total_test_runners < concurrent_tests and testcase_suites:
        a_suite = testcase_suites.pop(0)
        if a_suite.force_solo:
            solo_testcase_suites.append(a_suite)
            continue
        start_suite(a_suite)
        total_test_runners = total_test_runners + 1

    if total_test_runners < 1 and solo_testcase_suites:
        # nothing but solo suites could be picked - start the first one
        start_suite(solo_testcase_suites.pop(0))
        total_test_runners = total_test_runners + 1

    read_from_testcases = threading.Event()
    read_from_testcases.set()
    stdouterr_thread = threading.Thread(target=stdouterr_reader_wrapper,
                                        args=(unread_testcases,
                                              finished_unread_testcases,
                                              read_from_testcases))
    stdouterr_thread.start()

    failed_wrapped_testcases = set()
    stop_run = False

    try:
        while wrapped_testcase_suites:
            finished_testcase_suites = set()
            for wrapped_testcase_suite in wrapped_testcase_suites:
                while wrapped_testcase_suite.result_parent_end.poll():
                    wrapped_testcase_suite.result.process_result(
                        *wrapped_testcase_suite.result_parent_end.recv())
                    wrapped_testcase_suite.last_heard = time.time()

                while wrapped_testcase_suite.keep_alive_parent_end.poll():
                    wrapped_testcase_suite.last_test, \
                        wrapped_testcase_suite.last_test_vpp_binary, \
                        wrapped_testcase_suite.last_test_temp_dir, \
                        wrapped_testcase_suite.vpp_pid = \
                        wrapped_testcase_suite.keep_alive_parent_end.recv()
                    wrapped_testcase_suite.last_heard = time.time()

                if wrapped_testcase_suite.finished_parent_end.poll():
                    wrapped_testcase_suite.finished_parent_end.recv()
                    wrapped_testcase_suite.last_heard = time.time()
                    stop_run = process_finished_testsuite(
                        wrapped_testcase_suite,
                        finished_testcase_suites,
                        failed_wrapped_testcases,
                        results) or stop_run
                    continue

                fail = False
                if wrapped_testcase_suite.last_heard + test_timeout < \
                        time.time():
                    fail = True
                    wrapped_testcase_suite.logger.critical(
                        "Child test runner process timed out "
                        "(last test running was `%s' in `%s')!" %
                        (wrapped_testcase_suite.last_test,
                         wrapped_testcase_suite.last_test_temp_dir))
                elif not wrapped_testcase_suite.child.is_alive():
                    fail = True
                    wrapped_testcase_suite.logger.critical(
                        "Child test runner process unexpectedly died "
                        "(last test running was `%s' in `%s')!" %
                        (wrapped_testcase_suite.last_test,
                         wrapped_testcase_suite.last_test_temp_dir))
                elif wrapped_testcase_suite.last_test_temp_dir and \
                        wrapped_testcase_suite.last_test_vpp_binary:
                    if is_core_present(
                            wrapped_testcase_suite.last_test_temp_dir):
                        wrapped_testcase_suite.add_testclass_with_core()
                        if wrapped_testcase_suite.core_detected_at is None:
                            wrapped_testcase_suite.core_detected_at = \
                                time.time()
                        elif wrapped_testcase_suite.core_detected_at + \
                                core_timeout < time.time():
                            wrapped_testcase_suite.logger.critical(
                                "Child test runner process unresponsive and "
                                "core-file exists in test temporary directory "
                                "(last test running was `%s' in `%s')!" %
                                (wrapped_testcase_suite.last_test,
                                 wrapped_testcase_suite.last_test_temp_dir))
                            fail = True

                if fail:
                    wrapped_testcase_suite.child.terminate()
                    try:
                        # terminating the child process tends to leave orphan
                        # VPP process around
                        if wrapped_testcase_suite.vpp_pid:
                            os.kill(wrapped_testcase_suite.vpp_pid,
                                    signal.SIGTERM)
                    except OSError:
                        # already dead
                        pass
                    wrapped_testcase_suite.result.crashed = True
                    wrapped_testcase_suite.result.process_result(
                        wrapped_testcase_suite.last_test_id, ERROR)
                    stop_run = process_finished_testsuite(
                        wrapped_testcase_suite,
                        finished_testcase_suites,
                        failed_wrapped_testcases,
                        results) or stop_run

            for finished_testcase in finished_testcase_suites:
                # Somewhat surprisingly, the join below may
                # timeout, even if client signaled that
                # it finished - so we note it just in case.
                join_start = time.time()
                finished_testcase.child.join(test_finished_join_timeout)
                join_end = time.time()
                if join_end - join_start >= test_finished_join_timeout:
                    finished_testcase.logger.error(
                        "Timeout joining finished test: %s (pid %d)" %
                        (finished_testcase.last_test,
                         finished_testcase.child.pid))
                finished_testcase.close_pipes()
                wrapped_testcase_suites.remove(finished_testcase)
                finished_unread_testcases.add(finished_testcase)
                # end-of-output marker for the reader thread
                finished_testcase.stdouterr_queue.put(None)
                total_test_runners = total_test_runners - 1
                if stop_run:
                    # nothing more gets started - record the remaining
                    # suites, including the deferred solo ones, as not run
                    while testcase_suites:
                        results.append(TestResult(testcase_suites.pop(0)))
                    while solo_testcase_suites:
                        results.append(
                            TestResult(solo_testcase_suites.pop(0)))
                    continue
                if testcase_suites:
                    a_testcase = testcase_suites.pop(0)
                    while a_testcase and a_testcase.force_solo:
                        solo_testcase_suites.append(a_testcase)
                        if testcase_suites:
                            a_testcase = testcase_suites.pop(0)
                        else:
                            a_testcase = None
                    if a_testcase:
                        start_suite(a_testcase)
                        total_test_runners = total_test_runners + 1
                # Checked independently of the branch above: if the pending
                # list contained nothing but solo suites, no runner was
                # started there and a solo suite has to be started now,
                # otherwise the loop would end without ever running them.
                if solo_testcase_suites and total_test_runners == 0:
                    start_suite(solo_testcase_suites.pop(0))
                    total_test_runners = total_test_runners + 1
            time.sleep(0.1)
    except Exception:
        for wrapped_testcase_suite in wrapped_testcase_suites:
            wrapped_testcase_suite.child.terminate()
            wrapped_testcase_suite.stdouterr_queue.put(None)
        raise
    finally:
        read_from_testcases.clear()
        stdouterr_thread.join(test_timeout)
        manager.shutdown()

    handle_cores(failed_wrapped_testcases)
    return results
509
510
class SplitToSuitesCallback:
    """
    Discovery callback which groups test methods into one suite per
    file name + class name and collects filtered-out tests separately.
    """

    def __init__(self, filter_callback):
        """
        :param filter_callback: callable(file_name, class_name, method)
            returning true for tests which should be kept
        """
        self.suites = {}
        self.suite_name = 'default'
        self.filter_callback = filter_callback
        self.filtered = unittest.TestSuite()

    def __call__(self, file_name, cls, method):
        """
        Instantiate the test `cls(method)` and sort it into a suite.

        A suite is flagged `force_solo` as soon as any of its tests
        requests it.
        """
        test_method = cls(method)
        if not self.filter_callback(file_name, cls.__name__, method):
            self.filtered.addTest(test_method)
            return

        self.suite_name = file_name + cls.__name__
        suite = self.suites.get(self.suite_name)
        if suite is None:
            suite = unittest.TestSuite()
            suite.force_solo = False
            self.suites[self.suite_name] = suite
        suite.addTest(test_method)
        if test_method.force_solo():
            suite.force_solo = True
531
532
# name of the environment variable holding the test filter
test_option = "TEST"


def parse_test_option():
    """
    Parse the test filter from the environment.

    The accepted forms are '<file>', '<file>.<class>' and
    '<file>.<class>.<function>'. In the dotted forms, '*' or an empty part
    means no filtering on that part. The file part gets the 'test_' prefix
    and the '.py' suffix added if needed.

    :return: tuple (file name, class name, function name), None standing for
        a part which is not filtered on
    :raises Exception: if there are more than three dot-separated parts
    """
    spec = os.getenv(test_option, None)
    file_filter = class_filter = func_filter = None
    if spec:
        if '.' not in spec:
            # plain file name, no wildcard handling here
            file_filter = spec if spec.startswith('test_') \
                else 'test_%s' % spec
        else:
            pieces = spec.split('.')
            if len(pieces) > 3:
                raise Exception("Unrecognized %s option: %s" %
                                (test_option, spec))
            match_all = ('*', '')
            if len(pieces) == 3 and pieces[2] not in match_all:
                func_filter = pieces[2]
            if pieces[1] not in match_all:
                class_filter = pieces[1]
            if pieces[0] not in match_all:
                file_filter = pieces[0] if pieces[0].startswith('test_') \
                    else 'test_%s' % pieces[0]
    if file_filter:
        file_filter = '%s.py' % file_filter
    return file_filter, class_filter, func_filter
565
566
def filter_tests(tests, filter_cb):
    """
    Build a new suite holding only the tests accepted by `filter_cb`.

    Nested suites are filtered recursively and dropped when nothing remains
    in them. Filtering applies only to testcases whose id has exactly three
    dot-separated parts; anything else is passed through untouched.

    :param tests: iterable of suites and testcases
    :param filter_cb: callable(file name, class name, function name)
    :return: the filtered suite
    """
    kept = unittest.suite.TestSuite()
    for item in tests:
        if isinstance(item, unittest.suite.TestSuite):
            # this is a bunch of tests, recursively filter...
            nested = filter_tests(item, filter_cb)
            if nested.countTestCases() > 0:
                kept.addTest(nested)
            continue
        if isinstance(item, unittest.TestCase):
            # ids for common cases look like this:
            # test_classifier.TestClassifier.test_acl_ip
            id_parts = item.id().split('.')
            if len(id_parts) == 3 and not filter_cb(*id_parts):
                continue
        # accepted testcase or an unexpected object, which is not touched
        kept.addTest(item)
    return kept
589
590
class FilterByTestOption:
    """Filter accepting tests by file name pattern, class and function."""

    def __init__(self, filter_file_name, filter_class_name, filter_func_name):
        """Each filter may be None (or empty) to accept anything."""
        self.filter_file_name = filter_file_name
        self.filter_class_name = filter_class_name
        self.filter_func_name = filter_func_name

    def __call__(self, file_name, class_name, func_name):
        """
        Return True if the test passes all filters which are set.

        The file name is matched as a shell-style pattern, class and
        function names have to be equal.
        """
        if self.filter_file_name and \
                not fnmatch.fnmatch(file_name, self.filter_file_name):
            return False
        exact_filters = ((self.filter_class_name, class_name),
                         (self.filter_func_name, func_name))
        for wanted, actual in exact_filters:
            if wanted and actual != wanted:
                return False
        return True
607
608
class FilterByClassList:
    """Filter accepting tests whose 'file.class' name is in a collection."""

    def __init__(self, classes_with_filenames):
        """
        :param classes_with_filenames: collection of '<file>.<class>' names
        """
        self.classes_with_filenames = classes_with_filenames

    def __call__(self, file_name, class_name, func_name):
        """Return True if '<file_name>.<class_name>' is in the collection."""
        qualified_name = '.'.join((file_name, class_name))
        return qualified_name in self.classes_with_filenames
615
616
617 def suite_from_failed(suite, failed):
618     failed = {x.rsplit('.', 1)[0] for x in failed}
619     filter_cb = FilterByClassList(failed)
620     suite = filter_tests(suite, filter_cb)
621     return suite
622
623
class AllResults(dict):
    """
    Totals over the results of all suites.

    Maps each result type (PASS, FAIL, ERROR, SKIP, TEST_RUN) to the number
    of tests reported with that type.
    """

    def __init__(self):
        super(AllResults, self).__init__()
        self.all_testcases = 0
        self.results_per_suite = []
        for result_type in (PASS, FAIL, ERROR, SKIP, TEST_RUN):
            self[result_type] = 0
        self.rerun = []
        self.testsuites_no_tests_run = []

    def add_results(self, result):
        """Remember the suite result and add its counts to the totals."""
        self.results_per_suite.append(result)
        for result_type in (PASS, FAIL, ERROR, SKIP, TEST_RUN):
            self[result_type] += len(result[result_type])

    def add_result(self, result):
        """
        Add a suite result and classify it.

        Suites which did not finish successfully are queued for rerun.

        :return: 0 on success, 1 on failure, -1 if the suite crashed without
            running any test
        """
        self.all_testcases += result.testcase_suite.countTestCases()
        self.add_results(result)

        if result.no_tests_run():
            self.testsuites_no_tests_run.append(result.testcase_suite)
            retval = -1 if result.crashed else 1
        elif not result.was_successful():
            retval = 1
        else:
            retval = 0

        if retval != 0:
            self.rerun.append(result.testcase_suite)

        return retval

    def print_results(self):
        """Print the summary followed by details on what went wrong."""
        print('')
        print(double_line_delim)
        print('TEST RESULTS:')
        print('     Scheduled tests: {}'.format(self.all_testcases))
        print('      Executed tests: {}'.format(self[TEST_RUN]))
        print('        Passed tests: {}'.format(
            colorize(str(self[PASS]), GREEN)))
        # these counters are only printed when non-zero
        optional_counters = (
            ('       Skipped tests: {}', self[SKIP], YELLOW),
            ('  Not Executed tests: {}', self.not_executed, RED),
            ('            Failures: {}', self[FAIL], RED),
            ('              Errors: {}', self[ERROR], RED),
        )
        for line_format, count, color in optional_counters:
            if count > 0:
                print(line_format.format(colorize(str(count), color)))

        if self.all_failed > 0:
            print('FAILURES AND ERRORS IN TESTS:')
            for result in self.results_per_suite:
                problems = (
                    ('    FAILURE: {} [{}]', result[FAIL]),
                    ('      ERROR: {} [{}]', result[ERROR]),
                )
                # the testcase name is printed only when it changes
                previous_testcase_name = None
                for line_format, test_ids in problems:
                    for test_id in test_ids:
                        testcase_name, test_name = \
                            result.get_testcase_names(test_id)
                        if testcase_name != previous_testcase_name:
                            print('  Testcase name: {}'.format(
                                colorize(testcase_name, RED)))
                            previous_testcase_name = testcase_name
                        print(line_format.format(
                            colorize(test_name, RED), test_id))
        if self.testsuites_no_tests_run:
            print('TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:')
            tc_classes = set()
            for testsuite in self.testsuites_no_tests_run:
                for testcase in testsuite:
                    tc_classes.add(get_testcase_doc_name(testcase))
            for tc_class in tc_classes:
                print('  {}'.format(colorize(tc_class, RED)))

        print(double_line_delim)
        print('')

    @property
    def not_executed(self):
        """Number of scheduled tests which were not run."""
        return self.all_testcases - self[TEST_RUN]

    @property
    def all_failed(self):
        """Number of failures and errors together."""
        return self[FAIL] + self[ERROR]
728
729
def parse_results(results):
    """
    Aggregate the per-suite results, print the summary and work out the
    overall return code.

    Every item of ``results`` is fed to ``AllResults.add_result()``; once
    all of them are collected the summary is printed via
    ``AllResults.print_results()``.

    :param results: iterable of per-suite results to aggregate
    :return: tuple ``(return_code, rerun)`` where ``return_code`` is -1 if
        ``add_result()`` returned -1 for any item (treated as a crash), 1 if
        it returned 1 for any item (treated as a failure) and 0 otherwise,
        and ``rerun`` is the ``rerun`` attribute of the aggregated results
        (the suites to run again)
    """
    summary = AllResults()
    outcome_codes = [summary.add_result(suite_result)
                     for suite_result in results]

    summary.print_results()

    # a crash (-1) outranks a plain failure (1), which outranks success (0)
    return_code = 0
    if -1 in outcome_codes:
        return_code = -1
    elif 1 in outcome_codes:
        return_code = 1
    return return_code, summary.rerun
760
761
def parse_digit_env(env_var, default):
    """
    Read a non-negative integer setting from the environment.

    :param env_var: name of the environment variable to read
    :param default: value returned when the variable is unset or when it
        holds something that is not a plain decimal number
    :return: the value of the variable converted to int, or ``default``
    """
    value = os.getenv(env_var, default)
    if value == default:
        # variable is unset (getenv handed back the default) - nothing to parse
        return value

    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as superscript digits, which int() rejects with a ValueError
    if value.isdecimal():
        return int(value)

    print('WARNING: unsupported value "%s" for env var "%s", '
          'defaulting to %s' % (value, env_var, default))
    return default
772
773
if __name__ == '__main__':

    # Script entry point: read the configuration from environment variables
    # and the command line, discover the tests and run them either in the
    # foreground (interactive modes) or in forked background processes.
    #
    # NOTE(review): everything assigned here is a module-level global and
    # several names (e.g. test_timeout, debug_core, descriptions) are not
    # used within this block - presumably functions defined elsewhere in
    # this file read them, so the names must stay as they are.

    verbose = parse_digit_env("V", 0)

    test_timeout = parse_digit_env("TIMEOUT", 600)  # default = 10 minutes

    test_finished_join_timeout = 15

    retries = parse_digit_env("RETRIES", 0)

    debug = os.getenv("DEBUG", "n").lower() in ["gdb", "gdbserver"]

    debug_core = os.getenv("DEBUG", "").lower() == "core"
    compress_core = framework.BoolEnvironmentVariable("CORE_COMPRESS")

    step = framework.BoolEnvironmentVariable("STEP")
    force_foreground = framework.BoolEnvironmentVariable("FORCE_FOREGROUND")

    # any of these modes needs the terminal, which rules out forking
    run_interactive = debug or step or force_foreground

    try:
        # honours CPU affinity restrictions; not available on all platforms
        num_cpus = len(os.sched_getaffinity(0))
    except AttributeError:
        num_cpus = multiprocessing.cpu_count()
    shm_free = psutil.disk_usage('/dev/shm').free

    # integer division - otherwise the size is printed as a long float
    print('OS reports %s available cpu(s). Free shm: %s' % (
        num_cpus, "{:,}MB".format(shm_free // (1024 * 1024))))

    test_jobs = os.getenv("TEST_JOBS", "1").lower()  # default = 1 process
    if test_jobs == 'auto':
        if run_interactive:
            concurrent_tests = 1
            print('Interactive mode required, running on one core')
        else:
            shm_max_processes = 1
            if shm_free < min_req_shm:
                raise Exception('Not enough free space in /dev/shm. Required '
                                'free space is at least %sM.'
                                % (min_req_shm >> 20))
            else:
                # one process fits into the minimum, each further one needs
                # another shm_per_process bytes
                extra_shm = shm_free - min_req_shm
                shm_max_processes += extra_shm // shm_per_process
            # use the affinity-aware count reported above rather than the
            # total number of CPUs in the machine
            concurrent_tests = min(num_cpus, shm_max_processes)
            print('Found enough resources to run tests with %s cores'
                  % concurrent_tests)
    elif test_jobs.isdigit():
        concurrent_tests = int(test_jobs)
        print("Running on %s core(s) as set by 'TEST_JOBS'." %
              concurrent_tests)
    else:
        concurrent_tests = 1
        print('Running on one core.')

    if run_interactive and concurrent_tests > 1:
        raise NotImplementedError(
            'Running tests interactively (DEBUG is gdb or gdbserver, or STEP '
            'or FORCE_FOREGROUND is set) in parallel (TEST_JOBS is more than '
            '1) is not supported')

    parser = argparse.ArgumentParser(description="VPP unit tests")
    parser.add_argument("-f", "--failfast", action='store_true',
                        help="fast failure flag")
    # required: without it args.dir is None and test discovery below would
    # fail with a TypeError instead of a usage message
    parser.add_argument("-d", "--dir", action='append', type=str,
                        required=True,
                        help="directory containing test files "
                             "(may be specified multiple times)")
    args = parser.parse_args()
    failfast = args.failfast
    descriptions = True

    print("Running tests using custom test runner")  # debug message
    filter_file, filter_class, filter_func = parse_test_option()

    print("Active filters: file=%s, class=%s, function=%s" % (
        filter_file, filter_class, filter_func))

    filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)

    ignore_path = os.getenv("VENV_PATH", None)
    cb = SplitToSuitesCallback(filter_cb)
    for d in args.dir:
        print("Adding tests from directory tree %s" % d)
        discover_tests(d, cb, ignore_path)

    # suites are not hashable, need to use list
    suites = []
    tests_amount = 0
    for testcase_suite in cb.suites.values():
        tests_amount += testcase_suite.countTestCases()
        suites.append(testcase_suite)

    print("%s out of %s tests match specified filters" % (
        tests_amount, tests_amount + cb.filtered.countTestCases()))

    if not running_extended_tests:
        print("Not running extended tests (some tests will be skipped)")

    attempts = retries + 1
    if attempts > 1:
        print("Perform %s attempts to pass the suite..." % attempts)

    if run_interactive and suites:
        # don't fork if requiring interactive terminal
        print('Running tests in foreground in the current process')
        full_suite = unittest.TestSuite()
        full_suite.addTests(suites)
        result = VppTestRunner(verbosity=verbose,
                               failfast=failfast,
                               print_summary=True).run(full_suite)
        was_successful = result.wasSuccessful()
        if not was_successful:
            for test_case_info in result.failed_test_cases_info:
                handle_failed_suite(test_case_info.logger,
                                    test_case_info.tempdir,
                                    test_case_info.vpp_pid)
                if test_case_info in result.core_crash_test_cases_info:
                    check_and_handle_core(test_case_info.vpp_bin_path,
                                          test_case_info.tempdir,
                                          test_case_info.core_crash_test)

        # exit status 0 on success, 1 otherwise
        sys.exit(not was_successful)
    else:
        print('Running each VPPTestCase in a separate background process'
              ' with {} parallel process(es)'.format(concurrent_tests))
        exit_code = 0
        # parse_results() hands back the suites to rerun; keep going until
        # there is nothing left to rerun or the attempts are used up
        while suites and attempts > 0:
            results = run_forked(suites)
            exit_code, suites = parse_results(results)
            attempts -= 1
            if exit_code == 0:
                print('Test run was successful')
            else:
                print('%s attempt(s) left.' % attempts)
        sys.exit(exit_code)