Commit 0c200c28 authored by Richard Oudkerk's avatar Richard Oudkerk

Issue #9400: Partial backport of fix for #9244

In multiprocessing, a pool worker process would die
if the result/error could not be pickled.  This could
cause pool methods to hang.

In 3.x this was fixed by 0aa8af79359d (which also added
an error_callback argument to some methods), but the fix
was not back ported.
parent a9e18cdd
...@@ -68,6 +68,23 @@ def mapstar(args): ...@@ -68,6 +68,23 @@ def mapstar(args):
# Code run by worker processes # Code run by worker processes
# #
class MaybeEncodingError(Exception):
"""Wraps possible unpickleable errors, so they can be
safely sent through the socket."""
def __init__(self, exc, value):
self.exc = repr(exc)
self.value = repr(value)
super(MaybeEncodingError, self).__init__(self.exc, self.value)
def __str__(self):
return "Error sending result: '%s'. Reason: '%s'" % (self.value,
self.exc)
def __repr__(self):
return "<MaybeEncodingError: %s>" % str(self)
def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0) assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
put = outqueue.put put = outqueue.put
...@@ -96,7 +113,13 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): ...@@ -96,7 +113,13 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
result = (True, func(*args, **kwds)) result = (True, func(*args, **kwds))
except Exception, e: except Exception, e:
result = (False, e) result = (False, e)
try:
put((job, i, result)) put((job, i, result))
except Exception as e:
wrapped = MaybeEncodingError(e, result[1])
debug("Possible encoding error while sending result: %s" % (
wrapped))
put((job, i, (False, wrapped)))
completed += 1 completed += 1
debug('worker exiting after %d tasks' % completed) debug('worker exiting after %d tasks' % completed)
......
...@@ -1152,6 +1152,24 @@ class _TestPool(BaseTestCase): ...@@ -1152,6 +1152,24 @@ class _TestPool(BaseTestCase):
join() join()
self.assertTrue(join.elapsed < 0.2) self.assertTrue(join.elapsed < 0.2)
def unpickleable_result():
return lambda: 42
class _TestPoolWorkerErrors(BaseTestCase):
ALLOWED_TYPES = ('processes', )
def test_unpickleable_result(self):
from multiprocessing.pool import MaybeEncodingError
p = multiprocessing.Pool(2)
# Make sure we don't lose pool processes because of encoding errors.
for iteration in range(20):
res = p.apply_async(unpickleable_result)
self.assertRaises(MaybeEncodingError, res.get)
p.close()
p.join()
class _TestPoolWorkerLifetime(BaseTestCase): class _TestPoolWorkerLifetime(BaseTestCase):
ALLOWED_TYPES = ('processes', ) ALLOWED_TYPES = ('processes', )
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment