tests: do not skip solo-run tests if running within a single job and all remaining...
[vpp.git] / test / run_tests.py
1 #!/usr/bin/env python3
2
3 import sys
4 import shutil
5 import os
6 import fnmatch
7 import unittest
8 import argparse
9 import time
10 import threading
11 import signal
12 import psutil
13 import re
14 import multiprocessing
15 from multiprocessing import Process, Pipe, cpu_count
16 from multiprocessing.queues import Queue
17 from multiprocessing.managers import BaseManager
18 import framework
19 from framework import VppTestRunner, running_extended_tests, VppTestCase, \
20     get_testcase_doc_name, get_test_description, PASS, FAIL, ERROR, SKIP, \
21     TEST_RUN
22 from debug import spawn_gdb
23 from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
24     colorize, single_line_delim
25 from discover_tests import discover_tests
26 from subprocess import check_output, CalledProcessError
27 from util import check_core_path, get_core_path, is_core_present
28
29 # timeout which controls how long the child has to finish after a core dump
30 # is detected in the test temporary directory. If this is exceeded, the parent
31 # assumes that the child process is stuck (e.g. waiting for an shm mutex that
32 # will never get unlocked) and kills the child.
33 core_timeout = 3
34 min_req_shm = 536870912  # min 512MB shm required
35 # 128MB per extra process
36 shm_per_process = 134217728
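# When TEST_JOBS=auto, these limits bound the number of parallel runners in
# the __main__ block below: roughly
# min(cpu_count(), 1 + (free_shm - min_req_shm) // shm_per_process).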
37
38
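# File-like wrapper around a multiprocessing Queue: a child process installs
# it as sys.stdout/sys.stderr so its output can be drained and printed by the
# parent (see test_runner_wrapper and stdouterr_reader_wrapper).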
39 class StreamQueue(Queue):
40     def write(self, msg):
41         self.put(msg)
42
43     def flush(self):
44         sys.__stdout__.flush()
45         sys.__stderr__.flush()
46
47     def fileno(self):
48         return self._writer.fileno()
49
50
51 class StreamQueueManager(BaseManager):
52     pass
53
54
55 StreamQueueManager.register('StreamQueue', StreamQueue)
56
57
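# Per-suite result container: maps result types (PASS, FAIL, ERROR, SKIP,
# TEST_RUN) to lists of test ids and records whether the child process
# running the suite crashed.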
58 class TestResult(dict):
59     def __init__(self, testcase_suite, testcases_by_id=None):
60         super(TestResult, self).__init__()
61         self[PASS] = []
62         self[FAIL] = []
63         self[ERROR] = []
64         self[SKIP] = []
65         self[TEST_RUN] = []
66         self.crashed = False
67         self.testcase_suite = testcase_suite
68         self.testcases = [testcase for testcase in testcase_suite]
69         self.testcases_by_id = testcases_by_id
70
71     def was_successful(self):
72         return 0 == len(self[FAIL]) == len(self[ERROR]) \
73             and len(self[PASS] + self[SKIP]) \
74             == self.testcase_suite.countTestCases() == len(self[TEST_RUN])
75
76     def no_tests_run(self):
77         return 0 == len(self[TEST_RUN])
78
79     def process_result(self, test_id, result):
80         self[result].append(test_id)
81
82     def suite_from_failed(self):
83         rerun_ids = set([])
84         for testcase in self.testcase_suite:
85             tc_id = testcase.id()
86             if tc_id not in self[PASS] and tc_id not in self[SKIP]:
87                 rerun_ids.add(tc_id)
88         if rerun_ids:
89             return suite_from_failed(self.testcase_suite, rerun_ids)
90
91     def get_testcase_names(self, test_id):
92         # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
93         setup_teardown_match = re.match(
94             r'((tearDownClass)|(setUpClass)) \((.+\..+)\)', test_id)
95         if setup_teardown_match:
96             test_name, _, _, testcase_name = setup_teardown_match.groups()
97             if len(testcase_name.split('.')) == 2:
98                 for key in self.testcases_by_id.keys():
99                     if key.startswith(testcase_name):
100                         testcase_name = key
101                         break
102             testcase_name = self._get_testcase_doc_name(testcase_name)
103         else:
104             test_name = self._get_test_description(test_id)
105             testcase_name = self._get_testcase_doc_name(test_id)
106
107         return testcase_name, test_name
108
109     def _get_test_description(self, test_id):
110         if test_id in self.testcases_by_id:
111             desc = get_test_description(descriptions,
112                                         self.testcases_by_id[test_id])
113         else:
114             desc = test_id
115         return desc
116
117     def _get_testcase_doc_name(self, test_id):
118         if test_id in self.testcases_by_id:
119             doc_name = get_testcase_doc_name(self.testcases_by_id[test_id])
120         else:
121             doc_name = test_id
122         return doc_name
123
124
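# Entry point of the child process: redirects stdout/stderr to the shared
# queue, runs the suite with VppTestRunner and reports the overall success
# status over the finished pipe.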
125 def test_runner_wrapper(suite, keep_alive_pipe, stdouterr_queue,
126                         finished_pipe, result_pipe, logger):
127     sys.stdout = stdouterr_queue
128     sys.stderr = stdouterr_queue
129     VppTestCase.parallel_handler = logger.handlers[0]
130     result = VppTestRunner(keep_alive_pipe=keep_alive_pipe,
131                            descriptions=descriptions,
132                            verbosity=verbose,
133                            result_pipe=result_pipe,
134                            failfast=failfast,
135                            print_summary=False).run(suite)
136     finished_pipe.send(result.wasSuccessful())
137     finished_pipe.close()
138     keep_alive_pipe.close()
139
140
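# Wraps a test suite running in a forked child process together with the
# pipes used for keep-alive messages, per-test results and the finished
# notification, plus the queue carrying the child's output.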
141 class TestCaseWrapper(object):
142     def __init__(self, testcase_suite, manager):
143         self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(
144             duplex=False)
145         self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
146         self.result_parent_end, self.result_child_end = Pipe(duplex=False)
147         self.testcase_suite = testcase_suite
148         if sys.version[0] == '2':
149             self.stdouterr_queue = manager.StreamQueue()
150         else:
151             from multiprocessing import get_context
152             self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
153         self.logger = get_parallel_logger(self.stdouterr_queue)
154         self.child = Process(target=test_runner_wrapper,
155                              args=(testcase_suite,
156                                    self.keep_alive_child_end,
157                                    self.stdouterr_queue,
158                                    self.finished_child_end,
159                                    self.result_child_end,
160                                    self.logger)
161                              )
162         self.child.start()
163         self.last_test_temp_dir = None
164         self.last_test_vpp_binary = None
165         self._last_test = None
166         self.last_test_id = None
167         self.vpp_pid = None
168         self.last_heard = time.time()
169         self.core_detected_at = None
170         self.testcases_by_id = {}
171         self.testclasses_with_core = {}
172         for testcase in self.testcase_suite:
173             self.testcases_by_id[testcase.id()] = testcase
174         self.result = TestResult(testcase_suite, self.testcases_by_id)
175
176     @property
177     def last_test(self):
178         return self._last_test
179
180     @last_test.setter
181     def last_test(self, test_id):
182         self.last_test_id = test_id
183         if test_id in self.testcases_by_id:
184             testcase = self.testcases_by_id[test_id]
185             self._last_test = testcase.shortDescription()
186             if not self._last_test:
187                 self._last_test = str(testcase)
188         else:
189             self._last_test = test_id
190
191     def add_testclass_with_core(self):
192         if self.last_test_id in self.testcases_by_id:
193             test = self.testcases_by_id[self.last_test_id]
194             class_name = unittest.util.strclass(test.__class__)
195             test_name = "'{}' ({})".format(get_test_description(descriptions,
196                                                                 test),
197                                            self.last_test_id)
198         else:
199             test_name = self.last_test_id
200             class_name = re.match(r'((tearDownClass)|(setUpClass)) '
201                                   r'\((.+\..+)\)', test_name).groups()[3]
202         if class_name not in self.testclasses_with_core:
203             self.testclasses_with_core[class_name] = (
204                 test_name,
205                 self.last_test_vpp_binary,
206                 self.last_test_temp_dir)
207
208     def close_pipes(self):
209         self.keep_alive_child_end.close()
210         self.finished_child_end.close()
211         self.result_child_end.close()
212         self.keep_alive_parent_end.close()
213         self.finished_parent_end.close()
214         self.result_parent_end.close()
215
216     def was_successful(self):
217         return self.result.was_successful()
218
219
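# Runs in a separate thread in the parent process: drains the output queues
# of running and finished test case wrappers (preferring finished ones) and
# writes the data to stdout. A None sentinel marks the end of a child's
# output.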
220 def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
221                              read_testcases):
222     read_testcase = None
223     while read_testcases.is_set() or unread_testcases:
224         if finished_unread_testcases:
225             read_testcase = finished_unread_testcases.pop()
226             unread_testcases.remove(read_testcase)
227         elif unread_testcases:
228             read_testcase = unread_testcases.pop()
229         if read_testcase:
230             data = ''
231             while data is not None:
232                 sys.stdout.write(data)
233                 data = read_testcase.stdouterr_queue.get()
234
235             read_testcase.stdouterr_queue.close()
236             finished_unread_testcases.discard(read_testcase)
237             read_testcase = None
238
239
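# Creates a symlink in FAILED_DIR pointing at the failed test's temporary
# directory, reports any core-file found there and copies the VPP API post
# mortem file next to the test artifacts.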
240 def handle_failed_suite(logger, last_test_temp_dir, vpp_pid):
241     if last_test_temp_dir:
242         # Need to create link in case of a timeout or core dump without failure
243         lttd = os.path.basename(last_test_temp_dir)
244         failed_dir = os.getenv('FAILED_DIR')
245         link_path = '%s%s-FAILED' % (failed_dir, lttd)
246         if not os.path.exists(link_path):
247             os.symlink(last_test_temp_dir, link_path)
248         logger.error("Symlink to failed testcase directory: %s -> %s"
249                      % (link_path, lttd))
250
251         # Report core existence
252         core_path = get_core_path(last_test_temp_dir)
253         if os.path.exists(core_path):
254             logger.error(
255                 "Core-file exists in test temporary directory: %s!" %
256                 core_path)
257             check_core_path(logger, core_path)
258             logger.debug("Running 'file %s':" % core_path)
259             try:
260                 info = check_output(["file", core_path])
261                 logger.debug(info)
262             except CalledProcessError as e:
263                 logger.error("Subprocess running `file' utility "
264                              "on core-file returned with non-zero "
265                              "return code: "
266                              "rc=%s", e.returncode)
267             except OSError as e:
268                 logger.error("Subprocess returned with OS error while "
269                              "running 'file' utility "
270                              "on core-file: "
271                              "(%s) %s", e.errno, e.strerror)
272             except Exception as e:
273                 logger.exception("Unexpected error running `file' utility "
274                                  "on core-file")
275
276     if vpp_pid:
277         # Copy api post mortem
278         api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
279         if os.path.isfile(api_post_mortem_path):
280             logger.error("Copying api_post_mortem.%d to %s" %
281                          (vpp_pid, last_test_temp_dir))
282             shutil.copy2(api_post_mortem_path, last_test_temp_dir)
283
284
285 def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
286     if is_core_present(tempdir):
287         if debug_core:
288             print('VPP core detected in %s. Last test running was %s' %
289                   (tempdir, core_crash_test))
290             print(single_line_delim)
291             spawn_gdb(vpp_binary, get_core_path(tempdir))
292             print(single_line_delim)
293         elif compress_core:
294             print("Compressing core-file in test directory `%s'" % tempdir)
295             os.system("gzip %s" % get_core_path(tempdir))
296
297
298 def handle_cores(failed_testcases):
299     for failed_testcase in failed_testcases:
300         tcs_with_core = failed_testcase.testclasses_with_core
301         if tcs_with_core:
302             for test, vpp_binary, tempdir in tcs_with_core.values():
303                 check_and_handle_core(vpp_binary, tempdir, test)
304
305
306 def process_finished_testsuite(wrapped_testcase_suite,
307                                finished_testcase_suites,
308                                failed_wrapped_testcases,
309                                results):
310     results.append(wrapped_testcase_suite.result)
311     finished_testcase_suites.add(wrapped_testcase_suite)
312     stop_run = False
313     if failfast and not wrapped_testcase_suite.was_successful():
314         stop_run = True
315
316     if not wrapped_testcase_suite.was_successful():
317         failed_wrapped_testcases.add(wrapped_testcase_suite)
318         handle_failed_suite(wrapped_testcase_suite.logger,
319                             wrapped_testcase_suite.last_test_temp_dir,
320                             wrapped_testcase_suite.vpp_pid)
321
322     return stop_run
323
324
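# Main scheduling loop: spawns up to concurrent_tests child runners (solo
# suites are deferred until they can run alone), polls their pipes for
# results and keep-alives, detects timeouts, unexpected deaths and core
# dumps, and backfills freed slots with the remaining suites.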
325 def run_forked(testcase_suites):
326     wrapped_testcase_suites = set()
327     solo_testcase_suites = []
328     total_test_runners = 0
329
330     # suites are unhashable, need to use list
331     results = []
332     unread_testcases = set()
333     finished_unread_testcases = set()
334     manager = StreamQueueManager()
335     manager.start()
337     while total_test_runners < concurrent_tests:
338         if testcase_suites:
339             a_suite = testcase_suites.pop(0)
340             if a_suite.force_solo:
341                 solo_testcase_suites.append(a_suite)
342                 continue
343             wrapped_testcase_suite = TestCaseWrapper(a_suite,
344                                                      manager)
345             wrapped_testcase_suites.add(wrapped_testcase_suite)
346             unread_testcases.add(wrapped_testcase_suite)
347             total_test_runners = total_test_runners + 1
348         else:
349             break
350
351     while total_test_runners < 1 and solo_testcase_suites:
352         if solo_testcase_suites:
353             a_suite = solo_testcase_suites.pop(0)
354             wrapped_testcase_suite = TestCaseWrapper(a_suite,
355                                                      manager)
356             wrapped_testcase_suites.add(wrapped_testcase_suite)
357             unread_testcases.add(wrapped_testcase_suite)
358             total_test_runners = total_test_runners + 1
359         else:
360             break
361
362     read_from_testcases = threading.Event()
363     read_from_testcases.set()
364     stdouterr_thread = threading.Thread(target=stdouterr_reader_wrapper,
365                                         args=(unread_testcases,
366                                               finished_unread_testcases,
367                                               read_from_testcases))
368     stdouterr_thread.start()
369
370     failed_wrapped_testcases = set()
371     stop_run = False
372
373     try:
374         while wrapped_testcase_suites:
375             finished_testcase_suites = set()
376             for wrapped_testcase_suite in wrapped_testcase_suites:
377                 while wrapped_testcase_suite.result_parent_end.poll():
378                     wrapped_testcase_suite.result.process_result(
379                         *wrapped_testcase_suite.result_parent_end.recv())
380                     wrapped_testcase_suite.last_heard = time.time()
381
382                 while wrapped_testcase_suite.keep_alive_parent_end.poll():
383                     wrapped_testcase_suite.last_test, \
384                         wrapped_testcase_suite.last_test_vpp_binary, \
385                         wrapped_testcase_suite.last_test_temp_dir, \
386                         wrapped_testcase_suite.vpp_pid = \
387                         wrapped_testcase_suite.keep_alive_parent_end.recv()
388                     wrapped_testcase_suite.last_heard = time.time()
389
390                 if wrapped_testcase_suite.finished_parent_end.poll():
391                     wrapped_testcase_suite.finished_parent_end.recv()
392                     wrapped_testcase_suite.last_heard = time.time()
393                     stop_run = process_finished_testsuite(
394                         wrapped_testcase_suite,
395                         finished_testcase_suites,
396                         failed_wrapped_testcases,
397                         results) or stop_run
398                     continue
399
400                 fail = False
401                 if wrapped_testcase_suite.last_heard + test_timeout < \
402                         time.time():
403                     fail = True
404                     wrapped_testcase_suite.logger.critical(
405                         "Child test runner process timed out "
406                         "(last test running was `%s' in `%s')!" %
407                         (wrapped_testcase_suite.last_test,
408                          wrapped_testcase_suite.last_test_temp_dir))
409                 elif not wrapped_testcase_suite.child.is_alive():
410                     fail = True
411                     wrapped_testcase_suite.logger.critical(
412                         "Child test runner process unexpectedly died "
413                         "(last test running was `%s' in `%s')!" %
414                         (wrapped_testcase_suite.last_test,
415                          wrapped_testcase_suite.last_test_temp_dir))
416                 elif wrapped_testcase_suite.last_test_temp_dir and \
417                         wrapped_testcase_suite.last_test_vpp_binary:
418                     if is_core_present(
419                             wrapped_testcase_suite.last_test_temp_dir):
420                         wrapped_testcase_suite.add_testclass_with_core()
421                         if wrapped_testcase_suite.core_detected_at is None:
422                             wrapped_testcase_suite.core_detected_at = \
423                                 time.time()
424                         elif wrapped_testcase_suite.core_detected_at + \
425                                 core_timeout < time.time():
426                             wrapped_testcase_suite.logger.critical(
427                                 "Child test runner process unresponsive and "
428                                 "core-file exists in test temporary directory "
429                                 "(last test running was `%s' in `%s')!" %
430                                 (wrapped_testcase_suite.last_test,
431                                  wrapped_testcase_suite.last_test_temp_dir))
432                             fail = True
433
434                 if fail:
435                     wrapped_testcase_suite.child.terminate()
436                     try:
437                         # terminating the child process tends to leave orphan
438                         # VPP process around
439                         if wrapped_testcase_suite.vpp_pid:
440                             os.kill(wrapped_testcase_suite.vpp_pid,
441                                     signal.SIGTERM)
442                     except OSError:
443                         # already dead
444                         pass
445                     wrapped_testcase_suite.result.crashed = True
446                     wrapped_testcase_suite.result.process_result(
447                         wrapped_testcase_suite.last_test_id, ERROR)
448                     stop_run = process_finished_testsuite(
449                         wrapped_testcase_suite,
450                         finished_testcase_suites,
451                         failed_wrapped_testcases,
452                         results) or stop_run
453
454             for finished_testcase in finished_testcase_suites:
455                 # Somewhat surprisingly, the join below may time out,
456                 # even if the child signaled that it finished - so we
457                 # note it just in case.
458                 join_start = time.time()
459                 finished_testcase.child.join(test_finished_join_timeout)
460                 join_end = time.time()
461                 if join_end - join_start >= test_finished_join_timeout:
462                     finished_testcase.logger.error(
463                         "Timeout joining finished test: %s (pid %d)" %
464                         (finished_testcase.last_test,
465                          finished_testcase.child.pid))
466                 finished_testcase.close_pipes()
467                 wrapped_testcase_suites.remove(finished_testcase)
468                 finished_unread_testcases.add(finished_testcase)
469                 finished_testcase.stdouterr_queue.put(None)
470                 total_test_runners = total_test_runners - 1
471                 if stop_run:
472                     while testcase_suites:
473                         results.append(TestResult(testcase_suites.pop(0)))
474                 elif testcase_suites:
475                     a_testcase = testcase_suites.pop(0)
476                     while a_testcase and a_testcase.force_solo:
477                         solo_testcase_suites.append(a_testcase)
478                         if testcase_suites:
479                             a_testcase = testcase_suites.pop(0)
480                         else:
481                             a_testcase = None
482                     if a_testcase:
483                         new_testcase = TestCaseWrapper(a_testcase,
484                                                        manager)
485                         wrapped_testcase_suites.add(new_testcase)
486                         total_test_runners = total_test_runners + 1
487                         unread_testcases.add(new_testcase)
488                 if solo_testcase_suites and total_test_runners == 0:
489                     a_testcase = solo_testcase_suites.pop(0)
490                     new_testcase = TestCaseWrapper(a_testcase,
491                                                    manager)
492                     wrapped_testcase_suites.add(new_testcase)
493                     total_test_runners = total_test_runners + 1
494                     unread_testcases.add(new_testcase)
495             time.sleep(0.1)
496     except Exception:
497         for wrapped_testcase_suite in wrapped_testcase_suites:
498             wrapped_testcase_suite.child.terminate()
499             wrapped_testcase_suite.stdouterr_queue.put(None)
500         raise
501     finally:
502         read_from_testcases.clear()
503         stdouterr_thread.join(test_timeout)
504         manager.shutdown()
505
506     handle_cores(failed_wrapped_testcases)
507     return results
508
509
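# Discovery callback which groups matching tests into one TestSuite per test
# class (file name + class name); a suite is marked force_solo if any of its
# tests requires running alone.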
510 class SplitToSuitesCallback:
511     def __init__(self, filter_callback):
512         self.suites = {}
513         self.suite_name = 'default'
514         self.filter_callback = filter_callback
515         self.filtered = unittest.TestSuite()
516
517     def __call__(self, file_name, cls, method):
518         test_method = cls(method)
519         if self.filter_callback(file_name, cls.__name__, method):
520             self.suite_name = file_name + cls.__name__
521             if self.suite_name not in self.suites:
522                 self.suites[self.suite_name] = unittest.TestSuite()
523                 self.suites[self.suite_name].force_solo = False
524             self.suites[self.suite_name].addTest(test_method)
525             if test_method.force_solo():
526                 self.suites[self.suite_name].force_solo = True
527
528         else:
529             self.filtered.addTest(test_method)
530
531
532 test_option = "TEST"
533
534
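# Parses the TEST environment variable of the form [file][.class][.function],
# where the 'test_' prefix on the file part is optional and '*' or an empty
# part acts as a wildcard (illustrative names only: TEST=foo or
# TEST=test_foo.FooTestCase.test_bar).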
535 def parse_test_option():
536     f = os.getenv(test_option, None)
537     filter_file_name = None
538     filter_class_name = None
539     filter_func_name = None
540     if f:
541         if '.' in f:
542             parts = f.split('.')
543             if len(parts) > 3:
544                 raise Exception("Unrecognized %s option: %s" %
545                                 (test_option, f))
546             if len(parts) > 2:
547                 if parts[2] not in ('*', ''):
548                     filter_func_name = parts[2]
549             if parts[1] not in ('*', ''):
550                 filter_class_name = parts[1]
551             if parts[0] not in ('*', ''):
552                 if parts[0].startswith('test_'):
553                     filter_file_name = parts[0]
554                 else:
555                     filter_file_name = 'test_%s' % parts[0]
556         else:
557             if f.startswith('test_'):
558                 filter_file_name = f
559             else:
560                 filter_file_name = 'test_%s' % f
561     if filter_file_name:
562         filter_file_name = '%s.py' % filter_file_name
563     return filter_file_name, filter_class_name, filter_func_name
564
565
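# Recursively filters a TestSuite, keeping only tests accepted by filter_cb,
# which is called with the file name, class name and function name of each
# test.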
566 def filter_tests(tests, filter_cb):
567     result = unittest.suite.TestSuite()
568     for t in tests:
569         if isinstance(t, unittest.suite.TestSuite):
570             # this is a bunch of tests, recursively filter...
571             x = filter_tests(t, filter_cb)
572             if x.countTestCases() > 0:
573                 result.addTest(x)
574         elif isinstance(t, unittest.TestCase):
575             # this is a single test
576             parts = t.id().split('.')
577             # t.id() for common cases like this:
578             # test_classifier.TestClassifier.test_acl_ip
579             # apply filtering only if it is so
580             if len(parts) == 3:
581                 if not filter_cb(parts[0], parts[1], parts[2]):
582                     continue
583             result.addTest(t)
584         else:
585             # unexpected object, don't touch it
586             result.addTest(t)
587     return result
588
589
590 class FilterByTestOption:
591     def __init__(self, filter_file_name, filter_class_name, filter_func_name):
592         self.filter_file_name = filter_file_name
593         self.filter_class_name = filter_class_name
594         self.filter_func_name = filter_func_name
595
596     def __call__(self, file_name, class_name, func_name):
597         if self.filter_file_name:
598             fn_match = fnmatch.fnmatch(file_name, self.filter_file_name)
599             if not fn_match:
600                 return False
601         if self.filter_class_name and class_name != self.filter_class_name:
602             return False
603         if self.filter_func_name and func_name != self.filter_func_name:
604             return False
605         return True
606
607
608 class FilterByClassList:
609     def __init__(self, classes_with_filenames):
610         self.classes_with_filenames = classes_with_filenames
611
612     def __call__(self, file_name, class_name, func_name):
613         return '.'.join([file_name, class_name]) in self.classes_with_filenames
614
615
616 def suite_from_failed(suite, failed):
617     failed = {x.rsplit('.', 1)[0] for x in failed}
618     filter_cb = FilterByClassList(failed)
619     suite = filter_tests(suite, filter_cb)
620     return suite
621
622
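# Aggregates TestResult objects across all suites: keeps per-type counters,
# collects the suites that need to be re-run and prints the final summary.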
623 class AllResults(dict):
624     def __init__(self):
625         super(AllResults, self).__init__()
626         self.all_testcases = 0
627         self.results_per_suite = []
628         self[PASS] = 0
629         self[FAIL] = 0
630         self[ERROR] = 0
631         self[SKIP] = 0
632         self[TEST_RUN] = 0
633         self.rerun = []
634         self.testsuites_no_tests_run = []
635
636     def add_results(self, result):
637         self.results_per_suite.append(result)
638         result_types = [PASS, FAIL, ERROR, SKIP, TEST_RUN]
639         for result_type in result_types:
640             self[result_type] += len(result[result_type])
641
642     def add_result(self, result):
643         retval = 0
644         self.all_testcases += result.testcase_suite.countTestCases()
645         self.add_results(result)
646
647         if result.no_tests_run():
648             self.testsuites_no_tests_run.append(result.testcase_suite)
649             if result.crashed:
650                 retval = -1
651             else:
652                 retval = 1
653         elif not result.was_successful():
654             retval = 1
655
656         if retval != 0:
657             self.rerun.append(result.testcase_suite)
658
659         return retval
660
661     def print_results(self):
662         print('')
663         print(double_line_delim)
664         print('TEST RESULTS:')
665         print('     Scheduled tests: {}'.format(self.all_testcases))
666         print('      Executed tests: {}'.format(self[TEST_RUN]))
667         print('        Passed tests: {}'.format(
668             colorize(str(self[PASS]), GREEN)))
669         if self[SKIP] > 0:
670             print('       Skipped tests: {}'.format(
671                 colorize(str(self[SKIP]), YELLOW)))
672         if self.not_executed > 0:
673             print('  Not Executed tests: {}'.format(
674                 colorize(str(self.not_executed), RED)))
675         if self[FAIL] > 0:
676             print('            Failures: {}'.format(
677                 colorize(str(self[FAIL]), RED)))
678         if self[ERROR] > 0:
679             print('              Errors: {}'.format(
680                 colorize(str(self[ERROR]), RED)))
681
682         if self.all_failed > 0:
683             print('FAILURES AND ERRORS IN TESTS:')
684             for result in self.results_per_suite:
685                 failed_testcase_ids = result[FAIL]
686                 errored_testcase_ids = result[ERROR]
687                 old_testcase_name = None
688                 if failed_testcase_ids:
689                     for failed_test_id in failed_testcase_ids:
690                         new_testcase_name, test_name = \
691                             result.get_testcase_names(failed_test_id)
692                         if new_testcase_name != old_testcase_name:
693                             print('  Testcase name: {}'.format(
694                                 colorize(new_testcase_name, RED)))
695                             old_testcase_name = new_testcase_name
696                         print('    FAILURE: {} [{}]'.format(
697                             colorize(test_name, RED), failed_test_id))
698                 if errored_testcase_ids:
699                     for errored_test_id in errored_testcase_ids:
700                         new_testcase_name, test_name = \
701                             result.get_testcase_names(errored_test_id)
702                         if new_testcase_name != old_testcase_name:
703                             print('  Testcase name: {}'.format(
704                                 colorize(new_testcase_name, RED)))
705                             old_testcase_name = new_testcase_name
706                         print('      ERROR: {} [{}]'.format(
707                             colorize(test_name, RED), errored_test_id))
708         if self.testsuites_no_tests_run:
709             print('TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:')
710             tc_classes = set()
711             for testsuite in self.testsuites_no_tests_run:
712                 for testcase in testsuite:
713                     tc_classes.add(get_testcase_doc_name(testcase))
714             for tc_class in tc_classes:
715                 print('  {}'.format(colorize(tc_class, RED)))
716
717         print(double_line_delim)
718         print('')
719
720     @property
721     def not_executed(self):
722         return self.all_testcases - self[TEST_RUN]
723
724     @property
725     def all_failed(self):
726         return self[FAIL] + self[ERROR]
727
728
729 def parse_results(results):
730     """
731     Prints the number of scheduled, executed, not executed, passed, failed,
732     errored and skipped tests and details about failed and errored tests.
733
734     Also returns all suites where any test failed.
735
736     :param results: list of TestResult objects to process
737     :return: tuple of (return code, list of testcase suites to re-run)
738     """
739
740     results_per_suite = AllResults()
741     crashed = False
742     failed = False
743     for result in results:
744         result_code = results_per_suite.add_result(result)
745         if result_code == 1:
746             failed = True
747         elif result_code == -1:
748             crashed = True
749
750     results_per_suite.print_results()
751
752     if crashed:
753         return_code = -1
754     elif failed:
755         return_code = 1
756     else:
757         return_code = 0
758     return return_code, results_per_suite.rerun
759
760
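# Reads an environment variable expected to contain a non-negative integer;
# non-numeric values fall back to the given default with a warning.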
761 def parse_digit_env(env_var, default):
762     value = os.getenv(env_var, default)
763     if value != default:
764         if value.isdigit():
765             value = int(value)
766         else:
767             print('WARNING: unsupported value "%s" for env var "%s", '
768                   'defaulting to %s' % (value, env_var, default))
769             value = default
770     return value
771
772
773 if __name__ == '__main__':
774
775     verbose = parse_digit_env("V", 0)
776
777     test_timeout = parse_digit_env("TIMEOUT", 600)  # default = 10 minutes
778
779     test_finished_join_timeout = 15
780
781     retries = parse_digit_env("RETRIES", 0)
782
783     debug = os.getenv("DEBUG", "n").lower() in ["gdb", "gdbserver"]
784
785     debug_core = os.getenv("DEBUG", "").lower() == "core"
786     compress_core = framework.BoolEnvironmentVariable("CORE_COMPRESS")
787
788     step = framework.BoolEnvironmentVariable("STEP")
789     force_foreground = framework.BoolEnvironmentVariable("FORCE_FOREGROUND")
790
791     run_interactive = debug or step or force_foreground
792
793     try:
794         num_cpus = len(os.sched_getaffinity(0))
795     except AttributeError:
796         num_cpus = multiprocessing.cpu_count()
797     shm_free = psutil.disk_usage('/dev/shm').free
798
799     print('OS reports %s available cpu(s). Free shm: %s' % (
800         num_cpus, "{:,}MB".format(shm_free // (1024 * 1024))))
801
802     test_jobs = os.getenv("TEST_JOBS", "1").lower()  # default = 1 process
803     if test_jobs == 'auto':
804         if run_interactive:
805             concurrent_tests = 1
806             print('Interactive mode required, running on one core')
807         else:
808             shm_max_processes = 1
809             if shm_free < min_req_shm:
810                 raise Exception('Not enough free space in /dev/shm. Required '
811                                 'free space is at least %sM.'
812                                 % (min_req_shm >> 20))
813             else:
814                 extra_shm = shm_free - min_req_shm
815                 shm_max_processes += extra_shm // shm_per_process
816             concurrent_tests = min(cpu_count(), shm_max_processes)
817             print('Found enough resources to run tests with %s cores'
818                   % concurrent_tests)
819     elif test_jobs.isdigit():
820         concurrent_tests = int(test_jobs)
821         print("Running on %s core(s) as set by 'TEST_JOBS'." %
822               concurrent_tests)
823     else:
824         concurrent_tests = 1
825         print('Running on one core.')
826
827     if run_interactive and concurrent_tests > 1:
828         raise NotImplementedError(
829             'Running tests interactively (DEBUG is gdb or gdbserver or STEP '
830             'is set) in parallel (TEST_JOBS is more than 1) is not supported')
831
832     parser = argparse.ArgumentParser(description="VPP unit tests")
833     parser.add_argument("-f", "--failfast", action='store_true',
834                         help="fast failure flag")
835     parser.add_argument("-d", "--dir", action='append', type=str,
836                         help="directory containing test files "
837                              "(may be specified multiple times)")
838     args = parser.parse_args()
839     failfast = args.failfast
840     descriptions = True
841
842     print("Running tests using custom test runner")  # debug message
843     filter_file, filter_class, filter_func = parse_test_option()
844
845     print("Active filters: file=%s, class=%s, function=%s" % (
846         filter_file, filter_class, filter_func))
847
848     filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)
849
850     ignore_path = os.getenv("VENV_PATH", None)
851     cb = SplitToSuitesCallback(filter_cb)
852     for d in args.dir:
853         print("Adding tests from directory tree %s" % d)
854         discover_tests(d, cb, ignore_path)
855
856     # suites are not hashable, need to use list
857     suites = []
858     tests_amount = 0
859     for testcase_suite in cb.suites.values():
860         tests_amount += testcase_suite.countTestCases()
861         suites.append(testcase_suite)
862
863     print("%s out of %s tests match specified filters" % (
864         tests_amount, tests_amount + cb.filtered.countTestCases()))
865
866     if not running_extended_tests:
867         print("Not running extended tests (some tests will be skipped)")
868
869     attempts = retries + 1
870     if attempts > 1:
871         print("Performing %s attempts to pass the suite..." % attempts)
872
873     if run_interactive and suites:
874         # don't fork if requiring interactive terminal
875         print('Running tests in foreground in the current process')
876         full_suite = unittest.TestSuite()
877         full_suite.addTests(suites)
878         result = VppTestRunner(verbosity=verbose,
879                                failfast=failfast,
880                                print_summary=True).run(full_suite)
881         was_successful = result.wasSuccessful()
882         if not was_successful:
883             for test_case_info in result.failed_test_cases_info:
884                 handle_failed_suite(test_case_info.logger,
885                                     test_case_info.tempdir,
886                                     test_case_info.vpp_pid)
887                 if test_case_info in result.core_crash_test_cases_info:
888                     check_and_handle_core(test_case_info.vpp_bin_path,
889                                           test_case_info.tempdir,
890                                           test_case_info.core_crash_test)
891
892         sys.exit(not was_successful)
893     else:
894         print('Running each VPPTestCase in a separate background process'
895               ' with {} parallel process(es)'.format(concurrent_tests))
896         exit_code = 0
897         while suites and attempts > 0:
898             results = run_forked(suites)
899             exit_code, suites = parse_results(results)
900             attempts -= 1
901             if exit_code == 0:
902                 print('Test run was successful')
903             else:
904                 print('%s attempt(s) left.' % attempts)
905         sys.exit(exit_code)