From 5d0cc66afeedf828cbcb97b9512c65a6a3f8f50c Mon Sep 17 00:00:00 2001 From: Jason Madden <jamadden@gmail.com> Date: Sun, 10 Apr 2016 13:34:35 -0500 Subject: [PATCH] implement child watchers in libuv to the extent possible. --- src/gevent/_ffi/watcher.py | 6 ++++-- src/gevent/libuv/loop.py | 21 ++++++++++++++---- src/gevent/libuv/watcher.py | 43 ++++++++++++++++++++++++++++++++++--- src/gevent/os.py | 6 ++++++ 4 files changed, 67 insertions(+), 9 deletions(-) diff --git a/src/gevent/_ffi/watcher.py b/src/gevent/_ffi/watcher.py index 7d3a9cc9..64be193d 100644 --- a/src/gevent/_ffi/watcher.py +++ b/src/gevent/_ffi/watcher.py @@ -60,7 +60,7 @@ class AbstractWatcherType(type): _LIB = None def __new__(cls, name, bases, cls_dict): - if name != 'watcher': + if name != 'watcher' and not cls_dict.get('_watcher_skip_ffi'): cls._fill_watcher(name, bases, cls_dict) return type.__new__(cls, name, bases, cls_dict) @@ -410,9 +410,11 @@ class ChildMixin(object): def rpid(self): return os.getpid() + _rstatus = 0 + @property def rstatus(self): - return 0 + return self._rstatus class StatMixin(object): diff --git a/src/gevent/libuv/loop.py b/src/gevent/libuv/loop.py index 42bf1b7d..e8554998 100644 --- a/src/gevent/libuv/loop.py +++ b/src/gevent/libuv/loop.py @@ -5,7 +5,9 @@ libuv loop implementation from __future__ import absolute_import, print_function import os +from collections import defaultdict import signal +import sys from gevent._ffi.loop import AbstractLoop from gevent.libuv import _corecffi # pylint:disable=no-name-in-module,import-error @@ -36,7 +38,7 @@ class loop(AbstractLoop): def __init__(self, flags=None, default=None): AbstractLoop.__init__(self, ffi, libuv, _watchers, flags, default) self.__loop_pid = os.getpid() - + self._child_watchers = defaultdict(list) def _init_loop(self, flags, default): if default is None: @@ -119,13 +121,13 @@ class loop(AbstractLoop): # had already been closed # (https://github.com/joyent/libuv/issues/1405) - raise NotImplementedError() + #raise NotImplementedError() + pass def run(self, nowait=False, once=False): # we can only respect one flag or the other. # nowait takes precedence because it can't block - print("RUNNING LOOP from", self.__loop_pid, "in", os.getpid()) mode = libuv.UV_RUN_DEFAULT if once: mode = libuv.UV_RUN_ONCE @@ -167,4 +169,15 @@ class loop(AbstractLoop): signal.SIGCHLD) def __sigchld_callback(self, _handler, _signum): - print("SIGCHILD") + while True: + try: + pid, status, _usage = os.wait3(os.WNOHANG) + except OSError: + # Python 3 raises ChildProcessError + break + + if pid == 0: + break + children_watchers = self._child_watchers.get(pid, []) + self._child_watchers.get(0, []) + for watcher in children_watchers: + watcher._set_status(status) diff --git a/src/gevent/libuv/watcher.py b/src/gevent/libuv/watcher.py index ee07c56c..2f9f3a28 100644 --- a/src/gevent/libuv/watcher.py +++ b/src/gevent/libuv/watcher.py @@ -1,6 +1,9 @@ # pylint: disable=too-many-lines, protected-access, redefined-outer-name, not-callable from __future__ import absolute_import, print_function +import os +import sys + import gevent.libuv._corecffi as _corecffi # pylint:disable=no-name-in-module,import-error ffi = _corecffi.ffi # pylint:disable=no-member @@ -104,7 +107,8 @@ class fork(_base.ForkMixin): def stop(self, *args): pass -class child(_base.ChildMixin): +class child(_base.ChildMixin, watcher): + _watcher_skip_ffi = True # We'll have to implement this one completely manually. # Our approach is to use a SIGCHLD handler and the original # os.waitpid call. @@ -114,10 +118,43 @@ class child(_base.ChildMixin): # we're not adding any new SIGCHLD related issues not already # present in libev. - _CALL_SUPER_INIT = False + def __init__(self, *args, **kwargs): + super(child, self).__init__(*args, **kwargs) + self._async = self.loop.async() + + def _watcher_create(self, _args): + return + + @property + def _watcher_handle(self): + return None + + def _watcher_ffi_init(self, args): + return def start(self, cb, *args): - pass + self.loop._child_watchers[self._pid].append(self) + self.callback = cb + self.args = args + self._async.start(cb, *args) + #watcher.start(self, cb, *args) + + @property + def active(self): + return self._async.active + + def stop(self): + try: + self.loop._child_watchers[self._pid].remove(self) + except ValueError: + pass + self.callback = None + self.args = None + self._async.stop() + + def _set_status(self, status): + self._rstatus = status + self._async.send() class async(_base.AsyncMixin, watcher): diff --git a/src/gevent/os.py b/src/gevent/os.py index fffd0dc5..0878daa2 100644 --- a/src/gevent/os.py +++ b/src/gevent/os.py @@ -341,6 +341,12 @@ if hasattr(os, 'fork'): # we're not watching it and it may not even be our child, # so we must go to the OS to be sure to get the right semantics (exception) + # XXX + # libuv has a race condition because the signal + # handler is a Python function, so the InterruptedError + # is raised before the signal handler runs and calls the + # child watcher + # we're not watching it return _waitpid(pid, options) def fork_and_watch(callback=None, loop=None, ref=False, fork=fork_gevent): -- 2.30.9