Commit f1024f74 authored by Raymond Hettinger's avatar Raymond Hettinger

merge

parents 3ceb573a 87dc4d61
...@@ -23,6 +23,8 @@ the first retrieved (operating like a stack). With a priority queue, ...@@ -23,6 +23,8 @@ the first retrieved (operating like a stack). With a priority queue,
the entries are kept sorted (using the :mod:`heapq` module) and the the entries are kept sorted (using the :mod:`heapq` module) and the
lowest valued entry is retrieved first. lowest valued entry is retrieved first.
Internally, the module uses locks to temporarily block competing threads;
however, it is not designed to handle reentrancy within a thread.
The :mod:`queue` module defines the following classes and exceptions: The :mod:`queue` module defines the following classes and exceptions:
...@@ -189,11 +191,6 @@ Example of how to wait for enqueued tasks to be completed:: ...@@ -189,11 +191,6 @@ Example of how to wait for enqueued tasks to be completed::
t.join() t.join()
.. note::
The :mod:`queue` module is not safe for use from :mod:`signal` handlers as
it uses :mod:`threading` locks.
.. seealso:: .. seealso::
Class :class:`multiprocessing.Queue` Class :class:`multiprocessing.Queue`
......
...@@ -414,7 +414,7 @@ class BaseEventLoop(events.AbstractEventLoop): ...@@ -414,7 +414,7 @@ class BaseEventLoop(events.AbstractEventLoop):
""" """
self._check_closed() self._check_closed()
new_task = not isinstance(future, futures.Future) new_task = not futures.isfuture(future)
future = tasks.ensure_future(future, loop=self) future = tasks.ensure_future(future, loop=self)
if new_task: if new_task:
# An exception is raised if the future didn't complete, so there # An exception is raised if the future didn't complete, so there
......
...@@ -204,8 +204,8 @@ def coroutine(func): ...@@ -204,8 +204,8 @@ def coroutine(func):
@functools.wraps(func) @functools.wraps(func)
def coro(*args, **kw): def coro(*args, **kw):
res = func(*args, **kw) res = func(*args, **kw)
if isinstance(res, futures.Future) or inspect.isgenerator(res) or \ if (futures.isfuture(res) or inspect.isgenerator(res) or
isinstance(res, CoroWrapper): isinstance(res, CoroWrapper)):
res = yield from res res = yield from res
elif _AwaitableABC is not None: elif _AwaitableABC is not None:
# If 'func' returns an Awaitable (new in 3.5) we # If 'func' returns an Awaitable (new in 3.5) we
......
...@@ -110,6 +110,16 @@ class _TracebackLogger: ...@@ -110,6 +110,16 @@ class _TracebackLogger:
self.loop.call_exception_handler({'message': msg}) self.loop.call_exception_handler({'message': msg})
def isfuture(obj):
"""Check for a Future.
This returns True when obj is a Future instance or is advertising
itself as duck-type compatible by setting _asyncio_future_blocking.
See comment in Future for more details.
"""
return getattr(obj, '_asyncio_future_blocking', None) is not None
class Future: class Future:
"""This class is *almost* compatible with concurrent.futures.Future. """This class is *almost* compatible with concurrent.futures.Future.
...@@ -423,15 +433,17 @@ def _chain_future(source, destination): ...@@ -423,15 +433,17 @@ def _chain_future(source, destination):
If destination is cancelled, source gets cancelled too. If destination is cancelled, source gets cancelled too.
Compatible with both asyncio.Future and concurrent.futures.Future. Compatible with both asyncio.Future and concurrent.futures.Future.
""" """
if not isinstance(source, (Future, concurrent.futures.Future)): if not isfuture(source) and not isinstance(source,
concurrent.futures.Future):
raise TypeError('A future is required for source argument') raise TypeError('A future is required for source argument')
if not isinstance(destination, (Future, concurrent.futures.Future)): if not isfuture(destination) and not isinstance(destination,
concurrent.futures.Future):
raise TypeError('A future is required for destination argument') raise TypeError('A future is required for destination argument')
source_loop = source._loop if isinstance(source, Future) else None source_loop = source._loop if isfuture(source) else None
dest_loop = destination._loop if isinstance(destination, Future) else None dest_loop = destination._loop if isfuture(destination) else None
def _set_state(future, other): def _set_state(future, other):
if isinstance(future, Future): if isfuture(future):
_copy_future_state(other, future) _copy_future_state(other, future)
else: else:
_set_concurrent_future_state(future, other) _set_concurrent_future_state(future, other)
...@@ -455,7 +467,7 @@ def _chain_future(source, destination): ...@@ -455,7 +467,7 @@ def _chain_future(source, destination):
def wrap_future(future, *, loop=None): def wrap_future(future, *, loop=None):
"""Wrap concurrent.futures.Future object.""" """Wrap concurrent.futures.Future object."""
if isinstance(future, Future): if isfuture(future):
return future return future
assert isinstance(future, concurrent.futures.Future), \ assert isinstance(future, concurrent.futures.Future), \
'concurrent.futures.Future is expected, got {!r}'.format(future) 'concurrent.futures.Future is expected, got {!r}'.format(future)
......
...@@ -333,7 +333,7 @@ def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED): ...@@ -333,7 +333,7 @@ def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
Note: This does not raise TimeoutError! Futures that aren't done Note: This does not raise TimeoutError! Futures that aren't done
when the timeout occurs are returned in the second set. when the timeout occurs are returned in the second set.
""" """
if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs): if futures.isfuture(fs) or coroutines.iscoroutine(fs):
raise TypeError("expect a list of futures, not %s" % type(fs).__name__) raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
if not fs: if not fs:
raise ValueError('Set of coroutines/Futures is empty.') raise ValueError('Set of coroutines/Futures is empty.')
...@@ -462,7 +462,7 @@ def as_completed(fs, *, loop=None, timeout=None): ...@@ -462,7 +462,7 @@ def as_completed(fs, *, loop=None, timeout=None):
Note: The futures 'f' are not necessarily members of fs. Note: The futures 'f' are not necessarily members of fs.
""" """
if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs): if futures.isfuture(fs) or coroutines.iscoroutine(fs):
raise TypeError("expect a list of futures, not %s" % type(fs).__name__) raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
loop = loop if loop is not None else events.get_event_loop() loop = loop if loop is not None else events.get_event_loop()
todo = {ensure_future(f, loop=loop) for f in set(fs)} todo = {ensure_future(f, loop=loop) for f in set(fs)}
...@@ -538,7 +538,7 @@ def ensure_future(coro_or_future, *, loop=None): ...@@ -538,7 +538,7 @@ def ensure_future(coro_or_future, *, loop=None):
If the argument is a Future, it is returned directly. If the argument is a Future, it is returned directly.
""" """
if isinstance(coro_or_future, futures.Future): if futures.isfuture(coro_or_future):
if loop is not None and loop is not coro_or_future._loop: if loop is not None and loop is not coro_or_future._loop:
raise ValueError('loop argument must agree with Future') raise ValueError('loop argument must agree with Future')
return coro_or_future return coro_or_future
...@@ -614,7 +614,7 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False): ...@@ -614,7 +614,7 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False):
arg_to_fut = {} arg_to_fut = {}
for arg in set(coros_or_futures): for arg in set(coros_or_futures):
if not isinstance(arg, futures.Future): if not futures.isfuture(arg):
fut = ensure_future(arg, loop=loop) fut = ensure_future(arg, loop=loop)
if loop is None: if loop is None:
loop = fut._loop loop = fut._loop
......
...@@ -793,7 +793,7 @@ class EventLoopTestsMixin: ...@@ -793,7 +793,7 @@ class EventLoopTestsMixin:
loop.connect_accepted_socket( loop.connect_accepted_socket(
(lambda : proto), conn, ssl=server_ssl)) (lambda : proto), conn, ssl=server_ssl))
loop.run_forever() loop.run_forever()
conn.close() proto.transport.close()
lsock.close() lsock.close()
thread.join(1) thread.join(1)
......
...@@ -25,6 +25,74 @@ def last_cb(): ...@@ -25,6 +25,74 @@ def last_cb():
pass pass
class DuckFuture:
# Class that does not inherit from Future but aims to be duck-type
# compatible with it.
_asyncio_future_blocking = False
__cancelled = False
__result = None
__exception = None
def cancel(self):
if self.done():
return False
self.__cancelled = True
return True
def cancelled(self):
return self.__cancelled
def done(self):
return (self.__cancelled
or self.__result is not None
or self.__exception is not None)
def result(self):
assert not self.cancelled()
if self.__exception is not None:
raise self.__exception
return self.__result
def exception(self):
assert not self.cancelled()
return self.__exception
def set_result(self, result):
assert not self.done()
assert result is not None
self.__result = result
def set_exception(self, exception):
assert not self.done()
assert exception is not None
self.__exception = exception
def __iter__(self):
if not self.done():
self._asyncio_future_blocking = True
yield self
assert self.done()
return self.result()
class DuckTests(test_utils.TestCase):
def setUp(self):
self.loop = self.new_test_loop()
self.addCleanup(self.loop.close)
def test_wrap_future(self):
f = DuckFuture()
g = asyncio.wrap_future(f)
assert g is f
def test_ensure_future(self):
f = DuckFuture()
g = asyncio.ensure_future(f)
assert g is f
class FutureTests(test_utils.TestCase): class FutureTests(test_utils.TestCase):
def setUp(self): def setUp(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment