Commit e4d300e0 authored by Victor Stinner's avatar Victor Stinner Committed by GitHub

bpo-36829: Add test.support.catch_unraisable_exception() (GH-13490)

* Copy test_exceptions.test_unraisable() to
  test_sys.UnraisableHookTest().
* Use catch_unraisable_exception() in test_coroutines,
  test_exceptions, test_generators.
parent 904e34d4
...@@ -3034,3 +3034,36 @@ def collision_stats(nbins, nballs): ...@@ -3034,3 +3034,36 @@ def collision_stats(nbins, nballs):
collisions = k - occupied collisions = k - occupied
var = dn*(dn-1)*((dn-2)/dn)**k + meanempty * (1 - meanempty) var = dn*(dn-1)*((dn-2)/dn)**k + meanempty * (1 - meanempty)
return float(collisions), float(var.sqrt()) return float(collisions), float(var.sqrt())
class catch_unraisable_exception:
"""
Context manager catching unraisable exception using sys.unraisablehook.
Usage:
with support.catch_unraisable_exception() as cm:
...
# check the expected unraisable exception: use cm.unraisable
...
# cm.unraisable is None here (to break a reference cycle)
"""
def __init__(self):
self.unraisable = None
self._old_hook = None
def _hook(self, unraisable):
self.unraisable = unraisable
def __enter__(self):
self._old_hook = sys.unraisablehook
sys.unraisablehook = self._hook
return self
def __exit__(self, *exc_info):
# Clear the unraisable exception to explicitly break a reference cycle
self.unraisable = None
sys.unraisablehook = self._old_hook
...@@ -2342,12 +2342,19 @@ class OriginTrackingTest(unittest.TestCase): ...@@ -2342,12 +2342,19 @@ class OriginTrackingTest(unittest.TestCase):
orig_wuc = warnings._warn_unawaited_coroutine orig_wuc = warnings._warn_unawaited_coroutine
try: try:
warnings._warn_unawaited_coroutine = lambda coro: 1/0 warnings._warn_unawaited_coroutine = lambda coro: 1/0
with support.captured_stderr() as stream: with support.catch_unraisable_exception() as cm, \
corofn() support.captured_stderr() as stream:
# only store repr() to avoid keeping the coroutine alive
coro = corofn()
coro_repr = repr(coro)
# clear reference to the coroutine without awaiting for it
del coro
support.gc_collect() support.gc_collect()
self.assertIn("Exception ignored in", stream.getvalue())
self.assertIn("ZeroDivisionError", stream.getvalue()) self.assertEqual(repr(cm.unraisable.object), coro_repr)
self.assertIn("was never awaited", stream.getvalue()) self.assertEqual(cm.unraisable.exc_type, ZeroDivisionError)
self.assertIn("was never awaited", stream.getvalue())
del warnings._warn_unawaited_coroutine del warnings._warn_unawaited_coroutine
with support.captured_stderr() as stream: with support.captured_stderr() as stream:
......
...@@ -12,6 +12,9 @@ from test.support import (TESTFN, captured_stderr, check_impl_detail, ...@@ -12,6 +12,9 @@ from test.support import (TESTFN, captured_stderr, check_impl_detail,
check_warnings, cpython_only, gc_collect, run_unittest, check_warnings, cpython_only, gc_collect, run_unittest,
no_tracing, unlink, import_module, script_helper, no_tracing, unlink, import_module, script_helper,
SuppressCrashReport) SuppressCrashReport)
from test import support
class NaiveException(Exception): class NaiveException(Exception):
def __init__(self, x): def __init__(self, x):
self.x = x self.x = x
...@@ -1181,29 +1184,12 @@ class ExceptionTests(unittest.TestCase): ...@@ -1181,29 +1184,12 @@ class ExceptionTests(unittest.TestCase):
# The following line is included in the traceback report: # The following line is included in the traceback report:
raise exc raise exc
class BrokenExceptionDel: obj = BrokenDel()
def __del__(self): with support.catch_unraisable_exception() as cm:
exc = BrokenStrException() del obj
# The following line is included in the traceback report:
raise exc
for test_class in (BrokenDel, BrokenExceptionDel): self.assertEqual(cm.unraisable.object, BrokenDel.__del__)
with self.subTest(test_class): self.assertIsNotNone(cm.unraisable.exc_traceback)
obj = test_class()
with captured_stderr() as stderr:
del obj
report = stderr.getvalue()
self.assertIn("Exception ignored", report)
self.assertIn(test_class.__del__.__qualname__, report)
self.assertIn("test_exceptions.py", report)
self.assertIn("raise exc", report)
if test_class is BrokenExceptionDel:
self.assertIn("BrokenStrException", report)
self.assertIn("<exception str() failed>", report)
else:
self.assertIn("ValueError", report)
self.assertIn("del is broken", report)
self.assertTrue(report.endswith("\n"))
def test_unhandled(self): def test_unhandled(self):
# Check for sensible reporting of unhandled exceptions # Check for sensible reporting of unhandled exceptions
......
...@@ -2156,25 +2156,21 @@ explicitly, without generators. We do have to redirect stderr to avoid ...@@ -2156,25 +2156,21 @@ explicitly, without generators. We do have to redirect stderr to avoid
printing warnings and to doublecheck that we actually tested what we wanted printing warnings and to doublecheck that we actually tested what we wanted
to test. to test.
>>> import sys, io >>> from test import support
>>> old = sys.stderr >>> class Leaker:
>>> try: ... def __del__(self):
... sys.stderr = io.StringIO() ... def invoke(message):
... class Leaker: ... raise RuntimeError(message)
... def __del__(self): ... invoke("del failed")
... def invoke(message):
... raise RuntimeError(message)
... invoke("test")
... ...
>>> with support.catch_unraisable_exception() as cm:
... l = Leaker() ... l = Leaker()
... del l ... del l
... err = sys.stderr.getvalue().strip() ...
... "Exception ignored in" in err ... cm.unraisable.object == Leaker.__del__
... "RuntimeError: test" in err ... cm.unraisable.exc_type == RuntimeError
... "Traceback" in err ... str(cm.unraisable.exc_value) == "del failed"
... "in invoke" in err ... cm.unraisable.exc_traceback is not None
... finally:
... sys.stderr = old
True True
True True
True True
......
...@@ -909,6 +909,47 @@ class UnraisableHookTest(unittest.TestCase): ...@@ -909,6 +909,47 @@ class UnraisableHookTest(unittest.TestCase):
self.assertIn('Traceback (most recent call last):\n', err) self.assertIn('Traceback (most recent call last):\n', err)
self.assertIn('ValueError: 42\n', err) self.assertIn('ValueError: 42\n', err)
def test_original_unraisablehook_err(self):
# bpo-22836: PyErr_WriteUnraisable() should give sensible reports
class BrokenDel:
def __del__(self):
exc = ValueError("del is broken")
# The following line is included in the traceback report:
raise exc
class BrokenStrException(Exception):
def __str__(self):
raise Exception("str() is broken")
class BrokenExceptionDel:
def __del__(self):
exc = BrokenStrException()
# The following line is included in the traceback report:
raise exc
for test_class in (BrokenDel, BrokenExceptionDel):
with self.subTest(test_class):
obj = test_class()
with test.support.captured_stderr() as stderr, \
test.support.swap_attr(sys, 'unraisablehook',
sys.__unraisablehook__):
# Trigger obj.__del__()
del obj
report = stderr.getvalue()
self.assertIn("Exception ignored", report)
self.assertIn(test_class.__del__.__qualname__, report)
self.assertIn("test_sys.py", report)
self.assertIn("raise exc", report)
if test_class is BrokenExceptionDel:
self.assertIn("BrokenStrException", report)
self.assertIn("<exception str() failed>", report)
else:
self.assertIn("ValueError", report)
self.assertIn("del is broken", report)
self.assertTrue(report.endswith("\n"))
def test_original_unraisablehook_wrong_type(self): def test_original_unraisablehook_wrong_type(self):
exc = ValueError(42) exc = ValueError(42)
with test.support.swap_attr(sys, 'unraisablehook', with test.support.swap_attr(sys, 'unraisablehook',
......
Add :func:`test.support.catch_unraisable_exception`: context manager
catching unraisable exception using :func:`sys.unraisablehook`.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment