Commit 12dd4d9e authored by Jason Madden's avatar Jason Madden

Attempt and FAIL to implement fork/child on libuv.

Sadly, libuv simply cannot be used in a child process that has fork()'d
from a parent that also used libuv. The result is a pretty quick abort()
and crash.

It *may* be possible to implement `subprocess` on top of uv_spawn (I
haven't looked into that), but things like multiprocessing and
futures.ProcessPoolExecutor would be completely broken.

Comments in commit.
parent bb933fcc
......@@ -180,17 +180,21 @@ class AbstractLoop(object):
def __init__(self, ffi, lib, watchers, flags=None, default=None):
self._ffi = ffi
self._lib = lib
self._ptr = None
self._watchers = watchers
self._in_callback = False
self._callbacks = []
self._keepaliveset = set()
self._init_loop_and_aux_watchers(flags, default)
def _init_loop_and_aux_watchers(self, flags=None, default=None):
self._ptr = self._init_loop(flags, default)
# self._check is a watcher that runs in each iteration of the
# mainloop, just after the blocking call
self._check = ffi.new(self._CHECK_POINTER)
self._check_callback_ffi = _loop_callback(ffi,
self._check = self._ffi.new(self._CHECK_POINTER)
self._check_callback_ffi = _loop_callback(self._ffi,
self._CHECK_CALLBACK_SIG,
self._check_callback,
onerror=self._check_callback_handle_error)
......@@ -198,8 +202,8 @@ class AbstractLoop(object):
# self._prepare is a watcher that runs in each iteration of the mainloop,
# just before the blocking call
self._prepare = ffi.new(self._PREPARE_POINTER)
self._prepare_callback_ffi = _loop_callback(ffi,
self._prepare = self._ffi.new(self._PREPARE_POINTER)
self._prepare_callback_ffi = _loop_callback(self._ffi,
self._PREPARE_CALLBACK_SIG,
self._run_callbacks,
onerror=self._check_callback_handle_error)
......@@ -211,7 +215,7 @@ class AbstractLoop(object):
# as quickly as possible.
# TODO: There may be a more efficient way to do this using ev_timer_again;
# see the "ev_timer" section of the ev manpage (http://linux.die.net/man/3/ev)
self._timer0 = ffi.new(self._TIMER_POINTER)
self._timer0 = self._ffi.new(self._TIMER_POINTER)
self._init_callback_timer()
# TODO: We may be able to do something nicer and use the existing python_callback
......
......@@ -388,12 +388,16 @@ class AsyncMixin(object):
class ChildMixin(object):
# hack for libuv which doesn't extend watcher
_CALL_SUPER_INIT = True
def __init__(self, loop, pid, trace=0, ref=True):
if not loop.default:
raise TypeError('child watchers are only available on the default loop')
loop.install_sigchld()
self._pid = pid
super(ChildMixin, self).__init__(loop, ref=ref, args=(pid, trace))
if self._CALL_SUPER_INIT:
super(ChildMixin, self).__init__(loop, ref=ref, args=(pid, trace))
def _format(self):
return ' pid=%r rstatus=%r' % (self.pid, self.rstatus)
......
......@@ -47,6 +47,8 @@ enum uv_fs_event_flags {
UV_FS_EVENT_RECURSIVE = 4
};
const char* uv_strerror(int);
const char* uv_err_name(int);
// handle structs and types
struct uv_loop_s {
......@@ -121,7 +123,7 @@ typedef void (*uv_async_cb)(void* handle);
typedef void (*uv_prepare_cb)(void*handle);
// callbacks with distinct sigs
typedef void (*uv_walk_cb)(void *handle, void *arg);
typedef void (*uv_walk_cb)(uv_handle_t *handle, void *arg);
typedef void (*uv_poll_cb)(void *handle, int status, int events);
typedef void (*uv_signal_cb)(void *handle, int signum);
......
......@@ -4,6 +4,9 @@ libuv loop implementation
from __future__ import absolute_import, print_function
import os
import signal
from gevent._ffi.loop import AbstractLoop
from gevent.libuv import _corecffi # pylint:disable=no-name-in-module,import-error
from gevent._ffi.loop import assign_standard_callbacks
......@@ -32,6 +35,7 @@ class loop(AbstractLoop):
def __init__(self, flags=None, default=None):
AbstractLoop.__init__(self, ffi, libuv, _watchers, flags, default)
self.__loop_pid = os.getpid()
def _init_loop(self, flags, default):
......@@ -98,11 +102,30 @@ class loop(AbstractLoop):
# TODO: How to implement? We probably have to simply
# re-__init__ this whole class? Does it matter?
# OR maybe we need to uv_walk() and close all the handles?
# XXX: libuv <= 1.9 simply CANNOT handle a fork unless you immediately
# exec() in the child. There are multiple calls to abort() that
# will kill the child process:
# - The OS X poll implementation (kqueue) aborts on an error return
# value; since kqueue FDs can't be inherited, then the next call
# to kqueue in the child will fail and get aborted; fork() is likely
# to be called during the gevent loop, meaning we're deep inside the
# runloop already, so we can't even close the loop that we're in:
# it's too late, the next call to kqueue is already scheduled.
# - The threadpool, should it be in use, also aborts
# (https://github.com/joyent/libuv/pull/1136)
# - There global shared state that breaks signal handling
# and leads to an abort() in the child, EVEN IF the loop in the parent
# had already been closed
# (https://github.com/joyent/libuv/issues/1405)
raise NotImplementedError()
def run(self, nowait=False, once=False):
# we can only respect one flag or the other.
# nowait takes precedence because it can't block
print("RUNNING LOOP from", self.__loop_pid, "in", os.getpid())
mode = libuv.UV_RUN_DEFAULT
if once:
mode = libuv.UV_RUN_ONCE
......@@ -124,3 +147,24 @@ class loop(AbstractLoop):
def fileno(self):
if self._ptr:
return libuv.uv_backend_fd(self._ptr)
_sigchld_watcher = None
_sigchld_callback_ffi = None
def install_sigchld(self):
if not self.default:
return
if self._sigchld_watcher:
return
self._sigchld_watcher = ffi.new('uv_signal_t*')
libuv.uv_signal_init(self._ptr, self._sigchld_watcher)
self._sigchld_callback_ffi = ffi.callback('void(*)(void*, int)',
self.__sigchld_callback)
libuv.uv_signal_start(self._sigchld_watcher,
self._sigchld_callback_ffi,
signal.SIGCHLD)
def __sigchld_callback(self, _handler, _signum):
print("SIGCHILD")
......@@ -92,7 +92,7 @@ class io(_base.IoMixin, watcher):
def _watcher_ffi_start(self):
self._watcher_start(self._watcher, self._events, self._watcher_callback)
class fork(object):
class fork(_base.ForkMixin):
# We'll have to implement this one completely manually
def __init__(self, *args, **kwargs):
......@@ -104,6 +104,21 @@ class fork(object):
def stop(self, *args):
pass
class child(_base.ChildMixin):
# We'll have to implement this one completely manually.
# Our approach is to use a SIGCHLD handler and the original
# os.waitpid call.
# On Unix, libuv's uv_process_t and uv_spawn use SIGCHLD,
# just like libev does for its child watchers. So
# we're not adding any new SIGCHLD related issues not already
# present in libev.
_CALL_SUPER_INIT = False
def start(self, cb, *args):
pass
class async(_base.AsyncMixin, watcher):
def _watcher_ffi_init(self, args):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment