From e0d9b60a3179bc7cf0391430d5592cd7b9295f02 Mon Sep 17 00:00:00 2001 From: Jason Madden <jamadden@gmail.com> Date: Fri, 10 Jul 2015 13:43:02 -0500 Subject: [PATCH] A cooperative version of waitpid that plays nicely with child watchers and gevent.subprocess. Fixes #600. Ref #452. --- changelog.rst | 6 +++ gevent/_semaphore.pxd | 2 +- gevent/hub.py | 4 ++ gevent/os.py | 93 +++++++++++++++++++++++++++++++++++++ gevent/subprocess.py | 5 +- greentest/test__issue600.py | 43 +++++++++++++++++ 6 files changed, 149 insertions(+), 4 deletions(-) create mode 100644 greentest/test__issue600.py diff --git a/changelog.rst b/changelog.rst index 746c2791..71ac2038 100644 --- a/changelog.rst +++ b/changelog.rst @@ -16,6 +16,12 @@ Unreleased - Fixed regression that failed to set the ``successful`` value to False when killing a greenlet before it ran with a non-default exception. Fixed in :pr:`608` by Heungsub Lee. +- libev's child watchers caused ``os.waitpid`` to become unreliable + due to the use of signals on POSIX platforms. This was especially + noticeable when using ``gevent.subprocess`` in combination with + ``multiprocessing``. Now, the monkey-patched ``os`` module provides + a ``waitpid`` function that seeks to ameliorate this. Reported in + :issue:`600` by champax and :issue:`452` by Åukasz KawczyÅ„ski. 1.1a2 (Jul 8, 2015) =================== diff --git a/gevent/_semaphore.pxd b/gevent/_semaphore.pxd index 3e6900c1..228ad18b 100644 --- a/gevent/_semaphore.pxd +++ b/gevent/_semaphore.pxd @@ -2,7 +2,7 @@ cdef class Semaphore: cdef public int counter cdef readonly object _links cdef readonly object _notifier - cdef readonly int _dirty + cdef public int _dirty cpdef locked(self) cpdef release(self) diff --git a/gevent/hub.py b/gevent/hub.py index ab463a66..bdfe534c 100644 --- a/gevent/hub.py +++ b/gevent/hub.py @@ -343,6 +343,10 @@ class Hub(greenlet): return result + '>' def handle_error(self, context, type, value, tb): + if isinstance(value, str): + # Cython can raise errors where the value is a plain string + # e.g., AttributeError, "_semaphore.Semaphore has no attr", <traceback> + value = type(value) if not issubclass(type, self.NOT_ERROR): self.print_exception(context, type, value, tb) if context is None or issubclass(type, self.SYSTEM_ERROR): diff --git a/gevent/os.py b/gevent/os.py index 6d9064bd..68b4219c 100644 --- a/gevent/os.py +++ b/gevent/os.py @@ -102,6 +102,99 @@ if hasattr(os, 'fork'): reinit() return result + if hasattr(os, 'WNOWAIT') or hasattr(os, 'WNOHANG'): + # We can only do this on POSIX + import time + + _waitpid = os.waitpid + _WNOHANG = os.WNOHANG + + # {pid -> watcher or tuple(pid, rstatus, timestamp)} + _watched_children = {} + + def _on_child(watcher, callback): + # XXX: Could handle tracing here by not stopping + # until the pid is terminated + watcher.stop() + _watched_children[watcher.pid] = (watcher.pid, watcher.rstatus, time.time()) + if callback: + callback(watcher) + # now is as good a time as any to reap children + _reap_children() + + def _reap_children(timeout=60): + # Remove all the dead children that haven't been waited on + # for the *timeout* + now = time.time() + oldest_allowed = now - timeout + for pid in _watched_children.keys(): + val = _watched_children[pid] + if isinstance(val, tuple) and val[2] < oldest_allowed: + del _watched_children[pid] + + def waitpid(pid, options): + # XXX Does not handle tracing children + if pid <= 0: + # magic functions for multiple children. Pass. + return _waitpid(pid, options) + + if pid in _watched_children: + # yes, we're watching it + if options & _WNOHANG or isinstance(_watched_children[pid], tuple): + # We're either asked not to block, or it already finished, in which + # case blocking doesn't matter + result = _watched_children[pid] + if isinstance(result, tuple): + # it finished. libev child watchers + # are one-shot + del _watched_children[pid] + return result[:2] + # it's not finished + return (0, 0) + else: + # we should block. Let the underlying OS call block; it should + # eventually die with OSError, depending on signal delivery + try: + return _waitpid(pid, options) + except OSError: + if pid in _watched_children and isinstance(_watched_children, tuple): + result = _watched_children[pid] + del _watched_children[pid] + return result[:2] + raise + # we're not watching it + return _waitpid(pid, options) + + def fork_and_watch(callback=None, loop=None, fork=fork): + """ + Fork a child process and start a child watcher for it in the parent process. + + This call cooperates with the :func:`waitpid`` function defined in this module. + + :keyword callback: If given, a callable that will be called with the child watcher + when the child finishes. + :keyword loop: The loop to start the watcher in. Defaults to the + current loop. + :keyword fork: The fork function. Defaults to the one defined in this + module. + + .. versionadded: 1.1a3 + """ + pid = fork() + if pid: + # parent + loop = loop or get_hub().loop + watcher = loop.child(pid) + _watched_children[pid] = watcher + watcher.start(_on_child, watcher, callback) + return pid + + # Watch children by default + fork = fork_and_watch + + __extensions__.append('fork_and_watch') + __implements__.append("waitpid") + else: __implements__.remove('fork') diff --git a/gevent/subprocess.py b/gevent/subprocess.py index 763d2448..81a79021 100644 --- a/gevent/subprocess.py +++ b/gevent/subprocess.py @@ -112,6 +112,7 @@ else: import pickle from gevent import monkey fork = monkey.get_original('os', 'fork') + from gevent.os import fork_and_watch if PY3: def call(*popenargs, **kwargs): @@ -934,7 +935,7 @@ class Popen(object): # write to stderr -> hang. http://bugs.python.org/issue1336 gc.disable() try: - self.pid = fork() + self.pid = fork_and_watch(self._on_child, self._loop, fork) except: if gc_was_enabled: gc.enable() @@ -1040,8 +1041,6 @@ class Popen(object): os._exit(1) # Parent - self._watcher = self._loop.child(self.pid) - self._watcher.start(self._on_child, self._watcher) if gc_was_enabled: gc.enable() diff --git a/greentest/test__issue600.py b/greentest/test__issue600.py new file mode 100644 index 00000000..a2273921 --- /dev/null +++ b/greentest/test__issue600.py @@ -0,0 +1,43 @@ +# Make sure that libev child watchers, implicitly installed through the use +# of subprocess, do not cause waitpid() to fail to poll for processes. +# NOTE: This was only reproducible under python 2. +import gevent +from gevent import monkey +monkey.patch_all() + +from multiprocessing import Process +from gevent.subprocess import Popen, PIPE + + +def test_invoke(): + # Run a subprocess through Popen to make sure + # libev is handling SIGCHLD. This could *probably* be simplified to use + # just hub.loop.install_sigchld + + p = Popen("true", stdout=PIPE, stderr=PIPE) + gevent.sleep(0) + p.communicate() + gevent.sleep(0) + + +def f(sleep_sec): + gevent.sleep(sleep_sec) + + +def test_process(): + # Launch + p = Process(target=f, args=(1.0,)) + p.start() + + with gevent.Timeout(3): + # Poll for up to 10 seconds. If the bug exists, + # this will timeout because our subprocess should + # be long gone by now + p.join(10) + + +if __name__ == '__main__': + # do a subprocess open + test_invoke() + + test_process() -- 2.30.9