Commit dee1219a authored by Denis Bilenko's avatar Denis Bilenko

Merge pull request #425 from surfly/imap

fix #423: pool: LoopExit in imap/imap_unordered
parents f5790983 e3ef3c8e
...@@ -4,6 +4,12 @@ Changelog ...@@ -4,6 +4,12 @@ Changelog
.. currentmodule:: gevent .. currentmodule:: gevent
Release 1.0.1
-------------
- Fix #423: Pool's imap/imap_unordered could hang forever. Based on patch and test by Jianfei Wang.
Release 1.0 (Nov 26, 2013) Release 1.0 (Nov 26, 2013)
-------------------------- --------------------------
......
...@@ -208,6 +208,7 @@ class IMapUnordered(Greenlet): ...@@ -208,6 +208,7 @@ class IMapUnordered(Greenlet):
self.iterable = iterable self.iterable = iterable
self.queue = Queue() self.queue = Queue()
self.count = 0 self.count = 0
self.finished = False
self.rawlink(self._on_finish) self.rawlink(self._on_finish)
def __iter__(self): def __iter__(self):
...@@ -226,13 +227,9 @@ class IMapUnordered(Greenlet): ...@@ -226,13 +227,9 @@ class IMapUnordered(Greenlet):
def _run(self): def _run(self):
try: try:
func = self.func func = self.func
empty = True
for item in self.iterable: for item in self.iterable:
self.count += 1 self.count += 1
self.spawn(func, item).rawlink(self._on_result) self.spawn(func, item).rawlink(self._on_result)
empty = False
if empty:
self.queue.put(Failure(StopIteration))
finally: finally:
self.__dict__.pop('spawn', None) self.__dict__.pop('spawn', None)
self.__dict__.pop('func', None) self.__dict__.pop('func', None)
...@@ -244,12 +241,20 @@ class IMapUnordered(Greenlet): ...@@ -244,12 +241,20 @@ class IMapUnordered(Greenlet):
self.queue.put(greenlet.value) self.queue.put(greenlet.value)
else: else:
self.queue.put(Failure(greenlet.exception)) self.queue.put(Failure(greenlet.exception))
if self.ready() and self.count <= 0: if self.ready() and self.count <= 0 and not self.finished:
self.queue.put(Failure(StopIteration)) self.queue.put(Failure(StopIteration))
self.finished = True
def _on_finish(self, _self): def _on_finish(self, _self):
if self.finished:
return
if not self.successful(): if not self.successful():
self.queue.put(Failure(self.exception)) self.queue.put(Failure(self.exception))
self.finished = True
return
if self.count <= 0:
self.queue.put(Failure(StopIteration))
self.finished = True
class IMap(Greenlet): class IMap(Greenlet):
...@@ -266,6 +271,7 @@ class IMap(Greenlet): ...@@ -266,6 +271,7 @@ class IMap(Greenlet):
self.waiting = [] # QQQ maybe deque will work faster there? self.waiting = [] # QQQ maybe deque will work faster there?
self.index = 0 self.index = 0
self.maxindex = -1 self.maxindex = -1
self.finished = False
self.rawlink(self._on_finish) self.rawlink(self._on_finish)
def __iter__(self): def __iter__(self):
...@@ -291,7 +297,6 @@ class IMap(Greenlet): ...@@ -291,7 +297,6 @@ class IMap(Greenlet):
def _run(self): def _run(self):
try: try:
empty = True
func = self.func func = self.func
for item in self.iterable: for item in self.iterable:
self.count += 1 self.count += 1
...@@ -299,10 +304,6 @@ class IMap(Greenlet): ...@@ -299,10 +304,6 @@ class IMap(Greenlet):
g.rawlink(self._on_result) g.rawlink(self._on_result)
self.maxindex += 1 self.maxindex += 1
g.index = self.maxindex g.index = self.maxindex
empty = False
if empty:
self.maxindex += 1
self.queue.put((self.maxindex, Failure(StopIteration)))
finally: finally:
self.__dict__.pop('spawn', None) self.__dict__.pop('spawn', None)
self.__dict__.pop('func', None) self.__dict__.pop('func', None)
...@@ -314,14 +315,23 @@ class IMap(Greenlet): ...@@ -314,14 +315,23 @@ class IMap(Greenlet):
self.queue.put((greenlet.index, greenlet.value)) self.queue.put((greenlet.index, greenlet.value))
else: else:
self.queue.put((greenlet.index, Failure(greenlet.exception))) self.queue.put((greenlet.index, Failure(greenlet.exception)))
if self.ready() and self.count <= 0: if self.ready() and self.count <= 0 and not self.finished:
self.maxindex += 1 self.maxindex += 1
self.queue.put((self.maxindex, Failure(StopIteration))) self.queue.put((self.maxindex, Failure(StopIteration)))
self.finished = True
def _on_finish(self, _self): def _on_finish(self, _self):
if self.finished:
return
if not self.successful(): if not self.successful():
self.maxindex += 1 self.maxindex += 1
self.queue.put((self.maxindex, Failure(self.exception))) self.queue.put((self.maxindex, Failure(self.exception)))
self.finished = True
return
if self.count <= 0:
self.maxindex += 1
self.queue.put((self.maxindex, Failure(StopIteration)))
self.finished = True
class Failure(object): class Failure(object):
......
...@@ -2,6 +2,7 @@ from time import time ...@@ -2,6 +2,7 @@ from time import time
import gevent import gevent
from gevent import pool from gevent import pool
from gevent.event import Event from gevent.event import Event
from gevent.queue import Queue
import greentest import greentest
import random import random
from greentest import ExpectedException from greentest import ExpectedException
...@@ -225,6 +226,12 @@ def sqr_random_sleep(x): ...@@ -225,6 +226,12 @@ def sqr_random_sleep(x):
return x * x return x * x
def final_sleep():
for i in range(3):
yield i
gevent.sleep(0.2)
TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.082, 0.035, 0.14 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.082, 0.035, 0.14
...@@ -330,6 +337,30 @@ class TestPool(greentest.TestCase): ...@@ -330,6 +337,30 @@ class TestPool(greentest.TestCase):
expected = ['1', '2', '10'] expected = ['1', '2', '10']
self.assertEqual(result, expected) self.assertEqual(result, expected)
# https://github.com/surfly/gevent/issues/423
def test_imap_no_stop(self):
q = Queue()
q.put(123)
gevent.spawn_later(0.1, q.put, StopIteration)
result = list(self.pool.imap(lambda _: _, q))
self.assertEqual(result, [123])
def test_imap_unordered_no_stop(self):
q = Queue()
q.put(1234)
gevent.spawn_later(0.1, q.put, StopIteration)
result = list(self.pool.imap_unordered(lambda _: _, q))
self.assertEqual(result, [1234])
# same issue, but different test: https://github.com/surfly/gevent/issues/311
def test_imap_final_sleep(self):
result = list(self.pool.imap(sqr, final_sleep()))
self.assertEqual(result, [0, 1, 4])
def test_imap_unordered_final_sleep(self):
result = list(self.pool.imap_unordered(sqr, final_sleep()))
self.assertEqual(result, [0, 1, 4])
class TestPool2(TestPool): class TestPool2(TestPool):
size = 2 size = 2
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment