Commit 97bfe8d3 authored by tzickel's avatar tzickel Committed by Antoine Pitrou

bpo-34172: multiprocessing.Pool leaks resources after being deleted (GH-8450)

Fix a reference issue inside multiprocessing.Pool that caused the pool to remain alive if it was deleted without being closed or terminated explicitly.
parent 9012a0fb
...@@ -149,8 +149,9 @@ class Pool(object): ...@@ -149,8 +149,9 @@ class Pool(object):
''' '''
_wrap_exception = True _wrap_exception = True
def Process(self, *args, **kwds): @staticmethod
return self._ctx.Process(*args, **kwds) def Process(ctx, *args, **kwds):
return ctx.Process(*args, **kwds)
def __init__(self, processes=None, initializer=None, initargs=(), def __init__(self, processes=None, initializer=None, initargs=(),
maxtasksperchild=None, context=None): maxtasksperchild=None, context=None):
...@@ -177,13 +178,15 @@ class Pool(object): ...@@ -177,13 +178,15 @@ class Pool(object):
self._worker_handler = threading.Thread( self._worker_handler = threading.Thread(
target=Pool._handle_workers, target=Pool._handle_workers,
args=(self, ) args=(self._cache, self._taskqueue, self._ctx, self.Process,
self._processes, self._pool, self._inqueue, self._outqueue,
self._initializer, self._initargs, self._maxtasksperchild,
self._wrap_exception)
) )
self._worker_handler.daemon = True self._worker_handler.daemon = True
self._worker_handler._state = RUN self._worker_handler._state = RUN
self._worker_handler.start() self._worker_handler.start()
self._task_handler = threading.Thread( self._task_handler = threading.Thread(
target=Pool._handle_tasks, target=Pool._handle_tasks,
args=(self._taskqueue, self._quick_put, self._outqueue, args=(self._taskqueue, self._quick_put, self._outqueue,
...@@ -209,43 +212,62 @@ class Pool(object): ...@@ -209,43 +212,62 @@ class Pool(object):
exitpriority=15 exitpriority=15
) )
def _join_exited_workers(self): @staticmethod
def _join_exited_workers(pool):
"""Cleanup after any worker processes which have exited due to reaching """Cleanup after any worker processes which have exited due to reaching
their specified lifetime. Returns True if any workers were cleaned up. their specified lifetime. Returns True if any workers were cleaned up.
""" """
cleaned = False cleaned = False
for i in reversed(range(len(self._pool))): for i in reversed(range(len(pool))):
worker = self._pool[i] worker = pool[i]
if worker.exitcode is not None: if worker.exitcode is not None:
# worker exited # worker exited
util.debug('cleaning up worker %d' % i) util.debug('cleaning up worker %d' % i)
worker.join() worker.join()
cleaned = True cleaned = True
del self._pool[i] del pool[i]
return cleaned return cleaned
def _repopulate_pool(self): def _repopulate_pool(self):
return self._repopulate_pool_static(self._ctx, self.Process,
self._processes,
self._pool, self._inqueue,
self._outqueue, self._initializer,
self._initargs,
self._maxtasksperchild,
self._wrap_exception)
@staticmethod
def _repopulate_pool_static(ctx, Process, processes, pool, inqueue,
outqueue, initializer, initargs,
maxtasksperchild, wrap_exception):
"""Bring the number of pool processes up to the specified number, """Bring the number of pool processes up to the specified number,
for use after reaping workers which have exited. for use after reaping workers which have exited.
""" """
for i in range(self._processes - len(self._pool)): for i in range(processes - len(pool)):
w = self.Process(target=worker, w = Process(ctx, target=worker,
args=(self._inqueue, self._outqueue, args=(inqueue, outqueue,
self._initializer, initializer,
self._initargs, self._maxtasksperchild, initargs, maxtasksperchild,
self._wrap_exception) wrap_exception)
) )
self._pool.append(w) pool.append(w)
w.name = w.name.replace('Process', 'PoolWorker') w.name = w.name.replace('Process', 'PoolWorker')
w.daemon = True w.daemon = True
w.start() w.start()
util.debug('added worker') util.debug('added worker')
def _maintain_pool(self): @staticmethod
def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue,
initializer, initargs, maxtasksperchild,
wrap_exception):
"""Clean up any exited workers and start replacements for them. """Clean up any exited workers and start replacements for them.
""" """
if self._join_exited_workers(): if Pool._join_exited_workers(pool):
self._repopulate_pool() Pool._repopulate_pool_static(ctx, Process, processes, pool,
inqueue, outqueue, initializer,
initargs, maxtasksperchild,
wrap_exception)
def _setup_queues(self): def _setup_queues(self):
self._inqueue = self._ctx.SimpleQueue() self._inqueue = self._ctx.SimpleQueue()
...@@ -403,16 +425,20 @@ class Pool(object): ...@@ -403,16 +425,20 @@ class Pool(object):
return result return result
@staticmethod @staticmethod
def _handle_workers(pool): def _handle_workers(cache, taskqueue, ctx, Process, processes, pool,
inqueue, outqueue, initializer, initargs,
maxtasksperchild, wrap_exception):
thread = threading.current_thread() thread = threading.current_thread()
# Keep maintaining workers until the cache gets drained, unless the pool # Keep maintaining workers until the cache gets drained, unless the pool
# is terminated. # is terminated.
while thread._state == RUN or (pool._cache and thread._state != TERMINATE): while thread._state == RUN or (cache and thread._state != TERMINATE):
pool._maintain_pool() Pool._maintain_pool(ctx, Process, processes, pool, inqueue,
outqueue, initializer, initargs,
maxtasksperchild, wrap_exception)
time.sleep(0.1) time.sleep(0.1)
# send sentinel to stop workers # send sentinel to stop workers
pool._taskqueue.put(None) taskqueue.put(None)
util.debug('worker handler exiting') util.debug('worker handler exiting')
@staticmethod @staticmethod
...@@ -794,7 +820,7 @@ class ThreadPool(Pool): ...@@ -794,7 +820,7 @@ class ThreadPool(Pool):
_wrap_exception = False _wrap_exception = False
@staticmethod @staticmethod
def Process(*args, **kwds): def Process(ctx, *args, **kwds):
from .dummy import Process from .dummy import Process
return Process(*args, **kwds) return Process(*args, **kwds)
......
...@@ -2549,6 +2549,12 @@ class _TestPool(BaseTestCase): ...@@ -2549,6 +2549,12 @@ class _TestPool(BaseTestCase):
# they were released too. # they were released too.
self.assertEqual(CountedObject.n_instances, 0) self.assertEqual(CountedObject.n_instances, 0)
def test_del_pool(self):
p = self.Pool(1)
wr = weakref.ref(p)
del p
gc.collect()
self.assertIsNone(wr())
def raising(): def raising():
raise KeyError("key") raise KeyError("key")
......
Fix a reference issue inside multiprocessing.Pool that caused the pool to remain alive if it was deleted without being closed or terminated explicitly.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment