Commit 6b973747 authored by Andrew Svetlov's avatar Andrew Svetlov

Issue #16284: Prevent keeping unnecessary references to worker functions in...

Issue #16284: Prevent keeping unnecessary references to worker functions in concurrent.futures ThreadPoolExecutor.
parent 0f77bf27
......@@ -240,6 +240,8 @@ def _queue_management_worker(executor_reference,
"terminated abruptly while the future was "
"running or pending."
))
# Delete references to object. See issue16284
del work_item
pending_work_items.clear()
# Terminate remaining workers forcibly: the queues or their
# locks may be in a dirty state and block forever.
......@@ -264,6 +266,8 @@ def _queue_management_worker(executor_reference,
work_item.future.set_exception(result_item.exception)
else:
work_item.future.set_result(result_item.result)
# Delete references to object. See issue16284
del work_item
# Check whether we should start shutting down.
executor = executor_reference()
# No more work items can be added if:
......
......@@ -63,6 +63,8 @@ def _worker(executor_reference, work_queue):
work_item = work_queue.get(block=True)
if work_item is not None:
work_item.run()
# Delete references to object. See issue16284
del work_item
continue
executor = executor_reference()
# Exit if:
......
......@@ -243,10 +243,14 @@ class Queue(object):
if wacquire is None:
send(obj)
# Delete references to object. See issue16284
del obj
else:
wacquire()
try:
send(obj)
# Delete references to object. See issue16284
del obj
finally:
wrelease()
except IndexError:
......
......@@ -15,6 +15,7 @@ import sys
import threading
import time
import unittest
import weakref
from concurrent import futures
from concurrent.futures._base import (
......@@ -52,6 +53,11 @@ def sleep_and_print(t, msg):
sys.stdout.flush()
class MyObject(object):
def my_method(self):
pass
class ExecutorMixin:
worker_count = 5
......@@ -396,6 +402,22 @@ class ExecutorTest(unittest.TestCase):
self.executor.map(str, [2] * (self.worker_count + 1))
self.executor.shutdown()
@test.support.cpython_only
def test_no_stale_references(self):
# Issue #16284: check that the executors don't unnecessarily hang onto
# references.
my_object = MyObject()
my_object_collected = threading.Event()
my_object_callback = weakref.ref(
my_object, lambda obj: my_object_collected.set())
# Deliberately discarding the future.
self.executor.submit(my_object.my_method)
del my_object
collected = my_object_collected.wait(timeout=5.0)
self.assertTrue(collected,
"Stale reference not collected within timeout.")
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
def test_map_submits_without_iteration(self):
......
......@@ -726,6 +726,7 @@ Lukas Lueg
Loren Luke
Fredrik Lundh
Mark Lutz
Taras Lyapun
Jim Lynch
Mikael Lyngvig
Martin von Löwis
......
......@@ -88,6 +88,9 @@ Core and Builtins
Library
-------
- Issue #16284: Prevent keeping unnecessary references to worker functions
in concurrent.futures ThreadPoolExecutor.
- Issue #1207589: Add Cut/Copy/Paste items to IDLE right click Context Menu
Patch by Todd Rovito.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment