Commit 4d299831 authored by Victor Stinner's avatar Victor Stinner Committed by GitHub

bpo-36725: regrtest: add TestResult type (GH-12960)

* Add TestResult and MultiprocessResult types to ensure that results
  always have the same fields.
* runtest() now handles KeyboardInterrupt
* accumulate_result() and format_test_result() now takes a TestResult
* cleanup_test_droppings() is now called by runtest() and mark the
  test as ENV_CHANGED if the test leaks support.TESTFN file.
* runtest() now includes code "around" the test in the test timing
* Add print_warning() in test.libregrtest.utils to standardize how
  libregrtest logs warnings to ease parsing the test output.
* support.unload() is now called with abstest rather than test_name
* Rename 'test' variable/parameter to 'test_name'
* dash_R(): remove unused the_module parameter
* Remove unused imports
parent 9db03247
......@@ -105,26 +105,30 @@ class Regrtest:
# used by --junit-xml
self.testsuite_xml = None
def accumulate_result(self, test, result):
ok, test_time, xml_data = result
def accumulate_result(self, result):
test_name = result.test_name
ok = result.result
if ok not in (CHILD_ERROR, INTERRUPTED):
self.test_times.append((test_time, test))
self.test_times.append((result.test_time, test_name))
if ok == PASSED:
self.good.append(test)
self.good.append(test_name)
elif ok in (FAILED, CHILD_ERROR):
self.bad.append(test)
self.bad.append(test_name)
elif ok == ENV_CHANGED:
self.environment_changed.append(test)
self.environment_changed.append(test_name)
elif ok == SKIPPED:
self.skipped.append(test)
self.skipped.append(test_name)
elif ok == RESOURCE_DENIED:
self.skipped.append(test)
self.resource_denieds.append(test)
self.skipped.append(test_name)
self.resource_denieds.append(test_name)
elif ok == TEST_DID_NOT_RUN:
self.run_no_tests.append(test)
self.run_no_tests.append(test_name)
elif ok != INTERRUPTED:
raise ValueError("invalid test result: %r" % ok)
xml_data = result.xml_data
if xml_data:
import xml.etree.ElementTree as ET
for e in xml_data:
......@@ -134,7 +138,7 @@ class Regrtest:
print(xml_data, file=sys.__stderr__)
raise
def display_progress(self, test_index, test):
def display_progress(self, test_index, text):
if self.ns.quiet:
return
......@@ -143,7 +147,7 @@ class Regrtest:
fails = len(self.bad) + len(self.environment_changed)
if fails and not self.ns.pgo:
line = f"{line}/{fails}"
line = f"[{line}] {test}"
line = f"[{line}] {text}"
# add the system load prefix: "load avg: 1.80 "
if self.getloadavg:
......@@ -275,13 +279,13 @@ class Regrtest:
support.verbose = False
support.set_match_tests(self.ns.match_tests)
for test in self.selected:
abstest = get_abs_module(self.ns, test)
for test_name in self.selected:
abstest = get_abs_module(self.ns, test_name)
try:
suite = unittest.defaultTestLoader.loadTestsFromName(abstest)
self._list_cases(suite)
except unittest.SkipTest:
self.skipped.append(test)
self.skipped.append(test_name)
if self.skipped:
print(file=sys.stderr)
......@@ -298,19 +302,19 @@ class Regrtest:
print()
print("Re-running failed tests in verbose mode")
self.rerun = self.bad[:]
for test in self.rerun:
print("Re-running test %r in verbose mode" % test, flush=True)
try:
for test_name in self.rerun:
print("Re-running test %r in verbose mode" % test_name, flush=True)
self.ns.verbose = True
ok = runtest(self.ns, test)
except KeyboardInterrupt:
self.interrupted = True
ok = runtest(self.ns, test_name)
if ok[0] in {PASSED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED}:
self.bad.remove(test_name)
if ok.result == INTERRUPTED:
# print a newline separate from the ^C
print()
self.interrupted = True
break
else:
if ok[0] in {PASSED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED}:
self.bad.remove(test)
else:
if self.bad:
print(count(len(self.bad), 'test'), "failed again:")
......@@ -348,8 +352,8 @@ class Regrtest:
self.test_times.sort(reverse=True)
print()
print("10 slowest tests:")
for time, test in self.test_times[:10]:
print("- %s: %s" % (test, format_duration(time)))
for test_time, test in self.test_times[:10]:
print("- %s: %s" % (test, format_duration(test_time)))
if self.bad:
print()
......@@ -387,10 +391,10 @@ class Regrtest:
print("Run tests sequentially")
previous_test = None
for test_index, test in enumerate(self.tests, 1):
for test_index, test_name in enumerate(self.tests, 1):
start_time = time.monotonic()
text = test
text = test_name
if previous_test:
text = '%s -- %s' % (text, previous_test)
self.display_progress(test_index, text)
......@@ -398,22 +402,20 @@ class Regrtest:
if self.tracer:
# If we're tracing code coverage, then we don't exit with status
# if on a false return value from main.
cmd = ('result = runtest(self.ns, test); '
'self.accumulate_result(test, result)')
cmd = ('result = runtest(self.ns, test_name); '
'self.accumulate_result(result)')
ns = dict(locals())
self.tracer.runctx(cmd, globals=globals(), locals=ns)
result = ns['result']
else:
try:
result = runtest(self.ns, test)
except KeyboardInterrupt:
result = runtest(self.ns, test_name)
self.accumulate_result(result)
if result.result == INTERRUPTED:
self.interrupted = True
self.accumulate_result(test, (INTERRUPTED, None, None))
break
else:
self.accumulate_result(test, result)
previous_test = format_test_result(test, result[0])
previous_test = format_test_result(result)
test_time = time.monotonic() - start_time
if test_time >= PROGRESS_MIN_TIME:
previous_test = "%s in %s" % (previous_test, format_duration(test_time))
......@@ -441,8 +443,8 @@ class Regrtest:
def _test_forever(self, tests):
while True:
for test in tests:
yield test
for test_name in tests:
yield test_name
if self.bad:
return
if self.ns.fail_env_changed and self.environment_changed:
......
import errno
import os
import re
import sys
......@@ -18,7 +17,7 @@ except ImportError:
cls._abc_negative_cache, cls._abc_negative_cache_version)
def dash_R(ns, the_module, test_name, test_func):
def dash_R(ns, test_name, test_func):
"""Run a test multiple times, looking for reference leaks.
Returns:
......
import collections
import faulthandler
import functools
import importlib
import io
import os
......@@ -9,6 +11,7 @@ import unittest
from test import support
from test.libregrtest.refleak import dash_R, clear_caches
from test.libregrtest.save_env import saved_test_environment
from test.libregrtest.utils import print_warning
# Test result constants.
......@@ -55,9 +58,17 @@ STDTESTS = [
NOTTESTS = set()
def format_test_result(test_name, result):
fmt = _FORMAT_TEST_RESULT.get(result, "%s")
return fmt % test_name
# used by --findleaks, store for gc.garbage
found_garbage = []
def format_test_result(result):
fmt = _FORMAT_TEST_RESULT.get(result.result, "%s")
return fmt % result.test_name
def findtestdir(path=None):
return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS):
......@@ -73,48 +84,34 @@ def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS):
return stdtests + sorted(tests)
def get_abs_module(ns, test):
if test.startswith('test.') or ns.testdir:
return test
def get_abs_module(ns, test_name):
if test_name.startswith('test.') or ns.testdir:
return test_name
else:
# Always import it from the test package
return 'test.' + test
# Import it from the test package
return 'test.' + test_name
def runtest(ns, test):
"""Run a single test.
ns -- regrtest namespace of options
test -- the name of the test
Returns the tuple (result, test_time, xml_data), where result is one
of the constants:
INTERRUPTED KeyboardInterrupt when run under -j
RESOURCE_DENIED test skipped because resource denied
SKIPPED test skipped for some other reason
ENV_CHANGED test failed because it changed the execution environment
FAILED test failed
PASSED test passed
EMPTY_TEST_SUITE test ran no subtests.
TestResult = collections.namedtuple('TestResult',
'test_name result test_time xml_data')
If ns.xmlpath is not None, xml_data is a list containing each
generated testsuite element.
"""
def _runtest(ns, test_name):
# Handle faulthandler timeout, capture stdout+stderr, XML serialization
# and measure time.
output_on_failure = ns.verbose3
use_timeout = (ns.timeout is not None)
if use_timeout:
faulthandler.dump_traceback_later(ns.timeout, exit=True)
start_time = time.perf_counter()
try:
support.set_match_tests(ns.match_tests)
# reset the environment_altered flag to detect if a test altered
# the environment
support.environment_altered = False
support.junit_xml_list = xml_list = [] if ns.xmlpath else None
if ns.failfast:
support.failfast = True
if output_on_failure:
support.verbose = True
......@@ -124,8 +121,9 @@ def runtest(ns, test):
try:
sys.stdout = stream
sys.stderr = stream
result = runtest_inner(ns, test, display_failure=False)
if result[0] != PASSED:
result = _runtest_inner(ns, test_name,
display_failure=False)
if result != PASSED:
output = stream.getvalue()
orig_stderr.write(output)
orig_stderr.flush()
......@@ -133,42 +131,63 @@ def runtest(ns, test):
sys.stdout = orig_stdout
sys.stderr = orig_stderr
else:
support.verbose = ns.verbose # Tell tests to be moderately quiet
result = runtest_inner(ns, test, display_failure=not ns.verbose)
# Tell tests to be moderately quiet
support.verbose = ns.verbose
result = _runtest_inner(ns, test_name,
display_failure=not ns.verbose)
if xml_list:
import xml.etree.ElementTree as ET
xml_data = [ET.tostring(x).decode('us-ascii') for x in xml_list]
else:
xml_data = None
return result + (xml_data,)
test_time = time.perf_counter() - start_time
return TestResult(test_name, result, test_time, xml_data)
finally:
if use_timeout:
faulthandler.cancel_dump_traceback_later()
cleanup_test_droppings(test, ns.verbose)
support.junit_xml_list = None
def post_test_cleanup():
support.reap_children()
def runtest(ns, test_name):
"""Run a single test.
ns -- regrtest namespace of options
test_name -- the name of the test
Returns the tuple (result, test_time, xml_data), where result is one
of the constants:
def runtest_inner(ns, test, display_failure=True):
support.unload(test)
INTERRUPTED KeyboardInterrupt
RESOURCE_DENIED test skipped because resource denied
SKIPPED test skipped for some other reason
ENV_CHANGED test failed because it changed the execution environment
FAILED test failed
PASSED test passed
EMPTY_TEST_SUITE test ran no subtests.
test_time = 0.0
refleak = False # True if the test leaked references.
If ns.xmlpath is not None, xml_data is a list containing each
generated testsuite element.
"""
try:
abstest = get_abs_module(ns, test)
clear_caches()
with saved_test_environment(test, ns.verbose, ns.quiet, pgo=ns.pgo) as environment:
start_time = time.perf_counter()
the_module = importlib.import_module(abstest)
# If the test has a test_main, that will run the appropriate
# tests. If not, use normal unittest test loading.
test_runner = getattr(the_module, "test_main", None)
if test_runner is None:
def test_runner():
return _runtest(ns, test_name)
except:
if not ns.pgo:
msg = traceback.format_exc()
print(f"test {test_name} crashed -- {msg}",
file=sys.stderr, flush=True)
return TestResult(test_name, FAILED, 0.0, None)
def post_test_cleanup():
support.gc_collect()
support.reap_children()
def _test_module(the_module):
loader = unittest.TestLoader()
tests = loader.loadTestsFromModule(the_module)
for error in loader.errors:
......@@ -176,55 +195,106 @@ def runtest_inner(ns, test, display_failure=True):
if loader.errors:
raise Exception("errors while loading tests")
support.run_unittest(tests)
def _runtest_inner2(ns, test_name):
# Load the test function, run the test function, handle huntrleaks
# and findleaks to detect leaks
abstest = get_abs_module(ns, test_name)
# remove the module from sys.module to reload it if it was already imported
support.unload(abstest)
the_module = importlib.import_module(abstest)
# If the test has a test_main, that will run the appropriate
# tests. If not, use normal unittest test loading.
test_runner = getattr(the_module, "test_main", None)
if test_runner is None:
test_runner = functools.partial(_test_module, the_module)
try:
if ns.huntrleaks:
refleak = dash_R(ns, the_module, test, test_runner)
# Return True if the test leaked references
refleak = dash_R(ns, test_name, test_runner)
else:
test_runner()
test_time = time.perf_counter() - start_time
refleak = False
finally:
cleanup_test_droppings(test_name, ns.verbose)
if ns.findleaks:
import gc
support.gc_collect()
if gc.garbage:
import gc
gc.garbage = [1]
print_warning(f"{test_name} created {len(gc.garbage)} "
f"uncollectable object(s).")
# move the uncollectable objects somewhere,
# so we don't see them again
found_garbage.extend(gc.garbage)
gc.garbage.clear()
support.environment_altered = True
post_test_cleanup()
return refleak
def _runtest_inner(ns, test_name, display_failure=True):
# Detect environment changes, handle exceptions.
# Reset the environment_altered flag to detect if a test altered
# the environment
support.environment_altered = False
if ns.pgo:
display_failure = False
try:
clear_caches()
with saved_test_environment(test_name, ns.verbose, ns.quiet, pgo=ns.pgo) as environment:
refleak = _runtest_inner2(ns, test_name)
except support.ResourceDenied as msg:
if not ns.quiet and not ns.pgo:
print(test, "skipped --", msg, flush=True)
return RESOURCE_DENIED, test_time
print(f"{test_name} skipped -- {msg}", flush=True)
return RESOURCE_DENIED
except unittest.SkipTest as msg:
if not ns.quiet and not ns.pgo:
print(test, "skipped --", msg, flush=True)
return SKIPPED, test_time
except KeyboardInterrupt:
raise
except support.TestFailed as msg:
if not ns.pgo:
print(f"{test_name} skipped -- {msg}", flush=True)
return SKIPPED
except support.TestFailed as exc:
msg = f"test {test_name} failed"
if display_failure:
print("test", test, "failed --", msg, file=sys.stderr,
flush=True)
else:
print("test", test, "failed", file=sys.stderr, flush=True)
return FAILED, test_time
msg = f"{msg} -- {exc}"
print(msg, file=sys.stderr, flush=True)
return FAILED
except support.TestDidNotRun:
return TEST_DID_NOT_RUN, test_time
return TEST_DID_NOT_RUN
except KeyboardInterrupt:
return INTERRUPTED
except:
msg = traceback.format_exc()
if not ns.pgo:
print("test", test, "crashed --", msg, file=sys.stderr,
flush=True)
return FAILED, test_time
else:
msg = traceback.format_exc()
print(f"test {test_name} crashed -- {msg}",
file=sys.stderr, flush=True)
return FAILED
if refleak:
return FAILED, test_time
return FAILED
if environment.changed:
return ENV_CHANGED, test_time
return PASSED, test_time
return ENV_CHANGED
return PASSED
def cleanup_test_droppings(testname, verbose):
import shutil
import stat
import gc
def cleanup_test_droppings(test_name, verbose):
# First kill any dangling references to open files etc.
# This can also issue some ResourceWarnings which would otherwise get
# triggered during the following test run, and possibly produce failures.
gc.collect()
support.gc_collect()
# Try to clean up junk commonly left behind. While tests shouldn't leave
# any files or directories behind, when a test fails that can be tedious
......@@ -239,23 +309,23 @@ def cleanup_test_droppings(testname, verbose):
continue
if os.path.isdir(name):
import shutil
kind, nuker = "directory", shutil.rmtree
elif os.path.isfile(name):
kind, nuker = "file", os.unlink
else:
raise SystemError("os.path says %r exists but is neither "
"directory nor file" % name)
raise RuntimeError(f"os.path says {name!r} exists but is neither "
f"directory nor file")
if verbose:
print("%r left behind %s %r" % (testname, kind, name))
print_warning("%r left behind %s %r" % (test_name, kind, name))
support.environment_altered = True
try:
import stat
# fix possible permissions problems that might prevent cleanup
os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
nuker(name)
except Exception as msg:
print(("%r left behind %s %r and it couldn't be "
"removed: %s" % (testname, kind, name, msg)), file=sys.stderr)
def findtestdir(path=None):
return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
except Exception as exc:
print_warning(f"{test_name} left behind {kind} {name!r} "
f"and it couldn't be removed: {exc}")
import collections
import faulthandler
import json
import os
......@@ -5,13 +6,12 @@ import queue
import sys
import threading
import time
import traceback
import types
from test import support
from test.libregrtest.runtest import (
runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME,
format_test_result)
format_test_result, TestResult)
from test.libregrtest.setup import setup_tests
from test.libregrtest.utils import format_duration
......@@ -64,15 +64,9 @@ def run_tests_worker(worker_args):
setup_tests(ns)
try:
result = runtest(ns, testname)
except KeyboardInterrupt:
result = INTERRUPTED, '', None
except BaseException as e:
traceback.print_exc()
result = CHILD_ERROR, str(e)
print() # Force a newline (just in case)
print(json.dumps(result), flush=True)
sys.exit(0)
......@@ -97,45 +91,51 @@ class MultiprocessIterator:
return next(self.tests)
MultiprocessResult = collections.namedtuple('MultiprocessResult',
'result stdout stderr error_msg')
class MultiprocessThread(threading.Thread):
def __init__(self, pending, output, ns):
super().__init__()
self.pending = pending
self.output = output
self.ns = ns
self.current_test = None
self.current_test_name = None
self.start_time = None
def _runtest(self):
try:
test = next(self.pending)
test_name = next(self.pending)
except StopIteration:
self.output.put((None, None, None, None))
self.output.put(None)
return True
try:
self.start_time = time.monotonic()
self.current_test = test
self.current_test_name = test_name
retcode, stdout, stderr = run_test_in_subprocess(test, self.ns)
retcode, stdout, stderr = run_test_in_subprocess(test_name, self.ns)
finally:
self.current_test = None
self.current_test_name = None
if retcode != 0:
result = (CHILD_ERROR, "Exit code %s" % retcode, None)
self.output.put((test, stdout.rstrip(), stderr.rstrip(),
result))
test_time = time.monotonic() - self.start_time
result = TestResult(test_name, CHILD_ERROR, test_time, None)
err_msg = "Exit code %s" % retcode
mp_result = MultiprocessResult(result, stdout.rstrip(), stderr.rstrip(), err_msg)
self.output.put(mp_result)
return False
stdout, _, result = stdout.strip().rpartition("\n")
if not result:
self.output.put((None, None, None, None))
self.output.put(None)
return True
# deserialize run_tests_worker() output
result = json.loads(result)
assert len(result) == 3, f"Invalid result tuple: {result!r}"
self.output.put((test, stdout.rstrip(), stderr.rstrip(),
result))
result = TestResult(*result)
mp_result = MultiprocessResult(result, stdout.rstrip(), stderr.rstrip(), None)
self.output.put(mp_result)
return False
def run(self):
......@@ -144,7 +144,7 @@ class MultiprocessThread(threading.Thread):
while not stop:
stop = self._runtest()
except BaseException:
self.output.put((None, None, None, None))
self.output.put(None)
raise
......@@ -164,12 +164,12 @@ def run_tests_multiprocess(regrtest):
def get_running(workers):
running = []
for worker in workers:
current_test = worker.current_test
if not current_test:
current_test_name = worker.current_test_name
if not current_test_name:
continue
dt = time.monotonic() - worker.start_time
if dt >= PROGRESS_MIN_TIME:
text = '%s (%s)' % (current_test, format_duration(dt))
text = '%s (%s)' % (current_test_name, format_duration(dt))
running.append(text)
return running
......@@ -182,40 +182,41 @@ def run_tests_multiprocess(regrtest):
faulthandler.dump_traceback_later(test_timeout, exit=True)
try:
item = output.get(timeout=get_timeout)
mp_result = output.get(timeout=get_timeout)
except queue.Empty:
running = get_running(workers)
if running and not regrtest.ns.pgo:
print('running: %s' % ', '.join(running), flush=True)
continue
test, stdout, stderr, result = item
if test is None:
if mp_result is None:
finished += 1
continue
regrtest.accumulate_result(test, result)
result = mp_result.result
regrtest.accumulate_result(result)
# Display progress
ok, test_time, xml_data = result
text = format_test_result(test, ok)
ok = result.result
text = format_test_result(result)
if (ok not in (CHILD_ERROR, INTERRUPTED)
and test_time >= PROGRESS_MIN_TIME
and result.test_time >= PROGRESS_MIN_TIME
and not regrtest.ns.pgo):
text += ' (%s)' % format_duration(test_time)
text += ' (%s)' % format_duration(result.test_time)
elif ok == CHILD_ERROR:
text = '%s (%s)' % (text, test_time)
text = '%s (%s)' % (text, mp_result.error_msg)
running = get_running(workers)
if running and not regrtest.ns.pgo:
text += ' -- running: %s' % ', '.join(running)
regrtest.display_progress(test_index, text)
# Copy stdout and stderr from the child process
if stdout:
print(stdout, flush=True)
if stderr and not regrtest.ns.pgo:
print(stderr, file=sys.stderr, flush=True)
if mp_result.stdout:
print(mp_result.stdout, flush=True)
if mp_result.stderr and not regrtest.ns.pgo:
print(mp_result.stderr, file=sys.stderr, flush=True)
if result[0] == INTERRUPTED:
if result.result == INTERRUPTED:
raise KeyboardInterrupt
test_index += 1
except KeyboardInterrupt:
......@@ -229,7 +230,7 @@ def run_tests_multiprocess(regrtest):
# If tests are interrupted, wait until tests complete
wait_start = time.monotonic()
while True:
running = [worker.current_test for worker in workers]
running = [worker.current_test_name for worker in workers]
running = list(filter(bool, running))
if not running:
break
......
......@@ -9,6 +9,7 @@ import sysconfig
import threading
import warnings
from test import support
from test.libregrtest.utils import print_warning
try:
import _multiprocessing, multiprocessing.process
except ImportError:
......@@ -283,8 +284,7 @@ class saved_test_environment:
self.changed = True
restore(original)
if not self.quiet and not self.pgo:
print(f"Warning -- {name} was modified by {self.testname}",
file=sys.stderr, flush=True)
print_warning(f"{name} was modified by {self.testname}")
print(f" Before: {original}\n After: {current} ",
file=sys.stderr, flush=True)
return False
import os.path
import math
import os.path
import sys
import textwrap
......@@ -54,3 +55,7 @@ def printlist(x, width=70, indent=4, file=None):
print(textwrap.fill(' '.join(str(elt) for elt in sorted(x)), width,
initial_indent=blanks, subsequent_indent=blanks),
file=file)
def print_warning(msg):
print(f"Warning -- {msg}", file=sys.stderr, flush=True)
import subprocess
import sys
import os
import _winapi
import msvcrt
import os
import subprocess
import uuid
from test import support
......
......@@ -26,8 +26,9 @@ ROOT_DIR = os.path.join(os.path.dirname(__file__), '..', '..')
ROOT_DIR = os.path.abspath(os.path.normpath(ROOT_DIR))
TEST_INTERRUPTED = textwrap.dedent("""
from signal import SIGINT, raise_signal
from signal import SIGINT
try:
from signal import raise_signal
raise_signal(SIGINT)
except ImportError:
import os
......@@ -108,7 +109,7 @@ class ParseArgsTestCase(unittest.TestCase):
self.assertTrue(ns.quiet)
self.assertEqual(ns.verbose, 0)
def test_slow(self):
def test_slowest(self):
for opt in '-o', '--slowest':
with self.subTest(opt=opt):
ns = libregrtest._parse_args([opt])
......@@ -780,12 +781,13 @@ class ArgsTestCase(BaseTestCase):
% (self.TESTNAME_REGEX, len(tests)))
self.check_line(output, regex)
def test_slow_interrupted(self):
def test_slowest_interrupted(self):
# Issue #25373: test --slowest with an interrupted test
code = TEST_INTERRUPTED
test = self.create_test("sigint", code=code)
for multiprocessing in (False, True):
with self.subTest(multiprocessing=multiprocessing):
if multiprocessing:
args = ("--slowest", "-j2", test)
else:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment