else:
return 0
+ @classmethod
+ def force_solo(cls):
+ """ if the test case class is timing-sensitive - return true """
+ return False
+
@classmethod
def instance(cls):
"""Return the instance of this testcase"""
"""
def print_header(test):
+ test_doc = getdoc(test)
+ if not test_doc:
+ raise Exception("No doc string for test '%s'" % test.id())
+ test_title = test_doc.splitlines()[0]
+ test_title_colored = colorize(test_title, GREEN)
+ if test.force_solo():
+ # long live PEP-8 and 80 char width limitation...
+ c = YELLOW
+ test_title_colored = colorize("SOLO RUN: " + test_title, c)
+
if not hasattr(test.__class__, '_header_printed'):
print(double_line_delim)
- print(colorize(getdoc(test).splitlines()[0], GREEN))
+ print(test_title_colored)
print(double_line_delim)
test.__class__._header_printed = True
def run_forked(testcase_suites):
wrapped_testcase_suites = set()
+ solo_testcase_suites = []
+ total_test_runners = 0
# suites are unhashable, need to use list
results = []
finished_unread_testcases = set()
manager = StreamQueueManager()
manager.start()
- for i in range(concurrent_tests):
+ total_test_runners = 0
+ while total_test_runners < concurrent_tests:
if testcase_suites:
- wrapped_testcase_suite = TestCaseWrapper(testcase_suites.pop(0),
+ a_suite = testcase_suites.pop(0)
+ if a_suite.force_solo:
+ solo_testcase_suites.append(a_suite)
+ continue
+ wrapped_testcase_suite = TestCaseWrapper(a_suite,
manager)
wrapped_testcase_suites.add(wrapped_testcase_suite)
unread_testcases.add(wrapped_testcase_suite)
+ total_test_runners = total_test_runners + 1
+ else:
+ break
+
+ while total_test_runners < 1 and solo_testcase_suites:
+ if solo_testcase_suites:
+ a_suite = solo_testcase_suites.pop(0)
+ wrapped_testcase_suite = TestCaseWrapper(a_suite,
+ manager)
+ wrapped_testcase_suites.add(wrapped_testcase_suite)
+ unread_testcases.add(wrapped_testcase_suite)
+ total_test_runners = total_test_runners + 1
else:
break
wrapped_testcase_suites.remove(finished_testcase)
finished_unread_testcases.add(finished_testcase)
finished_testcase.stdouterr_queue.put(None)
+ total_test_runners = total_test_runners - 1
if stop_run:
while testcase_suites:
results.append(TestResult(testcase_suites.pop(0)))
elif testcase_suites:
- new_testcase = TestCaseWrapper(testcase_suites.pop(0),
- manager)
- wrapped_testcase_suites.add(new_testcase)
- unread_testcases.add(new_testcase)
+ a_testcase = testcase_suites.pop(0)
+ while a_testcase and a_testcase.force_solo:
+ solo_testcase_suites.append(a_testcase)
+ if testcase_suites:
+ a_testcase = testcase_suites.pop(0)
+ else:
+ a_testcase = None
+ if a_testcase:
+ new_testcase = TestCaseWrapper(a_testcase,
+ manager)
+ wrapped_testcase_suites.add(new_testcase)
+ total_test_runners = total_test_runners + 1
+ unread_testcases.add(new_testcase)
+ else:
+ if solo_testcase_suites and total_test_runners == 0:
+ a_testcase = solo_testcase_suites.pop(0)
+ new_testcase = TestCaseWrapper(a_testcase,
+ manager)
+ wrapped_testcase_suites.add(new_testcase)
+ total_test_runners = total_test_runners + 1
+ unread_testcases.add(new_testcase)
time.sleep(0.1)
except Exception:
for wrapped_testcase_suite in wrapped_testcase_suites:
self.suite_name = file_name + cls.__name__
if self.suite_name not in self.suites:
self.suites[self.suite_name] = unittest.TestSuite()
+ self.suites[self.suite_name].force_solo = False
self.suites[self.suite_name].addTest(test_method)
+ if test_method.force_solo():
+ self.suites[self.suite_name].force_solo = True
else:
self.filtered.addTest(test_method)