Issue #10332: multiprocessing: fix a race condition when a Pool is closed

before all tasks have completed.
parent d6ca6c2b
......@@ -321,7 +321,11 @@ class Pool(object):
@staticmethod
def _handle_workers(pool):
while pool._worker_handler._state == RUN and pool._state == RUN:
thread = threading.current_thread()
# Keep maintaining workers until the cache gets drained, unless the pool
# is terminated.
while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
pool._maintain_pool()
time.sleep(0.1)
# send sentinel to stop workers
......
......@@ -1217,6 +1217,20 @@ class _TestPoolWorkerLifetime(BaseTestCase):
p.close()
p.join()
def test_pool_worker_lifetime_early_close(self):
# Issue #10332: closing a pool whose workers have limited lifetimes
# before all the tasks completed would make join() hang.
p = multiprocessing.Pool(3, maxtasksperchild=1)
results = []
for i in range(6):
results.append(p.apply_async(sqr, (i, 0.3)))
p.close()
p.join()
# check the results
for (j, res) in enumerate(results):
self.assertEqual(res.get(), sqr(j))
#
# Test that manager has expected number of shared objects left
#
......
......@@ -58,6 +58,9 @@ Core and Builtins
Library
-------
- Issue #10332: multiprocessing: fix a race condition when a Pool is closed
before all tasks have completed.
- Issue #13255: wrong docstrings in array module.
- Issue #9168: now smtpd is able to bind privileged port.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment