Commit 9535aff9 authored by Andrew Svetlov's avatar Andrew Svetlov Committed by Miss Islington (bot)

Revert "bpo-35621: Support running subprocesses in asyncio when loop is...

Revert "bpo-35621: Support running subprocesses in asyncio when loop is executed in non-main thread (#13630)" (GH-13793)



https://bugs.python.org/issue35621
parent eddef861
...@@ -2,7 +2,6 @@ ...@@ -2,7 +2,6 @@
import errno import errno
import io import io
import itertools
import os import os
import selectors import selectors
import signal import signal
...@@ -30,9 +29,7 @@ from .log import logger ...@@ -30,9 +29,7 @@ from .log import logger
__all__ = ( __all__ = (
'SelectorEventLoop', 'SelectorEventLoop',
'AbstractChildWatcher', 'SafeChildWatcher', 'AbstractChildWatcher', 'SafeChildWatcher',
'FastChildWatcher', 'FastChildWatcher', 'DefaultEventLoopPolicy',
'MultiLoopChildWatcher', 'ThreadedChildWatcher',
'DefaultEventLoopPolicy',
) )
...@@ -187,13 +184,6 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): ...@@ -187,13 +184,6 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
stdin, stdout, stderr, bufsize, stdin, stdout, stderr, bufsize,
extra=None, **kwargs): extra=None, **kwargs):
with events.get_child_watcher() as watcher: with events.get_child_watcher() as watcher:
if not watcher.is_active():
# Check early.
# Raising exception before process creation
# prevents subprocess execution if the watcher
# is not ready to handle it.
raise RuntimeError("asyncio.get_child_watcher() is not activated, "
"subprocess support is not installed.")
waiter = self.create_future() waiter = self.create_future()
transp = _UnixSubprocessTransport(self, protocol, args, shell, transp = _UnixSubprocessTransport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize, stdin, stdout, stderr, bufsize,
...@@ -848,15 +838,6 @@ class AbstractChildWatcher: ...@@ -848,15 +838,6 @@ class AbstractChildWatcher:
""" """
raise NotImplementedError() raise NotImplementedError()
def is_active(self):
"""Watcher status.
Return True if the watcher is installed and ready to handle process exit
notifications.
"""
raise NotImplementedError()
def __enter__(self): def __enter__(self):
"""Enter the watcher's context and allow starting new processes """Enter the watcher's context and allow starting new processes
...@@ -868,20 +849,6 @@ class AbstractChildWatcher: ...@@ -868,20 +849,6 @@ class AbstractChildWatcher:
raise NotImplementedError() raise NotImplementedError()
def _compute_returncode(status):
if os.WIFSIGNALED(status):
# The child process died because of a signal.
return -os.WTERMSIG(status)
elif os.WIFEXITED(status):
# The child process exited (e.g sys.exit()).
return os.WEXITSTATUS(status)
else:
# The child exited, but we don't understand its status.
# This shouldn't happen, but if it does, let's just
# return that status; perhaps that helps debug it.
return status
class BaseChildWatcher(AbstractChildWatcher): class BaseChildWatcher(AbstractChildWatcher):
def __init__(self): def __init__(self):
...@@ -891,9 +858,6 @@ class BaseChildWatcher(AbstractChildWatcher): ...@@ -891,9 +858,6 @@ class BaseChildWatcher(AbstractChildWatcher):
def close(self): def close(self):
self.attach_loop(None) self.attach_loop(None)
def is_active(self):
return self._loop is not None and self._loop.is_running()
def _do_waitpid(self, expected_pid): def _do_waitpid(self, expected_pid):
raise NotImplementedError() raise NotImplementedError()
...@@ -934,6 +898,19 @@ class BaseChildWatcher(AbstractChildWatcher): ...@@ -934,6 +898,19 @@ class BaseChildWatcher(AbstractChildWatcher):
'exception': exc, 'exception': exc,
}) })
def _compute_returncode(self, status):
if os.WIFSIGNALED(status):
# The child process died because of a signal.
return -os.WTERMSIG(status)
elif os.WIFEXITED(status):
# The child process exited (e.g sys.exit()).
return os.WEXITSTATUS(status)
else:
# The child exited, but we don't understand its status.
# This shouldn't happen, but if it does, let's just
# return that status; perhaps that helps debug it.
return status
class SafeChildWatcher(BaseChildWatcher): class SafeChildWatcher(BaseChildWatcher):
"""'Safe' child watcher implementation. """'Safe' child watcher implementation.
...@@ -957,6 +934,11 @@ class SafeChildWatcher(BaseChildWatcher): ...@@ -957,6 +934,11 @@ class SafeChildWatcher(BaseChildWatcher):
pass pass
def add_child_handler(self, pid, callback, *args): def add_child_handler(self, pid, callback, *args):
if self._loop is None:
raise RuntimeError(
"Cannot add child handler, "
"the child watcher does not have a loop attached")
self._callbacks[pid] = (callback, args) self._callbacks[pid] = (callback, args)
# Prevent a race condition in case the child is already terminated. # Prevent a race condition in case the child is already terminated.
...@@ -992,7 +974,7 @@ class SafeChildWatcher(BaseChildWatcher): ...@@ -992,7 +974,7 @@ class SafeChildWatcher(BaseChildWatcher):
# The child process is still alive. # The child process is still alive.
return return
returncode = _compute_returncode(status) returncode = self._compute_returncode(status)
if self._loop.get_debug(): if self._loop.get_debug():
logger.debug('process %s exited with returncode %s', logger.debug('process %s exited with returncode %s',
expected_pid, returncode) expected_pid, returncode)
...@@ -1053,6 +1035,11 @@ class FastChildWatcher(BaseChildWatcher): ...@@ -1053,6 +1035,11 @@ class FastChildWatcher(BaseChildWatcher):
def add_child_handler(self, pid, callback, *args): def add_child_handler(self, pid, callback, *args):
assert self._forks, "Must use the context manager" assert self._forks, "Must use the context manager"
if self._loop is None:
raise RuntimeError(
"Cannot add child handler, "
"the child watcher does not have a loop attached")
with self._lock: with self._lock:
try: try:
returncode = self._zombies.pop(pid) returncode = self._zombies.pop(pid)
...@@ -1085,7 +1072,7 @@ class FastChildWatcher(BaseChildWatcher): ...@@ -1085,7 +1072,7 @@ class FastChildWatcher(BaseChildWatcher):
# A child process is still alive. # A child process is still alive.
return return
returncode = _compute_returncode(status) returncode = self._compute_returncode(status)
with self._lock: with self._lock:
try: try:
...@@ -1114,177 +1101,6 @@ class FastChildWatcher(BaseChildWatcher): ...@@ -1114,177 +1101,6 @@ class FastChildWatcher(BaseChildWatcher):
callback(pid, returncode, *args) callback(pid, returncode, *args)
class MultiLoopChildWatcher(AbstractChildWatcher):
# The class keeps compatibility with AbstractChildWatcher ABC
# To achieve this it has empty attach_loop() method
# and doesn't accept explicit loop argument
# for add_child_handler()/remove_child_handler()
# but retrieves the current loop by get_running_loop()
def __init__(self):
self._callbacks = {}
self._saved_sighandler = None
def is_active(self):
return self._saved_sighandler is not None
def close(self):
self._callbacks.clear()
if self._saved_sighandler is not None:
handler = signal.getsignal(signal.SIGCHLD)
if handler != self._sig_chld:
logger.warning("SIGCHLD handler was changed by outside code")
else:
signal.signal(signal.SIGCHLD, self._saved_sighandler)
self._saved_sighandler = None
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass
def add_child_handler(self, pid, callback, *args):
loop = events.get_running_loop()
self._callbacks[pid] = (loop, callback, args)
# Prevent a race condition in case the child is already terminated.
self._do_waitpid(pid)
def remove_child_handler(self, pid):
try:
del self._callbacks[pid]
return True
except KeyError:
return False
def attach_loop(self, loop):
# Don't save the loop but initialize itself if called first time
# The reason to do it here is that attach_loop() is called from
# unix policy only for the main thread.
# Main thread is required for subscription on SIGCHLD signal
if self._saved_sighandler is None:
self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld)
if self._saved_sighandler is None:
logger.warning("Previous SIGCHLD handler was set by non-Python code, "
"restore to default handler on watcher close.")
self._saved_sighandler = signal.SIG_DFL
# Set SA_RESTART to limit EINTR occurrences.
signal.siginterrupt(signal.SIGCHLD, False)
def _do_waitpid_all(self):
for pid in list(self._callbacks):
self._do_waitpid(pid)
def _do_waitpid(self, expected_pid):
assert expected_pid > 0
try:
pid, status = os.waitpid(expected_pid, os.WNOHANG)
except ChildProcessError:
# The child process is already reaped
# (may happen if waitpid() is called elsewhere).
pid = expected_pid
returncode = 255
logger.warning(
"Unknown child process pid %d, will report returncode 255",
pid)
debug_log = False
else:
if pid == 0:
# The child process is still alive.
return
returncode = _compute_returncode(status)
debug_log = True
try:
loop, callback, args = self._callbacks.pop(pid)
except KeyError: # pragma: no cover
# May happen if .remove_child_handler() is called
# after os.waitpid() returns.
logger.warning("Child watcher got an unexpected pid: %r",
pid, exc_info=True)
else:
if loop.is_closed():
logger.warning("Loop %r that handles pid %r is closed", loop, pid)
else:
if debug_log and loop.get_debug():
logger.debug('process %s exited with returncode %s',
expected_pid, returncode)
loop.call_soon_threadsafe(callback, pid, returncode, *args)
def _sig_chld(self, signum, frame):
try:
self._do_waitpid_all()
except (SystemExit, KeyboardInterrupt):
raise
except BaseException:
logger.warning('Unknown exception in SIGCHLD handler', exc_info=True)
class ThreadedChildWatcher(AbstractChildWatcher):
# The watcher uses a thread per process
# for waiting for the process finish.
# It doesn't require subscription on POSIX signal
def __init__(self):
self._pid_counter = itertools.count(0)
def is_active(self):
return True
def close(self):
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass
def add_child_handler(self, pid, callback, *args):
loop = events.get_running_loop()
thread = threading.Thread(target=self._do_waitpid,
name=f"waitpid-{next(self._pid_counter)}",
args=(loop, pid, callback, args),
daemon=True)
thread.start()
def remove_child_handler(self, pid):
# asyncio never calls remove_child_handler() !!!
# The method is no-op but is implemented because
# abstract base classe requires it
return True
def attach_loop(self, loop):
pass
def _do_waitpid(self, loop, expected_pid, callback, args):
assert expected_pid > 0
try:
pid, status = os.waitpid(expected_pid, 0)
except ChildProcessError:
# The child process is already reaped
# (may happen if waitpid() is called elsewhere).
pid = expected_pid
returncode = 255
logger.warning(
"Unknown child process pid %d, will report returncode 255",
pid)
else:
returncode = _compute_returncode(status)
if loop.get_debug():
logger.debug('process %s exited with returncode %s',
expected_pid, returncode)
if loop.is_closed():
logger.warning("Loop %r that handles pid %r is closed", loop, pid)
else:
loop.call_soon_threadsafe(callback, pid, returncode, *args)
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
"""UNIX event loop policy with a watcher for child processes.""" """UNIX event loop policy with a watcher for child processes."""
_loop_factory = _UnixSelectorEventLoop _loop_factory = _UnixSelectorEventLoop
...@@ -1296,7 +1112,7 @@ class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): ...@@ -1296,7 +1112,7 @@ class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
def _init_watcher(self): def _init_watcher(self):
with events._lock: with events._lock:
if self._watcher is None: # pragma: no branch if self._watcher is None: # pragma: no branch
self._watcher = ThreadedChildWatcher() self._watcher = SafeChildWatcher()
if isinstance(threading.current_thread(), if isinstance(threading.current_thread(),
threading._MainThread): threading._MainThread):
self._watcher.attach_loop(self._local._loop) self._watcher.attach_loop(self._local._loop)
...@@ -1318,7 +1134,7 @@ class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): ...@@ -1318,7 +1134,7 @@ class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
def get_child_watcher(self): def get_child_watcher(self):
"""Get the watcher for child processes. """Get the watcher for child processes.
If not yet set, a ThreadedChildWatcher object is automatically created. If not yet set, a SafeChildWatcher object is automatically created.
""" """
if self._watcher is None: if self._watcher is None:
self._init_watcher() self._init_watcher()
......
...@@ -633,7 +633,6 @@ class SubprocessMixin: ...@@ -633,7 +633,6 @@ class SubprocessMixin:
self.assertIsNone(self.loop.run_until_complete(execute())) self.assertIsNone(self.loop.run_until_complete(execute()))
if sys.platform != 'win32': if sys.platform != 'win32':
# Unix # Unix
class SubprocessWatcherMixin(SubprocessMixin): class SubprocessWatcherMixin(SubprocessMixin):
...@@ -649,24 +648,7 @@ if sys.platform != 'win32': ...@@ -649,24 +648,7 @@ if sys.platform != 'win32':
watcher = self.Watcher() watcher = self.Watcher()
watcher.attach_loop(self.loop) watcher.attach_loop(self.loop)
policy.set_child_watcher(watcher) policy.set_child_watcher(watcher)
self.addCleanup(policy.set_child_watcher, None)
def tearDown(self):
super().setUp()
policy = asyncio.get_event_loop_policy()
watcher = policy.get_child_watcher()
policy.set_child_watcher(None)
watcher.attach_loop(None)
watcher.close()
class SubprocessThreadedWatcherTests(SubprocessWatcherMixin,
test_utils.TestCase):
Watcher = unix_events.ThreadedChildWatcher
class SubprocessMultiLoopWatcherTests(SubprocessWatcherMixin,
test_utils.TestCase):
Watcher = unix_events.MultiLoopChildWatcher
class SubprocessSafeWatcherTests(SubprocessWatcherMixin, class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
test_utils.TestCase): test_utils.TestCase):
...@@ -688,25 +670,5 @@ else: ...@@ -688,25 +670,5 @@ else:
self.set_event_loop(self.loop) self.set_event_loop(self.loop)
class GenericWatcherTests:
def test_create_subprocess_fails_with_inactive_watcher(self):
async def execute():
watcher = mock.create_authspec(asyncio.AbstractChildWatcher)
watcher.is_active.return_value = False
asyncio.set_child_watcher(watcher)
with self.assertRaises(RuntimeError):
await subprocess.create_subprocess_exec(
support.FakePath(sys.executable), '-c', 'pass')
watcher.add_child_handler.assert_not_called()
self.assertIsNone(self.loop.run_until_complete(execute()))
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
...@@ -1082,8 +1082,6 @@ class AbstractChildWatcherTests(unittest.TestCase): ...@@ -1082,8 +1082,6 @@ class AbstractChildWatcherTests(unittest.TestCase):
NotImplementedError, watcher.attach_loop, f) NotImplementedError, watcher.attach_loop, f)
self.assertRaises( self.assertRaises(
NotImplementedError, watcher.close) NotImplementedError, watcher.close)
self.assertRaises(
NotImplementedError, watcher.is_active)
self.assertRaises( self.assertRaises(
NotImplementedError, watcher.__enter__) NotImplementedError, watcher.__enter__)
self.assertRaises( self.assertRaises(
...@@ -1786,6 +1784,15 @@ class ChildWatcherTestsMixin: ...@@ -1786,6 +1784,15 @@ class ChildWatcherTestsMixin:
if isinstance(self.watcher, asyncio.FastChildWatcher): if isinstance(self.watcher, asyncio.FastChildWatcher):
self.assertFalse(self.watcher._zombies) self.assertFalse(self.watcher._zombies)
@waitpid_mocks
def test_add_child_handler_with_no_loop_attached(self, m):
callback = mock.Mock()
with self.create_watcher() as watcher:
with self.assertRaisesRegex(
RuntimeError,
'the child watcher does not have a loop attached'):
watcher.add_child_handler(100, callback)
class SafeChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase): class SafeChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase):
def create_watcher(self): def create_watcher(self):
...@@ -1802,16 +1809,17 @@ class PolicyTests(unittest.TestCase): ...@@ -1802,16 +1809,17 @@ class PolicyTests(unittest.TestCase):
def create_policy(self): def create_policy(self):
return asyncio.DefaultEventLoopPolicy() return asyncio.DefaultEventLoopPolicy()
def test_get_default_child_watcher(self): def test_get_child_watcher(self):
policy = self.create_policy() policy = self.create_policy()
self.assertIsNone(policy._watcher) self.assertIsNone(policy._watcher)
watcher = policy.get_child_watcher() watcher = policy.get_child_watcher()
self.assertIsInstance(watcher, asyncio.ThreadedChildWatcher) self.assertIsInstance(watcher, asyncio.SafeChildWatcher)
self.assertIs(policy._watcher, watcher) self.assertIs(policy._watcher, watcher)
self.assertIs(watcher, policy.get_child_watcher()) self.assertIs(watcher, policy.get_child_watcher())
self.assertIsNone(watcher._loop)
def test_get_child_watcher_after_set(self): def test_get_child_watcher_after_set(self):
policy = self.create_policy() policy = self.create_policy()
...@@ -1821,6 +1829,18 @@ class PolicyTests(unittest.TestCase): ...@@ -1821,6 +1829,18 @@ class PolicyTests(unittest.TestCase):
self.assertIs(policy._watcher, watcher) self.assertIs(policy._watcher, watcher)
self.assertIs(watcher, policy.get_child_watcher()) self.assertIs(watcher, policy.get_child_watcher())
def test_get_child_watcher_with_mainloop_existing(self):
policy = self.create_policy()
loop = policy.get_event_loop()
self.assertIsNone(policy._watcher)
watcher = policy.get_child_watcher()
self.assertIsInstance(watcher, asyncio.SafeChildWatcher)
self.assertIs(watcher._loop, loop)
loop.close()
def test_get_child_watcher_thread(self): def test_get_child_watcher_thread(self):
def f(): def f():
...@@ -1846,11 +1866,7 @@ class PolicyTests(unittest.TestCase): ...@@ -1846,11 +1866,7 @@ class PolicyTests(unittest.TestCase):
policy = self.create_policy() policy = self.create_policy()
loop = policy.get_event_loop() loop = policy.get_event_loop()
# Explicitly setup SafeChildWatcher, watcher = policy.get_child_watcher()
# default ThreadedChildWatcher has no _loop property
watcher = asyncio.SafeChildWatcher()
policy.set_child_watcher(watcher)
watcher.attach_loop(loop)
self.assertIs(watcher._loop, loop) self.assertIs(watcher._loop, loop)
......
Support running asyncio subprocesses when execution event loop in a thread
on UNIX.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment