Commit 5d7e3b6c authored by Yury Selivanov's avatar Yury Selivanov

asyncio: Cleanup Future API

See https://github.com/python/asyncio/pull/292 for details.
parent 0013cced
...@@ -154,7 +154,7 @@ class Future: ...@@ -154,7 +154,7 @@ class Future:
if self._loop.get_debug(): if self._loop.get_debug():
self._source_traceback = traceback.extract_stack(sys._getframe(1)) self._source_traceback = traceback.extract_stack(sys._getframe(1))
def _format_callbacks(self): def __format_callbacks(self):
cb = self._callbacks cb = self._callbacks
size = len(cb) size = len(cb)
if not size: if not size:
...@@ -184,7 +184,7 @@ class Future: ...@@ -184,7 +184,7 @@ class Future:
result = reprlib.repr(self._result) result = reprlib.repr(self._result)
info.append('result={}'.format(result)) info.append('result={}'.format(result))
if self._callbacks: if self._callbacks:
info.append(self._format_callbacks()) info.append(self.__format_callbacks())
if self._source_traceback: if self._source_traceback:
frame = self._source_traceback[-1] frame = self._source_traceback[-1]
info.append('created at %s:%s' % (frame[0], frame[1])) info.append('created at %s:%s' % (frame[0], frame[1]))
...@@ -319,12 +319,6 @@ class Future: ...@@ -319,12 +319,6 @@ class Future:
# So-called internal methods (note: no set_running_or_notify_cancel()). # So-called internal methods (note: no set_running_or_notify_cancel()).
def _set_result_unless_cancelled(self, result):
"""Helper setting the result only if the future was not cancelled."""
if self.cancelled():
return
self.set_result(result)
def set_result(self, result): def set_result(self, result):
"""Mark the future done and set its result. """Mark the future done and set its result.
...@@ -358,27 +352,6 @@ class Future: ...@@ -358,27 +352,6 @@ class Future:
# have had a chance to call result() or exception(). # have had a chance to call result() or exception().
self._loop.call_soon(self._tb_logger.activate) self._loop.call_soon(self._tb_logger.activate)
# Truly internal methods.
def _copy_state(self, other):
"""Internal helper to copy state from another Future.
The other Future may be a concurrent.futures.Future.
"""
assert other.done()
if self.cancelled():
return
assert not self.done()
if other.cancelled():
self.cancel()
else:
exception = other.exception()
if exception is not None:
self.set_exception(exception)
else:
result = other.result()
self.set_result(result)
def __iter__(self): def __iter__(self):
if not self.done(): if not self.done():
self._blocking = True self._blocking = True
...@@ -390,6 +363,13 @@ class Future: ...@@ -390,6 +363,13 @@ class Future:
__await__ = __iter__ # make compatible with 'await' expression __await__ = __iter__ # make compatible with 'await' expression
def _set_result_unless_cancelled(fut, result):
"""Helper setting the result only if the future was not cancelled."""
if fut.cancelled():
return
fut.set_result(result)
def _set_concurrent_future_state(concurrent, source): def _set_concurrent_future_state(concurrent, source):
"""Copy state from a future to a concurrent.futures.Future.""" """Copy state from a future to a concurrent.futures.Future."""
assert source.done() assert source.done()
...@@ -405,6 +385,26 @@ def _set_concurrent_future_state(concurrent, source): ...@@ -405,6 +385,26 @@ def _set_concurrent_future_state(concurrent, source):
concurrent.set_result(result) concurrent.set_result(result)
def _copy_future_state(source, dest):
"""Internal helper to copy state from another Future.
The other Future may be a concurrent.futures.Future.
"""
assert source.done()
if dest.cancelled():
return
assert not dest.done()
if source.cancelled():
dest.cancel()
else:
exception = source.exception()
if exception is not None:
dest.set_exception(exception)
else:
result = source.result()
dest.set_result(result)
def _chain_future(source, destination): def _chain_future(source, destination):
"""Chain two futures so that when one completes, so does the other. """Chain two futures so that when one completes, so does the other.
...@@ -421,7 +421,7 @@ def _chain_future(source, destination): ...@@ -421,7 +421,7 @@ def _chain_future(source, destination):
def _set_state(future, other): def _set_state(future, other):
if isinstance(future, Future): if isinstance(future, Future):
future._copy_state(other) _copy_future_state(other, future)
else: else:
_set_concurrent_future_state(future, other) _set_concurrent_future_state(future, other)
......
...@@ -41,7 +41,8 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin, ...@@ -41,7 +41,8 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin,
self._loop.call_soon(self._protocol.connection_made, self) self._loop.call_soon(self._protocol.connection_made, self)
if waiter is not None: if waiter is not None:
# only wake up the waiter when connection_made() has been called # only wake up the waiter when connection_made() has been called
self._loop.call_soon(waiter._set_result_unless_cancelled, None) self._loop.call_soon(futures._set_result_unless_cancelled,
waiter, None)
def __repr__(self): def __repr__(self):
info = [self.__class__.__name__] info = [self.__class__.__name__]
......
...@@ -636,7 +636,8 @@ class _SelectorSocketTransport(_SelectorTransport): ...@@ -636,7 +636,8 @@ class _SelectorSocketTransport(_SelectorTransport):
self._sock_fd, self._read_ready) self._sock_fd, self._read_ready)
if waiter is not None: if waiter is not None:
# only wake up the waiter when connection_made() has been called # only wake up the waiter when connection_made() has been called
self._loop.call_soon(waiter._set_result_unless_cancelled, None) self._loop.call_soon(futures._set_result_unless_cancelled,
waiter, None)
def pause_reading(self): def pause_reading(self):
if self._closing: if self._closing:
...@@ -990,7 +991,8 @@ class _SelectorDatagramTransport(_SelectorTransport): ...@@ -990,7 +991,8 @@ class _SelectorDatagramTransport(_SelectorTransport):
self._sock_fd, self._read_ready) self._sock_fd, self._read_ready)
if waiter is not None: if waiter is not None:
# only wake up the waiter when connection_made() has been called # only wake up the waiter when connection_made() has been called
self._loop.call_soon(waiter._set_result_unless_cancelled, None) self._loop.call_soon(futures._set_result_unless_cancelled,
waiter, None)
def get_write_buffer_size(self): def get_write_buffer_size(self):
return sum(len(data) for data, _ in self._buffer) return sum(len(data) for data, _ in self._buffer)
......
...@@ -500,7 +500,8 @@ def sleep(delay, result=None, *, loop=None): ...@@ -500,7 +500,8 @@ def sleep(delay, result=None, *, loop=None):
future = futures.Future(loop=loop) future = futures.Future(loop=loop)
h = future._loop.call_later(delay, h = future._loop.call_later(delay,
future._set_result_unless_cancelled, result) futures._set_result_unless_cancelled,
future, result)
try: try:
return (yield from future) return (yield from future)
finally: finally:
......
...@@ -319,7 +319,8 @@ class _UnixReadPipeTransport(transports.ReadTransport): ...@@ -319,7 +319,8 @@ class _UnixReadPipeTransport(transports.ReadTransport):
self._fileno, self._read_ready) self._fileno, self._read_ready)
if waiter is not None: if waiter is not None:
# only wake up the waiter when connection_made() has been called # only wake up the waiter when connection_made() has been called
self._loop.call_soon(waiter._set_result_unless_cancelled, None) self._loop.call_soon(futures._set_result_unless_cancelled,
waiter, None)
def __repr__(self): def __repr__(self):
info = [self.__class__.__name__] info = [self.__class__.__name__]
...@@ -442,7 +443,8 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, ...@@ -442,7 +443,8 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
if waiter is not None: if waiter is not None:
# only wake up the waiter when connection_made() has been called # only wake up the waiter when connection_made() has been called
self._loop.call_soon(waiter._set_result_unless_cancelled, None) self._loop.call_soon(futures._set_result_unless_cancelled,
waiter, None)
def __repr__(self): def __repr__(self):
info = [self.__class__.__name__] info = [self.__class__.__name__]
......
...@@ -174,11 +174,13 @@ class FutureTests(test_utils.TestCase): ...@@ -174,11 +174,13 @@ class FutureTests(test_utils.TestCase):
'<Future cancelled>') '<Future cancelled>')
def test_copy_state(self): def test_copy_state(self):
from asyncio.futures import _copy_future_state
f = asyncio.Future(loop=self.loop) f = asyncio.Future(loop=self.loop)
f.set_result(10) f.set_result(10)
newf = asyncio.Future(loop=self.loop) newf = asyncio.Future(loop=self.loop)
newf._copy_state(f) _copy_future_state(f, newf)
self.assertTrue(newf.done()) self.assertTrue(newf.done())
self.assertEqual(newf.result(), 10) self.assertEqual(newf.result(), 10)
...@@ -186,7 +188,7 @@ class FutureTests(test_utils.TestCase): ...@@ -186,7 +188,7 @@ class FutureTests(test_utils.TestCase):
f_exception.set_exception(RuntimeError()) f_exception.set_exception(RuntimeError())
newf_exception = asyncio.Future(loop=self.loop) newf_exception = asyncio.Future(loop=self.loop)
newf_exception._copy_state(f_exception) _copy_future_state(f_exception, newf_exception)
self.assertTrue(newf_exception.done()) self.assertTrue(newf_exception.done())
self.assertRaises(RuntimeError, newf_exception.result) self.assertRaises(RuntimeError, newf_exception.result)
...@@ -194,7 +196,7 @@ class FutureTests(test_utils.TestCase): ...@@ -194,7 +196,7 @@ class FutureTests(test_utils.TestCase):
f_cancelled.cancel() f_cancelled.cancel()
newf_cancelled = asyncio.Future(loop=self.loop) newf_cancelled = asyncio.Future(loop=self.loop)
newf_cancelled._copy_state(f_cancelled) _copy_future_state(f_cancelled, newf_cancelled)
self.assertTrue(newf_cancelled.cancelled()) self.assertTrue(newf_cancelled.cancelled())
def test_iter(self): def test_iter(self):
...@@ -382,9 +384,10 @@ class FutureTests(test_utils.TestCase): ...@@ -382,9 +384,10 @@ class FutureTests(test_utils.TestCase):
self.check_future_exception_never_retrieved(True) self.check_future_exception_never_retrieved(True)
def test_set_result_unless_cancelled(self): def test_set_result_unless_cancelled(self):
from asyncio import futures
fut = asyncio.Future(loop=self.loop) fut = asyncio.Future(loop=self.loop)
fut.cancel() fut.cancel()
fut._set_result_unless_cancelled(2) futures._set_result_unless_cancelled(fut, 2)
self.assertTrue(fut.cancelled()) self.assertTrue(fut.cancelled())
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment