Commit 772ff231 authored by Denis Bilenko's avatar Denis Bilenko

proc: add joinall; fix memleak in waitall; make killall use links

- also use queue.Queue instead of coros.Queue
parent 80fdf951
...@@ -77,6 +77,7 @@ and wait for all them to complete. Such function is provided by this module. ...@@ -77,6 +77,7 @@ and wait for all them to complete. Such function is provided by this module.
""" """
import sys import sys
from gevent import coros, greenlet, core from gevent import coros, greenlet, core
from gevent.queue import Queue
import traceback import traceback
__all__ = ['LinkedExited', __all__ = ['LinkedExited',
...@@ -171,47 +172,42 @@ class LinkToCallable(Link): ...@@ -171,47 +172,42 @@ class LinkToCallable(Link):
self.listener(source) self.listener(source)
def waitall(lst, trap_errors=False, queue=None): # QQQ add timeout
def joinall(sources, trap_errors=True, queue=None):
if queue is None: if queue is None:
queue = coros.Queue() queue = Queue()
index = -1 links = []
for (index, linkable) in enumerate(lst): try:
linkable.link(decorate_send(queue, index)) for source in sources:
len = index + 1 links.append(source.link(queue.put))
results = [None] * len for _ in xrange(len(sources)):
count = 0 completed = queue.get()
while count < len: if not trap_errors and completed.has_exception():
try: greenlet.getcurrent().throw(*completed.exc_info())
index, value = queue.wait() finally:
except Exception: for link in links:
if not trap_errors: link.cancel()
raise
else:
results[index] = value def waitall(sources, trap_errors=False, queue=None):
count += 1 joinall(sources, trap_errors=trap_errors, queue=queue)
return results return [source.value for source in sources]
class decorate_send(object): def killall(sources, exception=ProcExit, block=False, polling_period=0.2):
waiter = Waiter()
def __init__(self, event, tag): core.active_event(greenlet._killall, sources, exception, waiter)
self._event = event if block:
self._tag = tag alive = waiter.wait()
if alive:
def __repr__(self): try:
params = (type(self).__name__, self._tag, self._event) joinall(alive, trap_errors=True)
return '<%s tag=%r event=%r>' % params except TypeError:
greenlet._joinall(alive, polling_period=polling_period)
def __getattr__(self, name): # QQQ a) use links for all the greenlets we can and poll the others
assert name != '_event' # QQQ b) have only one unversal version of killall, waitall, joinall etc
return getattr(self._event, name) # QQQ the current dichotomy of greenlets and procs is confusing
def send(self, value=None):
self._event.send((self._tag, value))
# XXX this killall should actually be aware of link() and use it to avoid polling
killall = greenlet.killall
class NotUsed(object): class NotUsed(object):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment