Commit 5df8f554 authored by Guido van Rossum's avatar Guido van Rossum

Issue #25233: Rewrite the guts of Queue to be more understandable and correct.

parent d63f262f
...@@ -47,7 +47,7 @@ class Queue: ...@@ -47,7 +47,7 @@ class Queue:
# Futures. # Futures.
self._getters = collections.deque() self._getters = collections.deque()
# Futures # Futures.
self._putters = collections.deque() self._putters = collections.deque()
self._unfinished_tasks = 0 self._unfinished_tasks = 0
self._finished = locks.Event(loop=self._loop) self._finished = locks.Event(loop=self._loop)
...@@ -67,10 +67,13 @@ class Queue: ...@@ -67,10 +67,13 @@ class Queue:
# End of the overridable methods. # End of the overridable methods.
def __put_internal(self, item): def _wakeup_next(self, waiters):
self._put(item) # Wake up the next waiter (if any) that isn't cancelled.
self._unfinished_tasks += 1 while waiters:
self._finished.clear() waiter = waiters.popleft()
if not waiter.done():
waiter.set_result(None)
break
def __repr__(self): def __repr__(self):
return '<{} at {:#x} {}>'.format( return '<{} at {:#x} {}>'.format(
...@@ -91,16 +94,6 @@ class Queue: ...@@ -91,16 +94,6 @@ class Queue:
result += ' tasks={}'.format(self._unfinished_tasks) result += ' tasks={}'.format(self._unfinished_tasks)
return result return result
def _consume_done_getters(self):
# Delete waiters at the head of the get() queue who've timed out.
while self._getters and self._getters[0].done():
self._getters.popleft()
def _consume_done_putters(self):
# Delete waiters at the head of the put() queue who've timed out.
while self._putters and self._putters[0].done():
self._putters.popleft()
def qsize(self): def qsize(self):
"""Number of items in the queue.""" """Number of items in the queue."""
return len(self._queue) return len(self._queue)
...@@ -134,47 +127,31 @@ class Queue: ...@@ -134,47 +127,31 @@ class Queue:
This method is a coroutine. This method is a coroutine.
""" """
self._consume_done_getters() while self.full():
if self._getters: putter = futures.Future(loop=self._loop)
assert not self._queue, ( self._putters.append(putter)
'queue non-empty, why are getters waiting?') try:
yield from putter
getter = self._getters.popleft() except:
self.__put_internal(item) putter.cancel() # Just in case putter is not done yet.
if not self.full() and not putter.cancelled():
# getter cannot be cancelled, we just removed done getters # We were woken up by get_nowait(), but can't take
getter.set_result(self._get()) # the call. Wake up the next in line.
self._wakeup_next(self._putters)
elif self._maxsize > 0 and self._maxsize <= self.qsize(): raise
waiter = futures.Future(loop=self._loop) return self.put_nowait(item)
self._putters.append(waiter)
yield from waiter
self._put(item)
else:
self.__put_internal(item)
def put_nowait(self, item): def put_nowait(self, item):
"""Put an item into the queue without blocking. """Put an item into the queue without blocking.
If no free slot is immediately available, raise QueueFull. If no free slot is immediately available, raise QueueFull.
""" """
self._consume_done_getters() if self.full():
if self._getters:
assert not self._queue, (
'queue non-empty, why are getters waiting?')
getter = self._getters.popleft()
self.__put_internal(item)
# getter cannot be cancelled, we just removed done getters
getter.set_result(self._get())
elif self._maxsize > 0 and self._maxsize <= self.qsize():
raise QueueFull raise QueueFull
else: self._put(item)
self.__put_internal(item) self._unfinished_tasks += 1
self._finished.clear()
self._wakeup_next(self._getters)
@coroutine @coroutine
def get(self): def get(self):
...@@ -184,77 +161,30 @@ class Queue: ...@@ -184,77 +161,30 @@ class Queue:
This method is a coroutine. This method is a coroutine.
""" """
self._consume_done_putters() while self.empty():
if self._putters: getter = futures.Future(loop=self._loop)
assert self.full(), 'queue not full, why are putters waiting?' self._getters.append(getter)
putter = self._putters.popleft()
# When a getter runs and frees up a slot so this putter can
# run, we need to defer the put for a tick to ensure that
# getters and putters alternate perfectly. See
# ChannelTest.test_wait.
self._loop.call_soon(putter._set_result_unless_cancelled, None)
return self._get()
elif self.qsize():
return self._get()
else:
waiter = futures.Future(loop=self._loop)
self._getters.append(waiter)
try: try:
return (yield from waiter) yield from getter
except futures.CancelledError: except:
# if we get CancelledError, it means someone cancelled this getter.cancel() # Just in case getter is not done yet.
# get() coroutine. But there is a chance that the waiter if not self.empty() and not getter.cancelled():
# already is ready and contains an item that has just been # We were woken up by put_nowait(), but can't take
# removed from the queue. In this case, we need to put the item # the call. Wake up the next in line.
# back into the front of the queue. This get() must either self._wakeup_next(self._getters)
# succeed without fault or, if it gets cancelled, it must be as
# if it never happened.
if waiter.done():
self._put_it_back(waiter.result())
raise raise
return self.get_nowait()
def _put_it_back(self, item):
"""
This is called when we have a waiter to get() an item and this waiter
gets cancelled. In this case, we put the item back: wake up another
waiter or put it in the _queue.
"""
self._consume_done_getters()
if self._getters:
assert not self._queue, (
'queue non-empty, why are getters waiting?')
getter = self._getters.popleft()
self.__put_internal(item)
# getter cannot be cancelled, we just removed done getters
getter.set_result(item)
else:
self._queue.appendleft(item)
def get_nowait(self): def get_nowait(self):
"""Remove and return an item from the queue. """Remove and return an item from the queue.
Return an item if one is immediately available, else raise QueueEmpty. Return an item if one is immediately available, else raise QueueEmpty.
""" """
self._consume_done_putters() if self.empty():
if self._putters:
assert self.full(), 'queue not full, why are putters waiting?'
putter = self._putters.popleft()
# Wake putter on next tick.
# getter cannot be cancelled, we just removed done putters
putter.set_result(None)
return self._get()
elif self.qsize():
return self._get()
else:
raise QueueEmpty raise QueueEmpty
item = self._get()
self._wakeup_next(self._putters)
return item
def task_done(self): def task_done(self):
"""Indicate that a formerly enqueued task is complete. """Indicate that a formerly enqueued task is complete.
......
...@@ -271,6 +271,29 @@ class QueueGetTests(_QueueTestBase): ...@@ -271,6 +271,29 @@ class QueueGetTests(_QueueTestBase):
self.assertEqual(self.loop.run_until_complete(q.get()), 'a') self.assertEqual(self.loop.run_until_complete(q.get()), 'a')
self.assertEqual(self.loop.run_until_complete(q.get()), 'b') self.assertEqual(self.loop.run_until_complete(q.get()), 'b')
def test_why_are_getters_waiting(self):
# From issue #268.
@asyncio.coroutine
def consumer(queue, num_expected):
for _ in range(num_expected):
yield from queue.get()
@asyncio.coroutine
def producer(queue, num_items):
for i in range(num_items):
yield from queue.put(i)
queue_size = 1
producer_num_items = 5
q = asyncio.Queue(queue_size, loop=self.loop)
self.loop.run_until_complete(
asyncio.gather(producer(q, producer_num_items),
consumer(q, producer_num_items),
loop=self.loop),
)
class QueuePutTests(_QueueTestBase): class QueuePutTests(_QueueTestBase):
...@@ -377,13 +400,8 @@ class QueuePutTests(_QueueTestBase): ...@@ -377,13 +400,8 @@ class QueuePutTests(_QueueTestBase):
loop.run_until_complete(reader3) loop.run_until_complete(reader3)
# reader2 will receive `2`, because it was added to the # It is undefined in which order concurrent readers receive results.
# queue of pending readers *before* put_nowaits were called. self.assertEqual({reader2.result(), reader3.result()}, {1, 2})
self.assertEqual(reader2.result(), 2)
# reader3 will receive `1`, because reader1 was cancelled
# before is had a chance to execute, and `2` was already
# pushed to reader2 by second `put_nowait`.
self.assertEqual(reader3.result(), 1)
def test_put_cancel_drop(self): def test_put_cancel_drop(self):
...@@ -479,6 +497,29 @@ class QueuePutTests(_QueueTestBase): ...@@ -479,6 +497,29 @@ class QueuePutTests(_QueueTestBase):
self.loop.run_until_complete(q.put('a')) self.loop.run_until_complete(q.put('a'))
self.assertEqual(self.loop.run_until_complete(t), 'a') self.assertEqual(self.loop.run_until_complete(t), 'a')
def test_why_are_putters_waiting(self):
# From issue #265.
queue = asyncio.Queue(2, loop=self.loop)
@asyncio.coroutine
def putter(item):
yield from queue.put(item)
@asyncio.coroutine
def getter():
yield
num = queue.qsize()
for _ in range(num):
item = queue.get_nowait()
t0 = putter(0)
t1 = putter(1)
t2 = putter(2)
t3 = putter(3)
self.loop.run_until_complete(
asyncio.gather(getter(), t0, t1, t2, t3, loop=self.loop))
class LifoQueueTests(_QueueTestBase): class LifoQueueTests(_QueueTestBase):
......
...@@ -81,6 +81,8 @@ Core and Builtins ...@@ -81,6 +81,8 @@ Core and Builtins
Library Library
------- -------
- Issue #25233: Rewrite the guts of Queue to be more understandable and correct.
- Issue #23600: Default implementation of tzinfo.fromutc() was returning - Issue #23600: Default implementation of tzinfo.fromutc() was returning
wrong results in some cases. wrong results in some cases.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment