Commit 819169f1 authored by Denis Bilenko's avatar Denis Bilenko

Pool: make map/imap/imap_unordered raise the error if iterable raises an error

parent 5abb82fd
...@@ -211,9 +211,16 @@ class IMapUnordered(Greenlet): ...@@ -211,9 +211,16 @@ class IMapUnordered(Greenlet):
self.iterable = iterable self.iterable = iterable
self.queue = Queue() self.queue = Queue()
self.count = 0 self.count = 0
self.rawlink(self._on_finish)
def __iter__(self): def __iter__(self):
return self.queue return self
def next(self):
value = self.queue.get()
if isinstance(value, Failure):
raise value.exc
return value
def _run(self): def _run(self):
try: try:
...@@ -231,7 +238,11 @@ class IMapUnordered(Greenlet): ...@@ -231,7 +238,11 @@ class IMapUnordered(Greenlet):
if greenlet.successful(): if greenlet.successful():
self.queue.put(greenlet.value) self.queue.put(greenlet.value)
if self.ready() and self.count <= 0: if self.ready() and self.count <= 0:
self.queue.put(StopIteration) self.queue.put(Failure(StopIteration))
def _on_finish(self, _self):
if not self.successful():
self.queue.put(Failure(self.exception))
class IMap(Greenlet): class IMap(Greenlet):
...@@ -248,6 +259,7 @@ class IMap(Greenlet): ...@@ -248,6 +259,7 @@ class IMap(Greenlet):
self.waiting = [] # QQQ maybe deque will work faster there? self.waiting = [] # QQQ maybe deque will work faster there?
self.index = 0 self.index = 0
self.maxindex = -1 self.maxindex = -1
self.rawlink(self._on_finish)
def __iter__(self): def __iter__(self):
return self return self
...@@ -262,9 +274,8 @@ class IMap(Greenlet): ...@@ -262,9 +274,8 @@ class IMap(Greenlet):
insort_right(self.waiting, (index, value)) insort_right(self.waiting, (index, value))
continue continue
self.index += 1 self.index += 1
if value is StopIteration and self.dead: if isinstance(value, Failure):
assert self.index - 1 == self.maxindex, self.__dict__ raise value.exc
raise StopIteration
if value is not _SKIP: if value is not _SKIP:
return value return value
...@@ -290,7 +301,19 @@ class IMap(Greenlet): ...@@ -290,7 +301,19 @@ class IMap(Greenlet):
self.queue.put((greenlet.index, _SKIP)) self.queue.put((greenlet.index, _SKIP))
if self.ready() and self.count <= 0: if self.ready() and self.count <= 0:
self.maxindex += 1 self.maxindex += 1
self.queue.put((self.maxindex, StopIteration)) self.queue.put((self.maxindex, Failure(StopIteration)))
def _on_finish(self, _self):
if not self.successful():
self.maxindex += 1
self.queue.put((self.maxindex, Failure(self.exception)))
class Failure(object):
__slots__ = ['exc']
def __init__(self, exc):
self.exc = exc
class Pool(Group): class Pool(Group):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment