Commit bcdc2ba7 authored by Victor Stinner's avatar Victor Stinner

asyncio, Tulip issue 201: Fix a race condition in wait_for()

Don't raise a TimeoutError if we reached the timeout and the future completed
in the same iteration of the event loop. A side effect of the bug is that
Queue.get() looses items.
parent 65c70519
...@@ -330,9 +330,9 @@ def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED): ...@@ -330,9 +330,9 @@ def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
return (yield from _wait(fs, timeout, return_when, loop)) return (yield from _wait(fs, timeout, return_when, loop))
def _release_waiter(waiter, value=True, *args): def _release_waiter(waiter, *args):
if not waiter.done(): if not waiter.done():
waiter.set_result(value) waiter.set_result(None)
@coroutine @coroutine
...@@ -357,14 +357,17 @@ def wait_for(fut, timeout, *, loop=None): ...@@ -357,14 +357,17 @@ def wait_for(fut, timeout, *, loop=None):
return (yield from fut) return (yield from fut)
waiter = futures.Future(loop=loop) waiter = futures.Future(loop=loop)
timeout_handle = loop.call_later(timeout, _release_waiter, waiter, False) timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
cb = functools.partial(_release_waiter, waiter, True) cb = functools.partial(_release_waiter, waiter)
fut = async(fut, loop=loop) fut = async(fut, loop=loop)
fut.add_done_callback(cb) fut.add_done_callback(cb)
try: try:
if (yield from waiter): # wait until the future completes or the timeout
yield from waiter
if fut.done():
return fut.result() return fut.result()
else: else:
fut.remove_done_callback(cb) fut.remove_done_callback(cb)
...@@ -397,7 +400,7 @@ def _wait(fs, timeout, return_when, loop): ...@@ -397,7 +400,7 @@ def _wait(fs, timeout, return_when, loop):
if timeout_handle is not None: if timeout_handle is not None:
timeout_handle.cancel() timeout_handle.cancel()
if not waiter.done(): if not waiter.done():
waiter.set_result(False) waiter.set_result(None)
for f in fs: for f in fs:
f.add_done_callback(_on_completion) f.add_done_callback(_on_completion)
......
...@@ -552,6 +552,21 @@ class TaskTests(test_utils.TestCase): ...@@ -552,6 +552,21 @@ class TaskTests(test_utils.TestCase):
self.assertTrue(fut.done()) self.assertTrue(fut.done())
self.assertTrue(fut.cancelled()) self.assertTrue(fut.cancelled())
def test_wait_for_race_condition(self):
def gen():
yield 0.1
yield 0.1
yield 0.1
loop = self.new_test_loop(gen)
fut = asyncio.Future(loop=loop)
task = asyncio.wait_for(fut, timeout=0.2, loop=loop)
loop.call_later(0.1, fut.set_result, "ok")
res = loop.run_until_complete(task)
self.assertEqual(res, "ok")
def test_wait(self): def test_wait(self):
def gen(): def gen():
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment