Commit 334a7f4c authored by Richard Oudkerk's avatar Richard Oudkerk

Issue #19425 -- a pickling error should not cause pool to hang.

parent cbf22272
......@@ -147,7 +147,8 @@ class Pool(object):
self._task_handler = threading.Thread(
target=Pool._handle_tasks,
args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
args=(self._taskqueue, self._quick_put, self._outqueue,
self._pool, self._cache)
)
self._task_handler.daemon = True
self._task_handler._state = RUN
......@@ -338,7 +339,7 @@ class Pool(object):
debug('worker handler exiting')
@staticmethod
def _handle_tasks(taskqueue, put, outqueue, pool):
def _handle_tasks(taskqueue, put, outqueue, pool, cache):
thread = threading.current_thread()
for taskseq, set_length in iter(taskqueue.get, None):
......@@ -349,9 +350,12 @@ class Pool(object):
break
try:
put(task)
except IOError:
debug('could not put task on queue')
break
except Exception as e:
job, ind = task[:2]
try:
cache[job]._set(ind, (False, e))
except KeyError:
pass
else:
if set_length:
debug('doing set_length()')
......
......@@ -1691,6 +1691,16 @@ class _TestPool(BaseTestCase):
self.assertEqual(2, len(call_args))
self.assertIsInstance(call_args[1], ValueError)
def test_map_unplicklable(self):
# Issue #19425 -- failure to pickle should not cause a hang
if self.TYPE == 'threads':
return
class A(object):
def __reduce__(self):
raise RuntimeError('cannot pickle')
with self.assertRaises(RuntimeError):
self.pool.map(sqr, [A()]*10)
def test_map_chunksize(self):
try:
self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment