Commit 3267a30d authored by Nick Coghlan's avatar Nick Coghlan

Close #13585: add contextlib.ExitStack to replace the ill-fated contextlib.nested API

parent 6e49ac24
This diff is collapsed.
......@@ -696,6 +696,21 @@ collections classes. Aliases for ABCs are still present in the
.. XXX addition of __slots__ to ABCs not recorded here: internal detail
contextlib
----------
:class:`~collections.ExitStack` now provides a solid foundation for
programmatic manipulation of context managers and similar cleanup
functionality. Unlike the previous ``contextlib.nested`` API (which was
deprecated and removed), the new API is designed to work correctly
regardless of whether context managers acquire their resources in
their ``__init`` method (for example, file objects) or in their
``__enter__`` method (for example, synchronisation objects from the
:mod:`threading` module).
(:issue:`13585`)
crypt
-----
......
"""Utilities for with-statement contexts. See PEP 343."""
import sys
from collections import deque
from functools import wraps
__all__ = ["contextmanager", "closing", "ContextDecorator"]
__all__ = ["contextmanager", "closing", "ContextDecorator", "ExitStack"]
class ContextDecorator(object):
......@@ -12,12 +13,12 @@ class ContextDecorator(object):
def _recreate_cm(self):
"""Return a recreated instance of self.
Allows otherwise one-shot context managers like
Allows an otherwise one-shot context manager like
_GeneratorContextManager to support use as
decorators via implicit recreation.
a decorator via implicit recreation.
Note: this is a private interface just for _GCM in 3.2 but will be
renamed and documented for third party use in 3.3
This is a private interface just for _GeneratorContextManager.
See issue #11647 for details.
"""
return self
......@@ -138,3 +139,118 @@ class closing(object):
return self.thing
def __exit__(self, *exc_info):
self.thing.close()
# Inspired by discussions on http://bugs.python.org/issue13585
class ExitStack(object):
"""Context manager for dynamic management of a stack of exit callbacks
For example:
with ExitStack() as stack:
files = [stack.enter_context(open(fname)) for fname in filenames]
# All opened files will automatically be closed at the end of
# the with statement, even if attempts to open files later
# in the list throw an exception
"""
def __init__(self):
self._exit_callbacks = deque()
def pop_all(self):
"""Preserve the context stack by transferring it to a new instance"""
new_stack = type(self)()
new_stack._exit_callbacks = self._exit_callbacks
self._exit_callbacks = deque()
return new_stack
def _push_cm_exit(self, cm, cm_exit):
"""Helper to correctly register callbacks to __exit__ methods"""
def _exit_wrapper(*exc_details):
return cm_exit(cm, *exc_details)
_exit_wrapper.__self__ = cm
self.push(_exit_wrapper)
def push(self, exit):
"""Registers a callback with the standard __exit__ method signature
Can suppress exceptions the same way __exit__ methods can.
Also accepts any object with an __exit__ method (registering a call
to the method instead of the object itself)
"""
# We use an unbound method rather than a bound method to follow
# the standard lookup behaviour for special methods
_cb_type = type(exit)
try:
exit_method = _cb_type.__exit__
except AttributeError:
# Not a context manager, so assume its a callable
self._exit_callbacks.append(exit)
else:
self._push_cm_exit(exit, exit_method)
return exit # Allow use as a decorator
def callback(self, callback, *args, **kwds):
"""Registers an arbitrary callback and arguments.
Cannot suppress exceptions.
"""
def _exit_wrapper(exc_type, exc, tb):
callback(*args, **kwds)
# We changed the signature, so using @wraps is not appropriate, but
# setting __wrapped__ may still help with introspection
_exit_wrapper.__wrapped__ = callback
self.push(_exit_wrapper)
return callback # Allow use as a decorator
def enter_context(self, cm):
"""Enters the supplied context manager
If successful, also pushes its __exit__ method as a callback and
returns the result of the __enter__ method.
"""
# We look up the special methods on the type to match the with statement
_cm_type = type(cm)
_exit = _cm_type.__exit__
result = _cm_type.__enter__(cm)
self._push_cm_exit(cm, _exit)
return result
def close(self):
"""Immediately unwind the context stack"""
self.__exit__(None, None, None)
def __enter__(self):
return self
def __exit__(self, *exc_details):
if not self._exit_callbacks:
return
# This looks complicated, but it is really just
# setting up a chain of try-expect statements to ensure
# that outer callbacks still get invoked even if an
# inner one throws an exception
def _invoke_next_callback(exc_details):
# Callbacks are removed from the list in FIFO order
# but the recursion means they're invoked in LIFO order
cb = self._exit_callbacks.popleft()
if not self._exit_callbacks:
# Innermost callback is invoked directly
return cb(*exc_details)
# More callbacks left, so descend another level in the stack
try:
suppress_exc = _invoke_next_callback(exc_details)
except:
suppress_exc = cb(*sys.exc_info())
# Check if this cb suppressed the inner exception
if not suppress_exc:
raise
else:
# Check if inner cb suppressed the original exception
if suppress_exc:
exc_details = (None, None, None)
suppress_exc = cb(*exc_details) or suppress_exc
return suppress_exc
# Kick off the recursive chain
return _invoke_next_callback(exc_details)
......@@ -370,6 +370,129 @@ class TestContextDecorator(unittest.TestCase):
self.assertEqual(state, [1, 'something else', 999])
class TestExitStack(unittest.TestCase):
def test_no_resources(self):
with ExitStack():
pass
def test_callback(self):
expected = [
((), {}),
((1,), {}),
((1,2), {}),
((), dict(example=1)),
((1,), dict(example=1)),
((1,2), dict(example=1)),
]
result = []
def _exit(*args, **kwds):
"""Test metadata propagation"""
result.append((args, kwds))
with ExitStack() as stack:
for args, kwds in reversed(expected):
if args and kwds:
f = stack.callback(_exit, *args, **kwds)
elif args:
f = stack.callback(_exit, *args)
elif kwds:
f = stack.callback(_exit, **kwds)
else:
f = stack.callback(_exit)
self.assertIs(f, _exit)
for wrapper in stack._exit_callbacks:
self.assertIs(wrapper.__wrapped__, _exit)
self.assertNotEqual(wrapper.__name__, _exit.__name__)
self.assertIsNone(wrapper.__doc__, _exit.__doc__)
self.assertEqual(result, expected)
def test_push(self):
exc_raised = ZeroDivisionError
def _expect_exc(exc_type, exc, exc_tb):
self.assertIs(exc_type, exc_raised)
def _suppress_exc(*exc_details):
return True
def _expect_ok(exc_type, exc, exc_tb):
self.assertIsNone(exc_type)
self.assertIsNone(exc)
self.assertIsNone(exc_tb)
class ExitCM(object):
def __init__(self, check_exc):
self.check_exc = check_exc
def __enter__(self):
self.fail("Should not be called!")
def __exit__(self, *exc_details):
self.check_exc(*exc_details)
with ExitStack() as stack:
stack.push(_expect_ok)
self.assertIs(stack._exit_callbacks[-1], _expect_ok)
cm = ExitCM(_expect_ok)
stack.push(cm)
self.assertIs(stack._exit_callbacks[-1].__self__, cm)
stack.push(_suppress_exc)
self.assertIs(stack._exit_callbacks[-1], _suppress_exc)
cm = ExitCM(_expect_exc)
stack.push(cm)
self.assertIs(stack._exit_callbacks[-1].__self__, cm)
stack.push(_expect_exc)
self.assertIs(stack._exit_callbacks[-1], _expect_exc)
stack.push(_expect_exc)
self.assertIs(stack._exit_callbacks[-1], _expect_exc)
1/0
def test_enter_context(self):
class TestCM(object):
def __enter__(self):
result.append(1)
def __exit__(self, *exc_details):
result.append(3)
result = []
cm = TestCM()
with ExitStack() as stack:
@stack.callback # Registered first => cleaned up last
def _exit():
result.append(4)
self.assertIsNotNone(_exit)
stack.enter_context(cm)
self.assertIs(stack._exit_callbacks[-1].__self__, cm)
result.append(2)
self.assertEqual(result, [1, 2, 3, 4])
def test_close(self):
result = []
with ExitStack() as stack:
@stack.callback
def _exit():
result.append(1)
self.assertIsNotNone(_exit)
stack.close()
result.append(2)
self.assertEqual(result, [1, 2])
def test_pop_all(self):
result = []
with ExitStack() as stack:
@stack.callback
def _exit():
result.append(3)
self.assertIsNotNone(_exit)
new_stack = stack.pop_all()
result.append(1)
result.append(2)
new_stack.close()
self.assertEqual(result, [1, 2, 3])
def test_instance_bypass(self):
class Example(object): pass
cm = Example()
cm.__exit__ = object()
stack = ExitStack()
self.assertRaises(AttributeError, stack.enter_context, cm)
stack.push(cm)
self.assertIs(stack._exit_callbacks[-1], cm)
# This is needed to make the test actually run under regrtest.py!
def test_main():
support.run_unittest(__name__)
......
......@@ -42,6 +42,8 @@ Core and Builtins
Library
-------
- Issue #13585: Added contextlib.ExitStack
- PEP 3144, Issue #14814: Added the ipaddress module
- Issue #14426: Correct the Date format in Expires attribute of Set-Cookie
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment