Commit 841d9ee4 authored by Guido van Rossum's avatar Guido van Rossum

Issue #25304: Add asyncio.run_coroutine_threadsafe(). By Vincent Michel.

parent 3795d12a
...@@ -390,22 +390,64 @@ class Future: ...@@ -390,22 +390,64 @@ class Future:
__await__ = __iter__ # make compatible with 'await' expression __await__ = __iter__ # make compatible with 'await' expression
def wrap_future(fut, *, loop=None): def _set_concurrent_future_state(concurrent, source):
"""Wrap concurrent.futures.Future object.""" """Copy state from a future to a concurrent.futures.Future."""
if isinstance(fut, Future): assert source.done()
return fut if source.cancelled():
assert isinstance(fut, concurrent.futures.Future), \ concurrent.cancel()
'concurrent.futures.Future is expected, got {!r}'.format(fut) if not concurrent.set_running_or_notify_cancel():
if loop is None: return
loop = events.get_event_loop() exception = source.exception()
new_future = Future(loop=loop) if exception is not None:
concurrent.set_exception(exception)
else:
result = source.result()
concurrent.set_result(result)
def _chain_future(source, destination):
"""Chain two futures so that when one completes, so does the other.
The result (or exception) of source will be copied to destination.
If destination is cancelled, source gets cancelled too.
Compatible with both asyncio.Future and concurrent.futures.Future.
"""
if not isinstance(source, (Future, concurrent.futures.Future)):
raise TypeError('A future is required for source argument')
if not isinstance(destination, (Future, concurrent.futures.Future)):
raise TypeError('A future is required for destination argument')
source_loop = source._loop if isinstance(source, Future) else None
dest_loop = destination._loop if isinstance(destination, Future) else None
def _set_state(future, other):
if isinstance(future, Future):
future._copy_state(other)
else:
_set_concurrent_future_state(future, other)
def _check_cancel_other(f): def _call_check_cancel(destination):
if f.cancelled(): if destination.cancelled():
fut.cancel() if source_loop is None or source_loop is dest_loop:
source.cancel()
else:
source_loop.call_soon_threadsafe(source.cancel)
new_future.add_done_callback(_check_cancel_other) def _call_set_state(source):
fut.add_done_callback( if dest_loop is None or dest_loop is source_loop:
lambda future: loop.call_soon_threadsafe( _set_state(destination, source)
new_future._copy_state, future)) else:
dest_loop.call_soon_threadsafe(_set_state, destination, source)
destination.add_done_callback(_call_check_cancel)
source.add_done_callback(_call_set_state)
def wrap_future(future, *, loop=None):
"""Wrap concurrent.futures.Future object."""
if isinstance(future, Future):
return future
assert isinstance(future, concurrent.futures.Future), \
'concurrent.futures.Future is expected, got {!r}'.format(future)
new_future = Future(loop=loop)
_chain_future(future, new_future)
return new_future return new_future
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
__all__ = ['Task', __all__ = ['Task',
'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED', 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
'wait', 'wait_for', 'as_completed', 'sleep', 'async', 'wait', 'wait_for', 'as_completed', 'sleep', 'async',
'gather', 'shield', 'ensure_future', 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
] ]
import concurrent.futures import concurrent.futures
...@@ -692,3 +692,19 @@ def shield(arg, *, loop=None): ...@@ -692,3 +692,19 @@ def shield(arg, *, loop=None):
inner.add_done_callback(_done_callback) inner.add_done_callback(_done_callback)
return outer return outer
def run_coroutine_threadsafe(coro, loop):
"""Submit a coroutine object to a given event loop.
Return a concurrent.futures.Future to access the result.
"""
if not coroutines.iscoroutine(coro):
raise TypeError('A coroutine object is required')
future = concurrent.futures.Future()
def callback():
futures._chain_future(ensure_future(coro, loop=loop), future)
loop.call_soon_threadsafe(callback)
return future
...@@ -174,8 +174,6 @@ class FutureTests(test_utils.TestCase): ...@@ -174,8 +174,6 @@ class FutureTests(test_utils.TestCase):
'<Future cancelled>') '<Future cancelled>')
def test_copy_state(self): def test_copy_state(self):
# Test the internal _copy_state method since it's being directly
# invoked in other modules.
f = asyncio.Future(loop=self.loop) f = asyncio.Future(loop=self.loop)
f.set_result(10) f.set_result(10)
......
...@@ -2100,5 +2100,72 @@ class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase): ...@@ -2100,5 +2100,72 @@ class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):
self.assertIsInstance(f.exception(), RuntimeError) self.assertIsInstance(f.exception(), RuntimeError)
class RunCoroutineThreadsafeTests(test_utils.TestCase):
"""Test case for futures.submit_to_loop."""
def setUp(self):
self.loop = self.new_test_loop(self.time_gen)
def time_gen(self):
"""Handle the timer."""
yield 0 # second
yield 1 # second
@asyncio.coroutine
def add(self, a, b, fail=False, cancel=False):
"""Wait 1 second and return a + b."""
yield from asyncio.sleep(1, loop=self.loop)
if fail:
raise RuntimeError("Fail!")
if cancel:
asyncio.tasks.Task.current_task(self.loop).cancel()
yield
return a + b
def target(self, fail=False, cancel=False, timeout=None):
"""Run add coroutine in the event loop."""
coro = self.add(1, 2, fail=fail, cancel=cancel)
future = asyncio.run_coroutine_threadsafe(coro, self.loop)
try:
return future.result(timeout)
finally:
future.done() or future.cancel()
def test_run_coroutine_threadsafe(self):
"""Test coroutine submission from a thread to an event loop."""
future = self.loop.run_in_executor(None, self.target)
result = self.loop.run_until_complete(future)
self.assertEqual(result, 3)
def test_run_coroutine_threadsafe_with_exception(self):
"""Test coroutine submission from a thread to an event loop
when an exception is raised."""
future = self.loop.run_in_executor(None, self.target, True)
with self.assertRaises(RuntimeError) as exc_context:
self.loop.run_until_complete(future)
self.assertIn("Fail!", exc_context.exception.args)
def test_run_coroutine_threadsafe_with_timeout(self):
"""Test coroutine submission from a thread to an event loop
when a timeout is raised."""
callback = lambda: self.target(timeout=0)
future = self.loop.run_in_executor(None, callback)
with self.assertRaises(asyncio.TimeoutError):
self.loop.run_until_complete(future)
# Clear the time generator and tasks
test_utils.run_briefly(self.loop)
# Check that there's no pending task (add has been cancelled)
for task in asyncio.Task.all_tasks(self.loop):
self.assertTrue(task.done())
def test_run_coroutine_threadsafe_task_cancelled(self):
"""Test coroutine submission from a tread to an event loop
when the task is cancelled."""
callback = lambda: self.target(cancel=True)
future = self.loop.run_in_executor(None, callback)
with self.assertRaises(asyncio.CancelledError):
self.loop.run_until_complete(future)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
...@@ -929,6 +929,7 @@ Steven Miale ...@@ -929,6 +929,7 @@ Steven Miale
Trent Mick Trent Mick
Jason Michalski Jason Michalski
Franck Michea Franck Michea
Vincent Michel
Tom Middleton Tom Middleton
Thomas Miedema Thomas Miedema
Stan Mihai Stan Mihai
......
...@@ -90,6 +90,10 @@ Core and Builtins ...@@ -90,6 +90,10 @@ Core and Builtins
Library Library
------- -------
- Issue #25304: Add asyncio.run_coroutine_threadsafe(). This lets you
submit a coroutine to a loop from another thread, returning a
concurrent.futures.Future. By Vincent Michel.
- Issue #25232: Fix CGIRequestHandler to split the query from the URL at the - Issue #25232: Fix CGIRequestHandler to split the query from the URL at the
first question mark (?) rather than the last. Patch from Xiang Zhang. first question mark (?) rather than the last. Patch from Xiang Zhang.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment