Commit 9dfc754d authored by Victor Stinner's avatar Victor Stinner Committed by GitHub

Revert "bpo-34172: multiprocessing.Pool leaks resources after being deleted (GH-8450)" (GH-10971)

This reverts commit 97bfe8d3.
parent 8752dfbd
......@@ -149,9 +149,8 @@ class Pool(object):
'''
_wrap_exception = True
@staticmethod
def Process(ctx, *args, **kwds):
return ctx.Process(*args, **kwds)
def Process(self, *args, **kwds):
return self._ctx.Process(*args, **kwds)
def __init__(self, processes=None, initializer=None, initargs=(),
maxtasksperchild=None, context=None):
......@@ -186,15 +185,13 @@ class Pool(object):
self._worker_handler = threading.Thread(
target=Pool._handle_workers,
args=(self._cache, self._taskqueue, self._ctx, self.Process,
self._processes, self._pool, self._inqueue, self._outqueue,
self._initializer, self._initargs, self._maxtasksperchild,
self._wrap_exception)
args=(self, )
)
self._worker_handler.daemon = True
self._worker_handler._state = RUN
self._worker_handler.start()
self._task_handler = threading.Thread(
target=Pool._handle_tasks,
args=(self._taskqueue, self._quick_put, self._outqueue,
......@@ -220,62 +217,43 @@ class Pool(object):
exitpriority=15
)
@staticmethod
def _join_exited_workers(pool):
def _join_exited_workers(self):
"""Cleanup after any worker processes which have exited due to reaching
their specified lifetime. Returns True if any workers were cleaned up.
"""
cleaned = False
for i in reversed(range(len(pool))):
worker = pool[i]
for i in reversed(range(len(self._pool))):
worker = self._pool[i]
if worker.exitcode is not None:
# worker exited
util.debug('cleaning up worker %d' % i)
worker.join()
cleaned = True
del pool[i]
del self._pool[i]
return cleaned
def _repopulate_pool(self):
return self._repopulate_pool_static(self._ctx, self.Process,
self._processes,
self._pool, self._inqueue,
self._outqueue, self._initializer,
self._initargs,
self._maxtasksperchild,
self._wrap_exception)
@staticmethod
def _repopulate_pool_static(ctx, Process, processes, pool, inqueue,
outqueue, initializer, initargs,
maxtasksperchild, wrap_exception):
"""Bring the number of pool processes up to the specified number,
for use after reaping workers which have exited.
"""
for i in range(processes - len(pool)):
w = Process(ctx, target=worker,
args=(inqueue, outqueue,
initializer,
initargs, maxtasksperchild,
wrap_exception)
)
for i in range(self._processes - len(self._pool)):
w = self.Process(target=worker,
args=(self._inqueue, self._outqueue,
self._initializer,
self._initargs, self._maxtasksperchild,
self._wrap_exception)
)
w.name = w.name.replace('Process', 'PoolWorker')
w.daemon = True
w.start()
pool.append(w)
self._pool.append(w)
util.debug('added worker')
@staticmethod
def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue,
initializer, initargs, maxtasksperchild,
wrap_exception):
def _maintain_pool(self):
"""Clean up any exited workers and start replacements for them.
"""
if Pool._join_exited_workers(pool):
Pool._repopulate_pool_static(ctx, Process, processes, pool,
inqueue, outqueue, initializer,
initargs, maxtasksperchild,
wrap_exception)
if self._join_exited_workers():
self._repopulate_pool()
def _setup_queues(self):
self._inqueue = self._ctx.SimpleQueue()
......@@ -433,20 +411,16 @@ class Pool(object):
return result
@staticmethod
def _handle_workers(cache, taskqueue, ctx, Process, processes, pool,
inqueue, outqueue, initializer, initargs,
maxtasksperchild, wrap_exception):
def _handle_workers(pool):
thread = threading.current_thread()
# Keep maintaining workers until the cache gets drained, unless the pool
# is terminated.
while thread._state == RUN or (cache and thread._state != TERMINATE):
Pool._maintain_pool(ctx, Process, processes, pool, inqueue,
outqueue, initializer, initargs,
maxtasksperchild, wrap_exception)
while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
pool._maintain_pool()
time.sleep(0.1)
# send sentinel to stop workers
taskqueue.put(None)
pool._taskqueue.put(None)
util.debug('worker handler exiting')
@staticmethod
......@@ -828,7 +802,7 @@ class ThreadPool(Pool):
_wrap_exception = False
@staticmethod
def Process(ctx, *args, **kwds):
def Process(*args, **kwds):
from .dummy import Process
return Process(*args, **kwds)
......
......@@ -2558,13 +2558,6 @@ class _TestPool(BaseTestCase):
# they were released too.
self.assertEqual(CountedObject.n_instances, 0)
@support.reap_threads
def test_del_pool(self):
p = self.Pool(1)
wr = weakref.ref(p)
del p
gc.collect()
self.assertIsNone(wr())
def raising():
raise KeyError("key")
......
Fix a reference issue inside multiprocessing.Pool that caused the pool to remain alive if it was deleted without being closed or terminated explicitly.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment