Commit 08ce6080 authored by Antoine Pitrou's avatar Antoine Pitrou

Issue #11635: Don't use polling in worker threads and processes launched by

concurrent.futures.
parents 0a01b9f9 c13d454e
...@@ -66,14 +66,17 @@ import weakref ...@@ -66,14 +66,17 @@ import weakref
# workers to exit when their work queues are empty and then waits until the # workers to exit when their work queues are empty and then waits until the
# threads/processes finish. # threads/processes finish.
_live_threads = weakref.WeakSet() _threads_queues = weakref.WeakKeyDictionary()
_shutdown = False _shutdown = False
def _python_exit(): def _python_exit():
global _shutdown global _shutdown
_shutdown = True _shutdown = True
for thread in _live_threads: items = list(_threads_queues.items())
thread.join() for t, q in items:
q.put(None)
for t, q in items:
t.join()
# Controls how many more calls than processes will be queued in the call queue. # Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for # A smaller number will mean that processes spend more time idle waiting for
...@@ -116,11 +119,15 @@ def _process_worker(call_queue, result_queue, shutdown): ...@@ -116,11 +119,15 @@ def _process_worker(call_queue, result_queue, shutdown):
""" """
while True: while True:
try: try:
call_item = call_queue.get(block=True, timeout=0.1) call_item = call_queue.get(block=True)
except queue.Empty: except queue.Empty:
if shutdown.is_set(): if shutdown.is_set():
return return
else: else:
if call_item is None:
# Wake up queue management thread
result_queue.put(None)
return
try: try:
r = call_item.fn(*call_item.args, **call_item.kwargs) r = call_item.fn(*call_item.args, **call_item.kwargs)
except BaseException as e: except BaseException as e:
...@@ -195,40 +202,56 @@ def _queue_manangement_worker(executor_reference, ...@@ -195,40 +202,56 @@ def _queue_manangement_worker(executor_reference,
process workers that they should exit when their work queue is process workers that they should exit when their work queue is
empty. empty.
""" """
nb_shutdown_processes = 0
def shutdown_one_process():
"""Tell a worker to terminate, which will in turn wake us again"""
nonlocal nb_shutdown_processes
call_queue.put(None)
nb_shutdown_processes += 1
while True: while True:
_add_call_item_to_queue(pending_work_items, _add_call_item_to_queue(pending_work_items,
work_ids_queue, work_ids_queue,
call_queue) call_queue)
try: try:
result_item = result_queue.get(block=True, timeout=0.1) result_item = result_queue.get(block=True)
except queue.Empty: except queue.Empty:
executor = executor_reference() pass
# No more work items can be added if:
# - The interpreter is shutting down OR
# - The executor that owns this worker has been collected OR
# - The executor that owns this worker has been shutdown.
if _shutdown or executor is None or executor._shutdown_thread:
# Since no new work items can be added, it is safe to shutdown
# this thread if there are no pending work items.
if not pending_work_items:
shutdown_process_event.set()
# If .join() is not called on the created processes then
# some multiprocessing.Queue methods may deadlock on Mac OS
# X.
for p in processes:
p.join()
return
del executor
else: else:
work_item = pending_work_items[result_item.work_id] if result_item is not None:
del pending_work_items[result_item.work_id] work_item = pending_work_items[result_item.work_id]
del pending_work_items[result_item.work_id]
if result_item.exception:
work_item.future.set_exception(result_item.exception) if result_item.exception:
work_item.future.set_exception(result_item.exception)
else:
work_item.future.set_result(result_item.result)
continue
# If we come here, we either got a timeout or were explicitly woken up.
# In either case, check whether we should start shutting down.
executor = executor_reference()
# No more work items can be added if:
# - The interpreter is shutting down OR
# - The executor that owns this worker has been collected OR
# - The executor that owns this worker has been shutdown.
if _shutdown or executor is None or executor._shutdown_thread:
# Since no new work items can be added, it is safe to shutdown
# this thread if there are no pending work items.
if not pending_work_items:
shutdown_process_event.set()
while nb_shutdown_processes < len(processes):
shutdown_one_process()
# If .join() is not called on the created processes then
# some multiprocessing.Queue methods may deadlock on Mac OS
# X.
for p in processes:
p.join()
return
else: else:
work_item.future.set_result(result_item.result) # Start shutting down by telling a process it can exit.
shutdown_one_process()
del executor
_system_limits_checked = False _system_limits_checked = False
_system_limited = None _system_limited = None
...@@ -289,10 +312,14 @@ class ProcessPoolExecutor(_base.Executor): ...@@ -289,10 +312,14 @@ class ProcessPoolExecutor(_base.Executor):
self._pending_work_items = {} self._pending_work_items = {}
def _start_queue_management_thread(self): def _start_queue_management_thread(self):
# When the executor gets lost, the weakref callback will wake up
# the queue management thread.
def weakref_cb(_, q=self._result_queue):
q.put(None)
if self._queue_management_thread is None: if self._queue_management_thread is None:
self._queue_management_thread = threading.Thread( self._queue_management_thread = threading.Thread(
target=_queue_manangement_worker, target=_queue_manangement_worker,
args=(weakref.ref(self), args=(weakref.ref(self, weakref_cb),
self._processes, self._processes,
self._pending_work_items, self._pending_work_items,
self._work_ids, self._work_ids,
...@@ -301,7 +328,7 @@ class ProcessPoolExecutor(_base.Executor): ...@@ -301,7 +328,7 @@ class ProcessPoolExecutor(_base.Executor):
self._shutdown_process_event)) self._shutdown_process_event))
self._queue_management_thread.daemon = True self._queue_management_thread.daemon = True
self._queue_management_thread.start() self._queue_management_thread.start()
_live_threads.add(self._queue_management_thread) _threads_queues[self._queue_management_thread] = self._result_queue
def _adjust_process_count(self): def _adjust_process_count(self):
for _ in range(len(self._processes), self._max_workers): for _ in range(len(self._processes), self._max_workers):
...@@ -324,6 +351,8 @@ class ProcessPoolExecutor(_base.Executor): ...@@ -324,6 +351,8 @@ class ProcessPoolExecutor(_base.Executor):
self._pending_work_items[self._queue_count] = w self._pending_work_items[self._queue_count] = w
self._work_ids.put(self._queue_count) self._work_ids.put(self._queue_count)
self._queue_count += 1 self._queue_count += 1
# Wake up queue management thread
self._result_queue.put(None)
self._start_queue_management_thread() self._start_queue_management_thread()
self._adjust_process_count() self._adjust_process_count()
...@@ -333,8 +362,10 @@ class ProcessPoolExecutor(_base.Executor): ...@@ -333,8 +362,10 @@ class ProcessPoolExecutor(_base.Executor):
def shutdown(self, wait=True): def shutdown(self, wait=True):
with self._shutdown_lock: with self._shutdown_lock:
self._shutdown_thread = True self._shutdown_thread = True
if wait: if self._queue_management_thread:
if self._queue_management_thread: # Wake up queue management thread
self._result_queue.put(None)
if wait:
self._queue_management_thread.join() self._queue_management_thread.join()
# To reduce the risk of opening too many files, remove references to # To reduce the risk of opening too many files, remove references to
# objects that use file descriptors. # objects that use file descriptors.
......
...@@ -25,14 +25,18 @@ import weakref ...@@ -25,14 +25,18 @@ import weakref
# workers to exit when their work queues are empty and then waits until the # workers to exit when their work queues are empty and then waits until the
# threads finish. # threads finish.
_live_threads = weakref.WeakSet() _threads_queues = weakref.WeakKeyDictionary()
_shutdown = False _shutdown = False
def _python_exit(): def _python_exit():
global _shutdown global _shutdown
_shutdown = True _shutdown = True
for thread in _live_threads: items = list(_threads_queues.items())
thread.join() for t, q in items:
q.put(None)
for t, q in items:
t.join()
atexit.register(_python_exit) atexit.register(_python_exit)
class _WorkItem(object): class _WorkItem(object):
...@@ -57,18 +61,23 @@ def _worker(executor_reference, work_queue): ...@@ -57,18 +61,23 @@ def _worker(executor_reference, work_queue):
try: try:
while True: while True:
try: try:
work_item = work_queue.get(block=True, timeout=0.1) work_item = work_queue.get(block=True)
except queue.Empty: except queue.Empty:
executor = executor_reference() pass
# Exit if:
# - The interpreter is shutting down OR
# - The executor that owns the worker has been collected OR
# - The executor that owns the worker has been shutdown.
if _shutdown or executor is None or executor._shutdown:
return
del executor
else: else:
work_item.run() if work_item is not None:
work_item.run()
continue
executor = executor_reference()
# Exit if:
# - The interpreter is shutting down OR
# - The executor that owns the worker has been collected OR
# - The executor that owns the worker has been shutdown.
if _shutdown or executor is None or executor._shutdown:
# Notice other workers
work_queue.put(None)
return
del executor
except BaseException as e: except BaseException as e:
_base.LOGGER.critical('Exception in worker', exc_info=True) _base.LOGGER.critical('Exception in worker', exc_info=True)
...@@ -100,19 +109,25 @@ class ThreadPoolExecutor(_base.Executor): ...@@ -100,19 +109,25 @@ class ThreadPoolExecutor(_base.Executor):
submit.__doc__ = _base.Executor.submit.__doc__ submit.__doc__ = _base.Executor.submit.__doc__
def _adjust_thread_count(self): def _adjust_thread_count(self):
# When the executor gets lost, the weakref callback will wake up
# the worker threads.
def weakref_cb(_, q=self._work_queue):
q.put(None)
# TODO(bquinlan): Should avoid creating new threads if there are more # TODO(bquinlan): Should avoid creating new threads if there are more
# idle threads than items in the work queue. # idle threads than items in the work queue.
if len(self._threads) < self._max_workers: if len(self._threads) < self._max_workers:
t = threading.Thread(target=_worker, t = threading.Thread(target=_worker,
args=(weakref.ref(self), self._work_queue)) args=(weakref.ref(self, weakref_cb),
self._work_queue))
t.daemon = True t.daemon = True
t.start() t.start()
self._threads.add(t) self._threads.add(t)
_live_threads.add(t) _threads_queues[t] = self._work_queue
def shutdown(self, wait=True): def shutdown(self, wait=True):
with self._shutdown_lock: with self._shutdown_lock:
self._shutdown = True self._shutdown = True
self._work_queue.put(None)
if wait: if wait:
for t in self._threads: for t in self._threads:
t.join() t.join()
......
...@@ -91,6 +91,9 @@ Core and Builtins ...@@ -91,6 +91,9 @@ Core and Builtins
Library Library
------- -------
- Issue #11635: Don't use polling in worker threads and processes launched by
concurrent.futures.
- Issue #6811: Allow importlib to change a code object's co_filename attribute - Issue #6811: Allow importlib to change a code object's co_filename attribute
to match the path to where the source code currently is, not where the code to match the path to where the source code currently is, not where the code
object originally came from. object originally came from.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment