Commit e1b9d834 authored by Denis Bilenko's avatar Denis Bilenko

BACKWARD INCOMPATIBLE: Queue(0) is now the same as Queue(None). This...

BACKWARD INCOMPATIBLE: Queue(0) is now the same as Queue(None). This simplifies implementation and removes the difference between
gevent's Queue and queues from Queue and multiprocessing. Use Channel instead of Queue(0).
parent 96a6b569
......@@ -5,16 +5,9 @@ The :mod:`gevent.queue` module implements multi-producer, multi-consumer queues
that work across greenlets, with the API similar to the classes found in the
standard :mod:`Queue` and :class:`multiprocessing <multiprocessing.Queue>` modules.
A major difference is that queues in this module operate as channels when
initialized with *maxsize* of zero. In such case, both :meth:`Queue.empty`
and :meth:`Queue.full` return ``True`` and :meth:`Queue.put` always blocks until a call
to :meth:`Queue.get` retrieves the item.
Changed in version 1.0: Queue(0) now means queue of infinite size, not a channel.
Another interesting difference is that :meth:`Queue.qsize`, :meth:`Queue.empty`, and
:meth:`Queue.full` *can* be used as indicators of whether the subsequent :meth:`Queue.get`
or :meth:`Queue.put` will not block.
Additionally, queues in this module implement iterator protocol. Iterating over queue
The classes in this module implement iterator protocol. Iterating over queue
means repeatedly calling :meth:`get <Queue.get>` until :meth:`get <Queue.get>` returns ``StopIteration``.
>>> queue = gevent.queue.Queue()
......@@ -39,7 +32,7 @@ except ImportError:
Empty = __queue__.Empty
from gevent.timeout import Timeout
from gevent.hub import get_hub, Waiter, getcurrent, _NONE
from gevent.hub import get_hub, Waiter, getcurrent
__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue', 'Channel']
......@@ -56,8 +49,12 @@ class Queue(object):
"""
def __init__(self, maxsize=None):
if maxsize < 0:
if maxsize <= 0:
self.maxsize = None
if maxsize == 0:
import warnings
warnings.warn('Queue(0) now equivalent to Queue(None); if you want a channel, use Channel',
DeprecationWarning, stacklevel=2)
else:
self.maxsize = maxsize
self.getters = set()
......@@ -106,7 +103,7 @@ class Queue(object):
``Queue(None)`` is never full.
"""
return self.qsize() >= self.maxsize
return self.maxsize is not None and self.qsize() >= self.maxsize
def put(self, item, block=True, timeout=None):
"""Put an item into the queue.
......@@ -124,19 +121,18 @@ class Queue(object):
self._put(item)
if self.getters:
self._schedule_unlock()
elif not block and self.hub is getcurrent():
# we're in the mainloop, so we cannot wait; we can switch() to other greenlets though
# find a getter and deliver an item to it
while self.getters:
elif self.hub is getcurrent():
# We're in the mainloop, so we cannot wait; we can switch() to other greenlets though.
# Check if possible to get a free slot in the queue.
while self.getters and self.qsize() and self.qsize() >= self.maxsize:
getter = self.getters.pop()
if getter:
getter.switch(getter)
if self.qsize() < self.maxsize:
self._put(item)
item = self._get()
getter.switch(item)
return
raise Full
elif block:
waiter = ItemWaiter(item)
waiter = ItemWaiter(item, self)
self.putters.add(waiter)
timeout = Timeout.start_new(timeout, Full)
try:
......@@ -144,8 +140,6 @@ class Queue(object):
self._schedule_unlock()
result = waiter.get()
assert result is waiter, "Invalid switch into Queue.put: %r" % (result, )
if waiter.item is not _NONE:
self._put(item)
finally:
timeout.cancel()
self.putters.discard(waiter)
......@@ -174,13 +168,11 @@ class Queue(object):
if self.putters:
self._schedule_unlock()
return self._get()
elif not block and self.hub is getcurrent():
elif self.hub is getcurrent():
# special case to make get_nowait() runnable in the mainloop greenlet
# there are no items in the queue; try to fix the situation by unlocking putters
while self.putters:
putter = self.putters.pop()
if putter:
putter.switch(putter)
self.putters.pop().put_and_switch()
if self.qsize():
return self._get()
raise Empty
......@@ -191,7 +183,9 @@ class Queue(object):
self.getters.add(waiter)
if self.putters:
self._schedule_unlock()
return waiter.get()
result = waiter.get()
assert result is waiter, 'Invalid switch into Queue.get: %r' % (result, )
return self._get()
finally:
self.getters.discard(waiter)
timeout.cancel()
......@@ -208,35 +202,26 @@ class Queue(object):
def _unlock(self):
while True:
if self.qsize() and self.getters:
getter = self.getters.pop()
if getter:
repeat = False
if self.putters and (self.maxsize is None or self.qsize() < self.maxsize):
repeat = True
try:
item = self._get()
except:
getter.throw(*sys.exc_info())
else:
getter.switch(item)
elif self.putters and self.getters:
putter = self.putters.pop()
if putter:
getter = self.getters.pop()
if getter:
item = putter.item
putter.item = _NONE # this makes greenlet calling put() not to call _put() again
self._put(item)
item = self._get()
getter.switch(item)
putter.switch(putter)
self._put(putter.item)
except:
putter.throw(*sys.exc_info())
else:
self.putters.add(putter)
elif self.putters and (self.getters or self.qsize() < self.maxsize):
putter = self.putters.pop()
putter.switch(putter)
else:
break
if self.getters and self.qsize():
repeat = True
getter = self.getters.pop()
getter.switch(getter)
if not repeat:
return
# testcase: 2 greenlets: while True: q.put(q.get()) - nothing else has a change to execute
# to avoid this, schedule unlock with timer(0, ...) once in a while
# replace 'while True' with 'for _ in xrange(100): ...; self._timer.start(self._unlock)
# but then I need _timer and self._event_unlock to play with each other
def _schedule_unlock(self):
self._event_unlock.start(self._unlock)
......@@ -252,11 +237,18 @@ class Queue(object):
class ItemWaiter(Waiter):
__slots__ = ['item']
__slots__ = ['item', 'queue']
def __init__(self, item):
def __init__(self, item, queue):
Waiter.__init__(self)
self.item = item
self.queue = queue
def put_and_switch(self):
self.queue._put(self.item)
self.queue = None
self.item = None
return self.switch(self)
class PriorityQueue(Queue):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment