Commit dd696496 authored by Antoine Pitrou's avatar Antoine Pitrou

Issue #9205: concurrent.futures.ProcessPoolExecutor now detects killed

children and raises BrokenProcessPool in such a situation.  Previously it
would reliably freeze/deadlock.
parent 4a5e5de0
......@@ -169,6 +169,12 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
of at most *max_workers* processes. If *max_workers* is ``None`` or not
given, it will default to the number of processors on the machine.
.. versionchanged:: 3.3
When one of the worker processes terminates abruptly, a
:exc:`BrokenProcessPool` error is now raised. Previously, behaviour
was undefined but operations on the executor or its futures would often
freeze or deadlock.
.. _processpoolexecutor-example:
......@@ -369,3 +375,16 @@ Module Functions
:pep:`3148` -- futures - execute computations asynchronously
The proposal which described this feature for inclusion in the Python
standard library.
Exception classes
-----------------
.. exception:: BrokenProcessPool
Derived from :exc:`RuntimeError`, this exception class is raised when
one of the workers of a :class:`ProcessPoolExecutor` has terminated
in a non-clean fashion (for example, if it was killed from the outside).
.. versionadded:: 3.3
......@@ -46,10 +46,11 @@ Process #1..n:
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
import atexit
import os
from concurrent.futures import _base
import queue
import multiprocessing
from multiprocessing.queues import SimpleQueue
from multiprocessing.queues import SimpleQueue, SentinelReady
import threading
import weakref
......@@ -122,7 +123,7 @@ def _process_worker(call_queue, result_queue):
call_item = call_queue.get(block=True)
if call_item is None:
# Wake up queue management thread
result_queue.put(None)
result_queue.put(os.getpid())
return
try:
r = call_item.fn(*call_item.args, **call_item.kwargs)
......@@ -194,29 +195,63 @@ def _queue_management_worker(executor_reference,
result_queue: A multiprocessing.Queue of _ResultItems generated by the
process workers.
"""
nb_shutdown_processes = 0
def shutdown_one_process():
"""Tell a worker to terminate, which will in turn wake us again"""
nonlocal nb_shutdown_processes
call_queue.put(None)
nb_shutdown_processes += 1
def shutdown_worker():
# This is an upper bound
nb_children_alive = sum(p.is_alive() for p in processes.values())
for i in range(0, nb_children_alive):
call_queue.put(None)
# If .join() is not called on the created processes then
# some multiprocessing.Queue methods may deadlock on Mac OS
# X.
for p in processes.values():
p.join()
while True:
_add_call_item_to_queue(pending_work_items,
work_ids_queue,
call_queue)
result_item = result_queue.get()
if result_item is not None:
work_item = pending_work_items[result_item.work_id]
del pending_work_items[result_item.work_id]
if result_item.exception:
work_item.future.set_exception(result_item.exception)
else:
work_item.future.set_result(result_item.result)
continue
# If we come here, we either got a timeout or were explicitly woken up.
# In either case, check whether we should start shutting down.
sentinels = [p.sentinel for p in processes.values()]
assert sentinels
try:
result_item = result_queue.get(sentinels=sentinels)
except SentinelReady as e:
# Mark the process pool broken so that submits fail right now.
executor = executor_reference()
if executor is not None:
executor._broken = True
executor._shutdown_thread = True
del executor
# All futures in flight must be marked failed
for work_id, work_item in pending_work_items.items():
work_item.future.set_exception(
BrokenProcessPool(
"A process in the process pool was "
"terminated abruptly while the future was "
"running or pending."
))
pending_work_items.clear()
# Terminate remaining workers forcibly: the queues or their
# locks may be in a dirty state and block forever.
for p in processes.values():
p.terminate()
for p in processes.values():
p.join()
return
if isinstance(result_item, int):
# Clean shutdown of a worker using its PID
# (avoids marking the executor broken)
del processes[result_item]
elif result_item is not None:
work_item = pending_work_items.pop(result_item.work_id, None)
# work_item can be None if another process terminated (see above)
if work_item is not None:
if result_item.exception:
work_item.future.set_exception(result_item.exception)
else:
work_item.future.set_result(result_item.result)
# Check whether we should start shutting down.
executor = executor_reference()
# No more work items can be added if:
# - The interpreter is shutting down OR
......@@ -226,17 +261,11 @@ def _queue_management_worker(executor_reference,
# Since no new work items can be added, it is safe to shutdown
# this thread if there are no pending work items.
if not pending_work_items:
while nb_shutdown_processes < len(processes):
shutdown_one_process()
# If .join() is not called on the created processes then
# some multiprocessing.Queue methods may deadlock on Mac OS
# X.
for p in processes:
p.join()
shutdown_worker()
return
else:
# Start shutting down by telling a process it can exit.
shutdown_one_process()
call_queue.put(None)
del executor
_system_limits_checked = False
......@@ -264,6 +293,14 @@ def _check_system_limits():
_system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
raise NotImplementedError(_system_limited)
class BrokenProcessPool(RuntimeError):
"""
Raised when a process in a ProcessPoolExecutor terminated abruptly
while a future was in the running state.
"""
class ProcessPoolExecutor(_base.Executor):
def __init__(self, max_workers=None):
"""Initializes a new ProcessPoolExecutor instance.
......@@ -288,11 +325,13 @@ class ProcessPoolExecutor(_base.Executor):
self._result_queue = SimpleQueue()
self._work_ids = queue.Queue()
self._queue_management_thread = None
self._processes = set()
# Map of pids to processes
self._processes = {}
# Shutdown is a two-step process.
self._shutdown_thread = False
self._shutdown_lock = threading.Lock()
self._broken = False
self._queue_count = 0
self._pending_work_items = {}
......@@ -302,6 +341,8 @@ class ProcessPoolExecutor(_base.Executor):
def weakref_cb(_, q=self._result_queue):
q.put(None)
if self._queue_management_thread is None:
# Start the processes so that their sentinels are known.
self._adjust_process_count()
self._queue_management_thread = threading.Thread(
target=_queue_management_worker,
args=(weakref.ref(self, weakref_cb),
......@@ -321,10 +362,13 @@ class ProcessPoolExecutor(_base.Executor):
args=(self._call_queue,
self._result_queue))
p.start()
self._processes.add(p)
self._processes[p.pid] = p
def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
if self._broken:
raise BrokenProcessPool('A child process terminated '
'abruptly, the process pool is not usable anymore')
if self._shutdown_thread:
raise RuntimeError('cannot schedule new futures after shutdown')
......@@ -338,7 +382,6 @@ class ProcessPoolExecutor(_base.Executor):
self._result_queue.put(None)
self._start_queue_management_thread()
self._adjust_process_count()
return f
submit.__doc__ = _base.Executor.submit.__doc__
......
......@@ -48,14 +48,18 @@ import itertools
import _multiprocessing
from multiprocessing import current_process, AuthenticationError, BufferTooShort
from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
from multiprocessing.util import (
get_temp_dir, Finalize, sub_debug, debug, _eintr_retry)
try:
from _multiprocessing import win32
from _subprocess import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE
except ImportError:
if sys.platform == 'win32':
raise
win32 = None
_select = _eintr_retry(select.select)
#
#
#
......@@ -118,6 +122,15 @@ def address_type(address):
else:
raise ValueError('address type of %r unrecognized' % address)
class SentinelReady(Exception):
"""
Raised when a sentinel is ready when polling.
"""
def __init__(self, *args):
Exception.__init__(self, *args)
self.sentinels = args[0]
#
# Connection classes
#
......@@ -253,19 +266,17 @@ class _ConnectionBase:
(offset + size) // itemsize])
return size
def recv(self):
def recv(self, sentinels=None):
"""Receive a (picklable) object"""
self._check_closed()
self._check_readable()
buf = self._recv_bytes()
buf = self._recv_bytes(sentinels=sentinels)
return pickle.loads(buf.getbuffer())
def poll(self, timeout=0.0):
"""Whether there is any input available to be read"""
self._check_closed()
self._check_readable()
if timeout < 0.0:
timeout = None
return self._poll(timeout)
......@@ -274,61 +285,88 @@ if win32:
class PipeConnection(_ConnectionBase):
"""
Connection class based on a Windows named pipe.
Overlapped I/O is used, so the handles must have been created
with FILE_FLAG_OVERLAPPED.
"""
_buffered = b''
def _close(self):
win32.CloseHandle(self._handle)
def _send_bytes(self, buf):
nwritten = win32.WriteFile(self._handle, buf)
overlapped = win32.WriteFile(self._handle, buf, overlapped=True)
nwritten, complete = overlapped.GetOverlappedResult(True)
assert complete
assert nwritten == len(buf)
def _recv_bytes(self, maxsize=None):
def _recv_bytes(self, maxsize=None, sentinels=()):
if sentinels:
self._poll(-1.0, sentinels)
buf = io.BytesIO()
bufsize = 512
if maxsize is not None:
bufsize = min(bufsize, maxsize)
try:
firstchunk, complete = win32.ReadFile(self._handle, bufsize)
except IOError as e:
if e.errno == win32.ERROR_BROKEN_PIPE:
raise EOFError
raise
lenfirstchunk = len(firstchunk)
buf.write(firstchunk)
if complete:
return buf
firstchunk = self._buffered
if firstchunk:
lenfirstchunk = len(firstchunk)
buf.write(firstchunk)
self._buffered = b''
else:
# A reasonable size for the first chunk transfer
bufsize = 128
if maxsize is not None and maxsize < bufsize:
bufsize = maxsize
try:
overlapped = win32.ReadFile(self._handle, bufsize, overlapped=True)
lenfirstchunk, complete = overlapped.GetOverlappedResult(True)
firstchunk = overlapped.getbuffer()
assert lenfirstchunk == len(firstchunk)
except IOError as e:
if e.errno == win32.ERROR_BROKEN_PIPE:
raise EOFError
raise
buf.write(firstchunk)
if complete:
return buf
navail, nleft = win32.PeekNamedPipe(self._handle)
if maxsize is not None and lenfirstchunk + nleft > maxsize:
return None
lastchunk, complete = win32.ReadFile(self._handle, nleft)
assert complete
buf.write(lastchunk)
if nleft > 0:
overlapped = win32.ReadFile(self._handle, nleft, overlapped=True)
res, complete = overlapped.GetOverlappedResult(True)
assert res == nleft
assert complete
buf.write(overlapped.getbuffer())
return buf
def _poll(self, timeout):
def _poll(self, timeout, sentinels=()):
# Fast non-blocking path
navail, nleft = win32.PeekNamedPipe(self._handle)
if navail > 0:
return True
elif timeout == 0.0:
return False
# Setup a polling loop (translated straight from old
# pipe_connection.c)
# Blocking: use overlapped I/O
if timeout < 0.0:
deadline = None
timeout = INFINITE
else:
deadline = time.time() + timeout
delay = 0.001
max_delay = 0.02
while True:
time.sleep(delay)
navail, nleft = win32.PeekNamedPipe(self._handle)
if navail > 0:
return True
if deadline and time.time() > deadline:
return False
if delay < max_delay:
delay += 0.001
timeout = int(timeout * 1000 + 0.5)
overlapped = win32.ReadFile(self._handle, 1, overlapped=True)
try:
handles = [overlapped.event]
handles += sentinels
res = win32.WaitForMultipleObjects(handles, False, timeout)
finally:
# Always cancel overlapped I/O in the same thread
# (because CancelIoEx() appears only in Vista)
overlapped.cancel()
if res == WAIT_TIMEOUT:
return False
idx = res - WAIT_OBJECT_0
if idx == 0:
# I/O was successful, store received data
overlapped.GetOverlappedResult(True)
self._buffered += overlapped.getbuffer()
return True
assert 0 < idx < len(handles)
raise SentinelReady([handles[idx]])
class Connection(_ConnectionBase):
......@@ -357,11 +395,18 @@ class Connection(_ConnectionBase):
break
buf = buf[n:]
def _recv(self, size, read=_read):
def _recv(self, size, sentinels=(), read=_read):
buf = io.BytesIO()
handle = self._handle
if sentinels:
handles = [handle] + sentinels
remaining = size
while remaining > 0:
chunk = read(self._handle, remaining)
if sentinels:
r = _select(handles, [], [])[0]
if handle not in r:
raise SentinelReady(r)
chunk = read(handle, remaining)
n = len(chunk)
if n == 0:
if remaining == size:
......@@ -381,15 +426,17 @@ class Connection(_ConnectionBase):
if n > 0:
self._send(buf)
def _recv_bytes(self, maxsize=None):
buf = self._recv(4)
def _recv_bytes(self, maxsize=None, sentinels=()):
buf = self._recv(4, sentinels)
size, = struct.unpack("=i", buf.getvalue())
if maxsize is not None and size > maxsize:
return None
return self._recv(size)
return self._recv(size, sentinels)
def _poll(self, timeout):
r = select.select([self._handle], [], [], timeout)[0]
if timeout < 0.0:
timeout = None
r = _select([self._handle], [], [], timeout)[0]
return bool(r)
......@@ -495,23 +542,21 @@ else:
obsize, ibsize = 0, BUFSIZE
h1 = win32.CreateNamedPipe(
address, openmode,
address, openmode | win32.FILE_FLAG_OVERLAPPED,
win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
win32.PIPE_WAIT,
1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
)
h2 = win32.CreateFile(
address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
address, access, 0, win32.NULL, win32.OPEN_EXISTING,
win32.FILE_FLAG_OVERLAPPED, win32.NULL
)
win32.SetNamedPipeHandleState(
h2, win32.PIPE_READMODE_MESSAGE, None, None
)
try:
win32.ConnectNamedPipe(h1, win32.NULL)
except WindowsError as e:
if e.args[0] != win32.ERROR_PIPE_CONNECTED:
raise
overlapped = win32.ConnectNamedPipe(h1, overlapped=True)
overlapped.GetOverlappedResult(True)
c1 = PipeConnection(h1, writable=duplex)
c2 = PipeConnection(h2, readable=duplex)
......
......@@ -35,6 +35,7 @@
import os
import sys
import signal
import select
from multiprocessing import util, process
......
......@@ -44,7 +44,7 @@ import weakref
from queue import Empty, Full
import _multiprocessing
from multiprocessing import Pipe
from multiprocessing.connection import Pipe, SentinelReady
from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
from multiprocessing.util import debug, info, Finalize, register_after_fork
from multiprocessing.forking import assert_spawning
......@@ -372,10 +372,10 @@ class SimpleQueue(object):
def _make_methods(self):
recv = self._reader.recv
racquire, rrelease = self._rlock.acquire, self._rlock.release
def get():
def get(*, sentinels=None):
racquire()
try:
return recv()
return recv(sentinels)
finally:
rrelease()
self.get = get
......
......@@ -19,7 +19,7 @@ import unittest
from concurrent import futures
from concurrent.futures._base import (
PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
import concurrent.futures.process
from concurrent.futures.process import BrokenProcessPool
def create_future(state=PENDING, exception=None, result=None):
......@@ -154,7 +154,7 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
processes = self.executor._processes
self.executor.shutdown()
for p in processes:
for p in processes.values():
p.join()
def test_context_manager_shutdown(self):
......@@ -163,7 +163,7 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
self.assertEqual(list(e.map(abs, range(-5, 5))),
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
for p in processes:
for p in processes.values():
p.join()
def test_del_shutdown(self):
......@@ -174,7 +174,7 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
del executor
queue_management_thread.join()
for p in processes:
for p in processes.values():
p.join()
class WaitTests(unittest.TestCase):
......@@ -381,7 +381,17 @@ class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
pass
def test_killed_child(self):
# When a child process is abruptly terminated, the whole pool gets
# "broken".
futures = [self.executor.submit(time.sleep, 3)]
# Get one of the processes, and terminate (kill) it
p = next(iter(self.executor._processes.values()))
p.terminate()
for fut in futures:
self.assertRaises(BrokenProcessPool, fut.result)
# Submitting other jobs fails as well.
self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
class FutureTests(unittest.TestCase):
......
......@@ -187,6 +187,10 @@ Core and Builtins
Library
-------
- Issue #9205: concurrent.futures.ProcessPoolExecutor now detects killed
children and raises BrokenProcessPool in such a situation. Previously it
would reliably freeze/deadlock.
- Issue #12040: Expose a new attribute ``sentinel`` on instances of
:class:`multiprocessing.Process`. Also, fix Process.join() to not use
polling anymore, when given a timeout.
......
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment