Commit e1a44f70 authored by Serhiy Storchaka's avatar Serhiy Storchaka

Issue #23051: multiprocessing.Pool methods imap() and imap_unordered() now

handle exceptions raised by an iterator.  Patch by Alon Diamant and Davin
Potts.
parent 798736e3
...@@ -334,25 +334,34 @@ class Pool(object): ...@@ -334,25 +334,34 @@ class Pool(object):
thread = threading.current_thread() thread = threading.current_thread()
for taskseq, set_length in iter(taskqueue.get, None): for taskseq, set_length in iter(taskqueue.get, None):
task = None
i = -1 i = -1
for i, task in enumerate(taskseq): try:
if thread._state: for i, task in enumerate(taskseq):
debug('task handler found thread._state != RUN') if thread._state:
break debug('task handler found thread._state != RUN')
try: break
put(task)
except Exception as e:
job, ind = task[:2]
try: try:
cache[job]._set(ind, (False, e)) put(task)
except KeyError: except Exception as e:
pass job, ind = task[:2]
else: try:
cache[job]._set(ind, (False, e))
except KeyError:
pass
else:
if set_length:
debug('doing set_length()')
set_length(i+1)
continue
break
except Exception as ex:
job, ind = task[:2] if task else (0, 0)
if job in cache:
cache[job]._set(ind + 1, (False, ex))
if set_length: if set_length:
debug('doing set_length()') debug('doing set_length()')
set_length(i+1) set_length(i+1)
continue
break
else: else:
debug('task handler got sentinel') debug('task handler got sentinel')
......
...@@ -1122,6 +1122,15 @@ class _TestContainers(BaseTestCase): ...@@ -1122,6 +1122,15 @@ class _TestContainers(BaseTestCase):
def sqr(x, wait=0.0): def sqr(x, wait=0.0):
time.sleep(wait) time.sleep(wait)
return x*x return x*x
class SayWhenError(ValueError): pass
def exception_throwing_generator(total, when):
for i in range(total):
if i == when:
raise SayWhenError("Somebody said when")
yield i
class _TestPool(BaseTestCase): class _TestPool(BaseTestCase):
def test_apply(self): def test_apply(self):
...@@ -1177,6 +1186,25 @@ class _TestPool(BaseTestCase): ...@@ -1177,6 +1186,25 @@ class _TestPool(BaseTestCase):
self.assertEqual(it.next(), i*i) self.assertEqual(it.next(), i*i)
self.assertRaises(StopIteration, it.next) self.assertRaises(StopIteration, it.next)
def test_imap_handle_iterable_exception(self):
if self.TYPE == 'manager':
self.skipTest('test not appropriate for {}'.format(self.TYPE))
it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
for i in range(3):
self.assertEqual(next(it), i*i)
self.assertRaises(SayWhenError, it.next)
# SayWhenError seen at start of problematic chunk's results
it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
for i in range(6):
self.assertEqual(next(it), i*i)
self.assertRaises(SayWhenError, it.next)
it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
for i in range(4):
self.assertEqual(next(it), i*i)
self.assertRaises(SayWhenError, it.next)
def test_imap_unordered(self): def test_imap_unordered(self):
it = self.pool.imap_unordered(sqr, range(1000)) it = self.pool.imap_unordered(sqr, range(1000))
self.assertEqual(sorted(it), map(sqr, range(1000))) self.assertEqual(sorted(it), map(sqr, range(1000)))
...@@ -1184,6 +1212,25 @@ class _TestPool(BaseTestCase): ...@@ -1184,6 +1212,25 @@ class _TestPool(BaseTestCase):
it = self.pool.imap_unordered(sqr, range(1000), chunksize=53) it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
self.assertEqual(sorted(it), map(sqr, range(1000))) self.assertEqual(sorted(it), map(sqr, range(1000)))
def test_imap_unordered_handle_iterable_exception(self):
if self.TYPE == 'manager':
self.skipTest('test not appropriate for {}'.format(self.TYPE))
it = self.pool.imap_unordered(sqr,
exception_throwing_generator(10, 3),
1)
with self.assertRaises(SayWhenError):
# imap_unordered makes it difficult to anticipate the SayWhenError
for i in range(10):
self.assertEqual(next(it), i*i)
it = self.pool.imap_unordered(sqr,
exception_throwing_generator(20, 7),
2)
with self.assertRaises(SayWhenError):
for i in range(20):
self.assertEqual(next(it), i*i)
def test_make_pool(self): def test_make_pool(self):
self.assertRaises(ValueError, multiprocessing.Pool, -1) self.assertRaises(ValueError, multiprocessing.Pool, -1)
self.assertRaises(ValueError, multiprocessing.Pool, 0) self.assertRaises(ValueError, multiprocessing.Pool, 0)
......
...@@ -325,6 +325,7 @@ Raghuram Devarakonda ...@@ -325,6 +325,7 @@ Raghuram Devarakonda
Caleb Deveraux Caleb Deveraux
Catherine Devlin Catherine Devlin
Scott Dial Scott Dial
Alon Diamant
Toby Dickenson Toby Dickenson
Mark Dickinson Mark Dickinson
Jack Diederich Jack Diederich
......
...@@ -21,6 +21,10 @@ Core and Builtins ...@@ -21,6 +21,10 @@ Core and Builtins
Library Library
------- -------
- Issue #23051: multiprocessing.Pool methods imap() and imap_unordered() now
handle exceptions raised by an iterator. Patch by Alon Diamant and Davin
Potts.
- Issue #22928: Disabled HTTP header injections in httplib. - Issue #22928: Disabled HTTP header injections in httplib.
Original patch by Demian Brecht. Original patch by Demian Brecht.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment