Commit ebf4a1d6 authored by Denis Bilenko's avatar Denis Bilenko

add gevent.wait(), gevent.iwait()

- remove waitall() from gevent/event.py
- reimplement joinall() with wait()
- remove joinall from __all__
- Hub.join() no longer accepts event= argument
parent 3667d16f
...@@ -18,7 +18,8 @@ __all__ = ['get_hub', ...@@ -18,7 +18,8 @@ __all__ = ['get_hub',
'spawn', 'spawn',
'spawn_later', 'spawn_later',
'spawn_raw', 'spawn_raw',
'joinall', 'iwait',
'wait',
'killall', 'killall',
'Timeout', 'Timeout',
'with_timeout', 'with_timeout',
...@@ -28,8 +29,7 @@ __all__ = ['get_hub', ...@@ -28,8 +29,7 @@ __all__ = ['get_hub',
'kill', 'kill',
'signal', 'signal',
'fork', 'fork',
'reinit', 'reinit']
'wait']
import sys import sys
...@@ -39,7 +39,7 @@ if sys.platform == 'win32': ...@@ -39,7 +39,7 @@ if sys.platform == 'win32':
del sys del sys
from gevent.hub import get_hub from gevent.hub import get_hub, iwait, wait
from gevent.greenlet import Greenlet, joinall, killall from gevent.greenlet import Greenlet, joinall, killall
spawn = Greenlet.spawn spawn = Greenlet.spawn
spawn_later = Greenlet.spawn_later spawn_later = Greenlet.spawn_later
...@@ -50,9 +50,4 @@ try: ...@@ -50,9 +50,4 @@ try:
except ImportError: except ImportError:
__all__.remove('fork') __all__.remove('fork')
def wait(timeout=None, event=None):
return get_hub().join(timeout=timeout, event=event)
run = wait # XXX to be deleted (soon) run = wait # XXX to be deleted (soon)
...@@ -310,18 +310,3 @@ class AsyncResult(object): ...@@ -310,18 +310,3 @@ class AsyncResult(object):
self.set(source.value) self.set(source.value)
else: else:
self.set_exception(source.exception) self.set_exception(source.exception)
def waitall(events):
# QQQ add timeout?
from gevent.queue import Queue
queue = Queue()
put = queue.put
try:
for event in events:
event.rawlink(put)
for _ in xrange(len(events)):
queue.get()
finally:
for event in events:
event.unlink(put)
# Copyright (c) 2009-2012 Denis Bilenko. See LICENSE for details. # Copyright (c) 2009-2012 Denis Bilenko. See LICENSE for details.
import sys import sys
from gevent.hub import greenlet, getcurrent, get_hub, GreenletExit, Waiter, PY3 from gevent.hub import greenlet, getcurrent, get_hub, GreenletExit, Waiter, PY3, iwait, wait
from gevent.timeout import Timeout from gevent.timeout import Timeout
from collections import deque from collections import deque
...@@ -396,39 +396,17 @@ def _kill(greenlet, exception, waiter): ...@@ -396,39 +396,17 @@ def _kill(greenlet, exception, waiter):
waiter.switch() waiter.switch()
try:
xrange
except NameError:
xrange = range
def joinall(greenlets, timeout=None, raise_error=False, count=None): def joinall(greenlets, timeout=None, raise_error=False, count=None):
from gevent.queue import Queue if not raise_error:
queue = Queue() wait(greenlets, timeout=timeout)
put = queue.put else:
if count is None: for obj in iwait(greenlets, timeout=timeout):
count = len(greenlets) if getattr(obj, 'exception', None) is not None:
timeout = Timeout.start_new(timeout) raise obj.exception
try: if count is not None:
try: count -= 1
for greenlet in greenlets: if count <= 0:
greenlet.rawlink(put) break
if raise_error:
for _ in xrange(count):
greenlet = queue.get()
if not greenlet.successful():
raise greenlet.exception
else:
for _ in xrange(count):
queue.get()
except:
if sys.exc_info()[1] is not timeout:
raise
finally:
for greenlet in greenlets:
greenlet.unlink(put)
finally:
timeout.cancel()
def _killall3(greenlets, exception, waiter): def _killall3(greenlets, exception, waiter):
......
...@@ -408,15 +408,14 @@ class Hub(greenlet): ...@@ -408,15 +408,14 @@ class Hub(greenlet):
# It is still possible to kill this greenlet with throw. However, in that case # It is still possible to kill this greenlet with throw. However, in that case
# switching to it is no longer safe, as switch will return immediatelly # switching to it is no longer safe, as switch will return immediatelly
def join(self, timeout=None, event=None): def join(self, timeout=None):
"""Wait for the event loop to finish. Exits only when there are """Wait for the event loop to finish. Exits only when there are
no more spawned greenlets, started servers, active timeouts or watchers. no more spawned greenlets, started servers, active timeouts or watchers.
If *timeout* is provided, wait no longer for the specified number of seconds. If *timeout* is provided, wait no longer for the specified number of seconds.
If *event* was provided, exit when it was signalled with :meth:`Event.set` method.
Returns True if exited because the loop finished execution. Returns True if exited because the loop finished execution.
Returns False if exited because of timeout expired or event was signalled. Returns False if exited because of timeout expired.
""" """
assert getcurrent() is self.parent, "only possible from the MAIN greenlet" assert getcurrent() is self.parent, "only possible from the MAIN greenlet"
if self.dead: if self.dead:
...@@ -424,26 +423,18 @@ class Hub(greenlet): ...@@ -424,26 +423,18 @@ class Hub(greenlet):
waiter = Waiter() waiter = Waiter()
if event is not None: if timeout is not None:
switch = waiter.switch timeout = self.loop.timer(timeout, ref=False)
event.rawlink(switch) timeout.start(waiter.switch)
try: try:
if timeout is not None:
timeout = self.loop.timer(timeout, ref=False)
timeout.start(waiter.switch)
try: try:
try: waiter.get()
waiter.get() except LoopExit:
except LoopExit: return True
return True
finally:
if timeout is not None:
timeout.stop()
finally: finally:
if event is not None: if timeout is not None:
event.unlink(switch) timeout.stop()
return False return False
def destroy(self, destroy_loop=None): def destroy(self, destroy_loop=None):
...@@ -622,6 +613,41 @@ class Waiter(object): ...@@ -622,6 +613,41 @@ class Waiter(object):
# and unwraps it in wait() thus checking that switch() was indeed called # and unwraps it in wait() thus checking that switch() was indeed called
def iwait(objects, timeout=None):
# QQQ would be nice to support iterable here that can be generated slowly (why?)
waiter = Waiter()
switch = waiter.switch
if timeout is not None:
get_hub().loop.timer(timeout, priority=-1, ref=False).start(waiter.switch, _NONE)
try:
count = len(objects)
for obj in objects:
obj.rawlink(switch)
for _ in xrange(count):
item = waiter.get()
waiter.clear()
if item is _NONE:
return
yield item
finally:
for obj in objects:
obj.unlink(switch)
def wait(objects=None, timeout=None, count=None):
if objects is None:
return get_hub().join(timeout=timeout)
result = []
if count is None:
return list(iwait(objects, timeout))
for obj in iwait(objects=objects, timeout=timeout):
result.append(obj)
count -= 1
if count <= 0:
break
return result
class _NONE(object): class _NONE(object):
"A special thingy you must never pass to any of gevent API" "A special thingy you must never pass to any of gevent API"
__slots__ = [] __slots__ = []
......
...@@ -32,7 +32,7 @@ for _a in xrange(2): ...@@ -32,7 +32,7 @@ for _a in xrange(2):
for _ in xrange(2): for _ in xrange(2):
x = gevent.spawn(lambda: 5) x = gevent.spawn(lambda: 5)
with no_time(SMALL): with no_time(SMALL):
result = gevent.get_hub().join(timeout=10) result = gevent.wait(timeout=10)
assert result is True, repr(result) assert result is True, repr(result)
assert x.dead, x assert x.dead, x
assert x.value == 5, x assert x.value == 5, x
...@@ -41,7 +41,7 @@ for _a in xrange(2): ...@@ -41,7 +41,7 @@ for _a in xrange(2):
for _ in xrange(2): for _ in xrange(2):
x = gevent.spawn_later(SMALL, lambda: 5) x = gevent.spawn_later(SMALL, lambda: 5)
with expected_time(SMALL): with expected_time(SMALL):
result = gevent.get_hub().join(timeout=10) result = gevent.wait(timeout=10)
assert result is True, repr(result) assert result is True, repr(result)
assert x.dead, x assert x.dead, x
...@@ -49,12 +49,12 @@ for _a in xrange(2): ...@@ -49,12 +49,12 @@ for _a in xrange(2):
for _ in xrange(2): for _ in xrange(2):
x = gevent.spawn_later(10, lambda: 5) x = gevent.spawn_later(10, lambda: 5)
with expected_time(SMALL): with expected_time(SMALL):
result = gevent.get_hub().join(timeout=SMALL) result = gevent.wait(timeout=SMALL)
assert result is False, repr(result) assert result is False, repr(result)
assert not x.dead, x assert not x.dead, x
x.kill() x.kill()
with no_time(): with no_time():
result = gevent.get_hub().join() result = gevent.wait()
assert result is True assert result is True
# exiting because of event (the spawned greenlet still runs) # exiting because of event (the spawned greenlet still runs)
...@@ -63,21 +63,21 @@ for _a in xrange(2): ...@@ -63,21 +63,21 @@ for _a in xrange(2):
event = Event() event = Event()
event_set = gevent.spawn_later(SMALL, event.set) event_set = gevent.spawn_later(SMALL, event.set)
with expected_time(SMALL): with expected_time(SMALL):
result = gevent.get_hub().join(event=event) result = gevent.wait([event])
assert result is False, repr(result) assert result == [event], repr(result)
assert not x.dead, x assert not x.dead, x
assert event_set.dead assert event_set.dead
assert event.is_set() assert event.is_set()
x.kill() x.kill()
with no_time(): with no_time():
result = gevent.get_hub().join() result = gevent.wait()
assert result is True assert result is True
# checking "ref=False" argument # checking "ref=False" argument
for _ in xrange(2): for _ in xrange(2):
gevent.get_hub().loop.timer(10, ref=False).start(lambda: None) gevent.get_hub().loop.timer(10, ref=False).start(lambda: None)
with no_time(): with no_time():
result = gevent.get_hub().join() result = gevent.wait()
assert result is True assert result is True
# checking "ref=False" attribute # checking "ref=False" attribute
...@@ -86,5 +86,5 @@ for _a in xrange(2): ...@@ -86,5 +86,5 @@ for _a in xrange(2):
w.start(lambda: None) w.start(lambda: None)
w.ref = False w.ref = False
with no_time(): with no_time():
result = gevent.get_hub().join() result = gevent.wait()
assert result is True assert result is True
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment