Commit ea62d6c6 authored by Jason Madden's avatar Jason Madden

Unify and simplify the implementation of Queue.get and Queue.put. Fixes #647...

Unify and simplify the implementation of Queue.get and Queue.put. Fixes #647 (add a test case for that scenario).
parent 2c858dcd
......@@ -7,7 +7,8 @@
1.1b5 (unreleased)
==================
- Nothing yet
- Fix a possible ``ValueError`` from ``gevent.queue.Queue:peek``.
Reported in :issue:`647` by Kevin Chen.
1.1b4 (Sep 4, 2015)
===================
......
......@@ -41,6 +41,13 @@ from gevent.hub import InvalidSwitchError
__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue', 'Channel']
def _safe_remove(deq, item):
# For when the item may have been removed by
# Queue._unlock
try:
deq.remove(item)
except ValueError:
pass
class Queue(object):
"""
......@@ -208,10 +215,7 @@ class Queue(object):
finally:
if timeout is not None:
timeout.cancel()
try:
self.putters.remove(waiter)
except ValueError:
pass # removed by unlock
_safe_remove(self.putters, waiter)
else:
raise Full
......@@ -223,6 +227,43 @@ class Queue(object):
"""
self.put(item, False)
def __get_or_peek(self, method, block, timeout):
# Internal helper method. The `method` should be either
# self._get when called from self.get() or self._peek when
# called from self.peek(). Call this after the initial check
# to see if there are items in the queue.
if self.hub is getcurrent():
# special case to make get_nowait() or peek_nowait() runnable in the mainloop greenlet
# there are no items in the queue; try to fix the situation by unlocking putters
while self.putters:
# Note: get() used popleft(), peek used pop(); popleft
# is almost certainly correct.
self.putters.popleft().put_and_switch()
if self.qsize():
return method()
raise Empty()
if not block:
# We can't block, we're not the hub, and we have nothing
# to return. No choice...
raise Empty()
waiter = Waiter()
timeout = Timeout.start_new(timeout, Empty) if timeout is not None else None
try:
self.getters.append(waiter)
if self.putters:
self._schedule_unlock()
result = waiter.get()
if result is not waiter:
raise InvalidSwitchError('Invalid switch into Queue.get: %r' % (result, ))
return method()
finally:
if timeout is not None:
timeout.cancel()
_safe_remove(self.getters, waiter)
def get(self, block=True, timeout=None):
"""Remove and return an item from the queue.
......@@ -237,34 +278,8 @@ class Queue(object):
if self.putters:
self._schedule_unlock()
return self._get()
elif self.hub is getcurrent():
# special case to make get_nowait() runnable in the mainloop greenlet
# there are no items in the queue; try to fix the situation by unlocking putters
while self.putters:
self.putters.popleft().put_and_switch()
if self.qsize():
return self._get()
raise Empty
elif block:
waiter = Waiter()
timeout = Timeout.start_new(timeout, Empty) if timeout is not None else None
try:
self.getters.append(waiter)
if self.putters:
self._schedule_unlock()
result = waiter.get()
if result is not waiter:
raise InvalidSwitchError('Invalid switch into Queue.get: %r' % (result, ))
return self._get()
finally:
if timeout is not None:
timeout.cancel()
try:
self.getters.remove(waiter)
except ValueError:
pass # Removed by _unlock
else:
raise Empty
return self.__get_or_peek(self._get, block, timeout)
def get_nowait(self):
"""Remove and return an item from the queue without blocking.
......@@ -285,33 +300,17 @@ class Queue(object):
(*timeout* is ignored in that case).
"""
if self.qsize():
# XXX: Why doesn't this schedule an unlock like get() does?
return self._peek()
elif self.hub is getcurrent():
# special case to make peek(False) runnable in the mainloop greenlet
# there are no items in the queue; try to fix the situation by unlocking putters
while self.putters:
self.putters.pop().put_and_switch()
if self.qsize():
return self._peek()
raise Empty
elif block:
waiter = Waiter()
timeout = Timeout.start_new(timeout, Empty)
try:
self.getters.append(waiter)
if self.putters:
self._schedule_unlock()
result = waiter.get()
if result is not waiter:
raise InvalidSwitchError('Invalid switch into Queue.peek: %r' % (result, ))
return self._peek()
finally:
self.getters.remove(waiter)
timeout.cancel()
else:
raise Empty
return self.__get_or_peek(self._peek, block, timeout)
def peek_nowait(self):
"""Return an item from the queue without blocking.
Only return an item if one is immediately available. Otherwise
raise the :class:`Empty` exception.
"""
return self.peek(False)
def _unlock(self):
......@@ -519,7 +518,7 @@ class Channel(object):
waiter = Waiter()
item = (item, waiter)
self.putters.append(item)
timeout = Timeout.start_new(timeout, Full)
timeout = Timeout.start_new(timeout, Full) if timeout is not None else None
try:
if self.getters:
self._schedule_unlock()
......@@ -527,16 +526,11 @@ class Channel(object):
if result is not waiter:
raise InvalidSwitchError("Invalid switch into Channel.put: %r" % (result, ))
except:
self._discard(item)
_safe_remove(self.putters, item)
raise
finally:
timeout.cancel()
def _discard(self, item):
try:
self.putters.remove(item)
except ValueError:
pass
if timeout is not None:
timeout.cancel()
def put_nowait(self, item):
self.put(item, False)
......
......@@ -13,8 +13,8 @@ class TestQueue(TestCase):
self.switch_expected = False
q = queue.Queue()
q.put('hi')
self.assertEquals(q.peek(), 'hi')
self.assertEquals(q.get(), 'hi')
self.assertEqual(q.peek(), 'hi')
self.assertEqual(q.get(), 'hi')
def test_peek_empty(self):
q = queue.Queue()
......@@ -27,12 +27,22 @@ class TestQueue(TestCase):
gevent.sleep(0.1)
g.join()
def test_peek_multi_greenlet(self):
q = queue.Queue()
g = gevent.spawn(q.peek)
g.start()
gevent.sleep(0)
q.put(1)
g.join()
self.assertTrue(g.exception is None)
self.assertEqual(q.peek(), 1)
def test_send_last(self):
q = queue.Queue()
def waiter(q):
with gevent.Timeout(0.1):
self.assertEquals(q.get(), 'hi2')
self.assertEqual(q.get(), 'hi2')
return "OK"
p = gevent.spawn(waiter, q)
......@@ -56,12 +66,12 @@ class TestQueue(TestCase):
p = gevent.spawn(putter, q)
gevent.sleep(0)
self.assertEquals(results, ['a', 'b'])
self.assertEquals(q.get(), 'a')
self.assertEqual(results, ['a', 'b'])
self.assertEqual(q.get(), 'a')
gevent.sleep(0)
self.assertEquals(results, ['a', 'b', 'c'])
self.assertEquals(q.get(), 'b')
self.assertEquals(q.get(), 'c')
self.assertEqual(results, ['a', 'b', 'c'])
self.assertEqual(q.get(), 'b')
self.assertEqual(q.get(), 'c')
assert p.get(timeout=0) == "OK"
def test_zero_max_size(self):
......@@ -82,8 +92,8 @@ class TestQueue(TestCase):
gevent.sleep(0.001)
self.assertTrue(not e1.ready())
p2 = gevent.spawn(receiver, e2, q)
self.assertEquals(e2.get(), 'hi')
self.assertEquals(e1.get(), 'done')
self.assertEqual(e2.get(), 'hi')
self.assertEqual(e1.get(), 'done')
with gevent.Timeout(0):
gevent.joinall([p1, p2])
......@@ -111,12 +121,12 @@ class TestQueue(TestCase):
return len(results)
q.put(sendings[0])
self.assertEquals(collect_pending_results(), 1)
self.assertEqual(collect_pending_results(), 1)
q.put(sendings[1])
self.assertEquals(collect_pending_results(), 2)
self.assertEqual(collect_pending_results(), 2)
q.put(sendings[2])
q.put(sendings[3])
self.assertEquals(collect_pending_results(), 4)
self.assertEqual(collect_pending_results(), 4)
def test_waiters_that_cancel(self):
q = queue.Queue()
......@@ -131,10 +141,10 @@ class TestQueue(TestCase):
evt = AsyncResult()
gevent.spawn(do_receive, q, evt)
self.assertEquals(evt.get(), 'timed out')
self.assertEqual(evt.get(), 'timed out')
q.put('hi')
self.assertEquals(q.get(), 'hi')
self.assertEqual(q.get(), 'hi')
def test_senders_that_die(self):
q = queue.Queue()
......@@ -143,7 +153,7 @@ class TestQueue(TestCase):
q.put('sent')
gevent.spawn(do_send, q)
self.assertEquals(q.get(), 'sent')
self.assertEqual(q.get(), 'sent')
def test_two_waiters_one_dies(self):
......@@ -165,8 +175,8 @@ class TestQueue(TestCase):
gevent.spawn(waiter, q, waiting_evt)
gevent.sleep(0.1)
q.put('hi')
self.assertEquals(dying_evt.get(), 'timed out')
self.assertEquals(waiting_evt.get(), 'hi')
self.assertEqual(dying_evt.get(), 'timed out')
self.assertEqual(waiting_evt.get(), 'hi')
def test_two_bogus_waiters(self):
def do_receive(q, evt):
......@@ -184,9 +194,9 @@ class TestQueue(TestCase):
gevent.spawn(do_receive, q, e2)
gevent.sleep(0.1)
q.put('sent')
self.assertEquals(e1.get(), 'timed out')
self.assertEquals(e2.get(), 'timed out')
self.assertEquals(q.get(), 'sent')
self.assertEqual(e1.get(), 'timed out')
self.assertEqual(e2.get(), 'timed out')
self.assertEqual(q.get(), 'sent')
class TestChannel(TestCase):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment