Commit 5d0cc66a authored by Jason Madden's avatar Jason Madden

implement child watchers in libuv to the extent possible.

parent 8ca9128a
...@@ -60,7 +60,7 @@ class AbstractWatcherType(type): ...@@ -60,7 +60,7 @@ class AbstractWatcherType(type):
_LIB = None _LIB = None
def __new__(cls, name, bases, cls_dict): def __new__(cls, name, bases, cls_dict):
if name != 'watcher': if name != 'watcher' and not cls_dict.get('_watcher_skip_ffi'):
cls._fill_watcher(name, bases, cls_dict) cls._fill_watcher(name, bases, cls_dict)
return type.__new__(cls, name, bases, cls_dict) return type.__new__(cls, name, bases, cls_dict)
...@@ -410,9 +410,11 @@ class ChildMixin(object): ...@@ -410,9 +410,11 @@ class ChildMixin(object):
def rpid(self): def rpid(self):
return os.getpid() return os.getpid()
_rstatus = 0
@property @property
def rstatus(self): def rstatus(self):
return 0 return self._rstatus
class StatMixin(object): class StatMixin(object):
......
...@@ -5,7 +5,9 @@ libuv loop implementation ...@@ -5,7 +5,9 @@ libuv loop implementation
from __future__ import absolute_import, print_function from __future__ import absolute_import, print_function
import os import os
from collections import defaultdict
import signal import signal
import sys
from gevent._ffi.loop import AbstractLoop from gevent._ffi.loop import AbstractLoop
from gevent.libuv import _corecffi # pylint:disable=no-name-in-module,import-error from gevent.libuv import _corecffi # pylint:disable=no-name-in-module,import-error
...@@ -36,7 +38,7 @@ class loop(AbstractLoop): ...@@ -36,7 +38,7 @@ class loop(AbstractLoop):
def __init__(self, flags=None, default=None): def __init__(self, flags=None, default=None):
AbstractLoop.__init__(self, ffi, libuv, _watchers, flags, default) AbstractLoop.__init__(self, ffi, libuv, _watchers, flags, default)
self.__loop_pid = os.getpid() self.__loop_pid = os.getpid()
self._child_watchers = defaultdict(list)
def _init_loop(self, flags, default): def _init_loop(self, flags, default):
if default is None: if default is None:
...@@ -119,13 +121,13 @@ class loop(AbstractLoop): ...@@ -119,13 +121,13 @@ class loop(AbstractLoop):
# had already been closed # had already been closed
# (https://github.com/joyent/libuv/issues/1405) # (https://github.com/joyent/libuv/issues/1405)
raise NotImplementedError() #raise NotImplementedError()
pass
def run(self, nowait=False, once=False): def run(self, nowait=False, once=False):
# we can only respect one flag or the other. # we can only respect one flag or the other.
# nowait takes precedence because it can't block # nowait takes precedence because it can't block
print("RUNNING LOOP from", self.__loop_pid, "in", os.getpid())
mode = libuv.UV_RUN_DEFAULT mode = libuv.UV_RUN_DEFAULT
if once: if once:
mode = libuv.UV_RUN_ONCE mode = libuv.UV_RUN_ONCE
...@@ -167,4 +169,15 @@ class loop(AbstractLoop): ...@@ -167,4 +169,15 @@ class loop(AbstractLoop):
signal.SIGCHLD) signal.SIGCHLD)
def __sigchld_callback(self, _handler, _signum): def __sigchld_callback(self, _handler, _signum):
print("SIGCHILD") while True:
try:
pid, status, _usage = os.wait3(os.WNOHANG)
except OSError:
# Python 3 raises ChildProcessError
break
if pid == 0:
break
children_watchers = self._child_watchers.get(pid, []) + self._child_watchers.get(0, [])
for watcher in children_watchers:
watcher._set_status(status)
# pylint: disable=too-many-lines, protected-access, redefined-outer-name, not-callable # pylint: disable=too-many-lines, protected-access, redefined-outer-name, not-callable
from __future__ import absolute_import, print_function from __future__ import absolute_import, print_function
import os
import sys
import gevent.libuv._corecffi as _corecffi # pylint:disable=no-name-in-module,import-error import gevent.libuv._corecffi as _corecffi # pylint:disable=no-name-in-module,import-error
ffi = _corecffi.ffi # pylint:disable=no-member ffi = _corecffi.ffi # pylint:disable=no-member
...@@ -104,7 +107,8 @@ class fork(_base.ForkMixin): ...@@ -104,7 +107,8 @@ class fork(_base.ForkMixin):
def stop(self, *args): def stop(self, *args):
pass pass
class child(_base.ChildMixin): class child(_base.ChildMixin, watcher):
_watcher_skip_ffi = True
# We'll have to implement this one completely manually. # We'll have to implement this one completely manually.
# Our approach is to use a SIGCHLD handler and the original # Our approach is to use a SIGCHLD handler and the original
# os.waitpid call. # os.waitpid call.
...@@ -114,10 +118,43 @@ class child(_base.ChildMixin): ...@@ -114,10 +118,43 @@ class child(_base.ChildMixin):
# we're not adding any new SIGCHLD related issues not already # we're not adding any new SIGCHLD related issues not already
# present in libev. # present in libev.
_CALL_SUPER_INIT = False def __init__(self, *args, **kwargs):
super(child, self).__init__(*args, **kwargs)
self._async = self.loop.async()
def _watcher_create(self, _args):
return
@property
def _watcher_handle(self):
return None
def _watcher_ffi_init(self, args):
return
def start(self, cb, *args): def start(self, cb, *args):
pass self.loop._child_watchers[self._pid].append(self)
self.callback = cb
self.args = args
self._async.start(cb, *args)
#watcher.start(self, cb, *args)
@property
def active(self):
return self._async.active
def stop(self):
try:
self.loop._child_watchers[self._pid].remove(self)
except ValueError:
pass
self.callback = None
self.args = None
self._async.stop()
def _set_status(self, status):
self._rstatus = status
self._async.send()
class async(_base.AsyncMixin, watcher): class async(_base.AsyncMixin, watcher):
......
...@@ -341,6 +341,12 @@ if hasattr(os, 'fork'): ...@@ -341,6 +341,12 @@ if hasattr(os, 'fork'):
# we're not watching it and it may not even be our child, # we're not watching it and it may not even be our child,
# so we must go to the OS to be sure to get the right semantics (exception) # so we must go to the OS to be sure to get the right semantics (exception)
# XXX
# libuv has a race condition because the signal
# handler is a Python function, so the InterruptedError
# is raised before the signal handler runs and calls the
# child watcher
# we're not watching it
return _waitpid(pid, options) return _waitpid(pid, options)
def fork_and_watch(callback=None, loop=None, ref=False, fork=fork_gevent): def fork_and_watch(callback=None, loop=None, ref=False, fork=fork_gevent):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment