Commit 5084ff7d authored by Antoine Pitrou's avatar Antoine Pitrou Committed by GitHub

bpo-29861: release references to multiprocessing Pool tasks (#743) (#803)

* bpo-29861: release references to multiprocessing Pool tasks (#743)

* bpo-29861: release references to multiprocessing Pool tasks

Release references to tasks, their arguments and their results as soon
as they are finished, instead of keeping them alive until another task
arrives.

* Comments in test

(cherry picked from commit 8988945c)

* Fix Misc/NEWS ?
parent de65804d
......@@ -120,6 +120,8 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
debug("Possible encoding error while sending result: %s" % (
wrapped))
put((job, i, (False, wrapped)))
task = job = result = func = args = kwds = None
completed += 1
debug('worker exiting after %d tasks' % completed)
......@@ -362,10 +364,11 @@ class Pool(object):
if set_length:
debug('doing set_length()')
set_length(i+1)
finally:
task = taskseq = job = None
else:
debug('task handler got sentinel')
try:
# tell result handler to finish when cache is empty
debug('task handler sending sentinel to result handler')
......@@ -405,6 +408,7 @@ class Pool(object):
cache[job]._set(i, obj)
except KeyError:
pass
task = job = obj = None
while cache and thread._state != TERMINATE:
try:
......@@ -421,6 +425,7 @@ class Pool(object):
cache[job]._set(i, obj)
except KeyError:
pass
task = job = obj = None
if hasattr(outqueue, '_reader'):
debug('ensuring that outqueue is not full')
......
......@@ -14,6 +14,7 @@ import socket
import random
import logging
import errno
import weakref
import test.script_helper
from test import test_support
from StringIO import StringIO
......@@ -1123,6 +1124,19 @@ def sqr(x, wait=0.0):
time.sleep(wait)
return x*x
def identity(x):
return x
class CountedObject(object):
n_instances = 0
def __new__(cls):
cls.n_instances += 1
return object.__new__(cls)
def __del__(self):
type(self).n_instances -= 1
class SayWhenError(ValueError): pass
def exception_throwing_generator(total, when):
......@@ -1268,6 +1282,20 @@ class _TestPool(BaseTestCase):
p.close()
p.join()
def test_release_task_refs(self):
# Issue #29861: task arguments and results should not be kept
# alive after we are done with them.
objs = list(CountedObject() for i in range(10))
refs = list(weakref.ref(o) for o in objs)
self.pool.map(identity, objs)
del objs
self.assertEqual(set(wr() for wr in refs), {None})
# With a process pool, copies of the objects are returned, check
# they were released too.
self.assertEqual(CountedObject.n_instances, 0)
def unpickleable_result():
return lambda: 42
......
......@@ -39,6 +39,9 @@ Extension Modules
Library
-------
- bpo-29861: Release references to tasks, their arguments and their results
as soon as they are finished in multiprocessing.Pool.
- bpo-27880: Fixed integer overflow in cPickle when pickle large strings or
too many objects.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment