Commit 4b59a304 authored by Jason Madden's avatar Jason Madden

Apply the FORKCHECK flag to make libev notice forks when multiple threads are in use.

parent ef6c7df2
...@@ -49,6 +49,8 @@ Unreleased ...@@ -49,6 +49,8 @@ Unreleased
forked child process, such as with ``multiprocessing.Process``. forked child process, such as with ``multiprocessing.Process``.
Previously the child process would hang indefinitely. Reported in Previously the child process would hang indefinitely. Reported in
:issue:`230` by Lx Yu. :issue:`230` by Lx Yu.
- Fork watchers are more likely to (eventually) get called in a
multi-threaded program.
1.1a1 (Jun 29, 2015) 1.1a1 (Jun 29, 2015)
==================== ====================
......
...@@ -270,6 +270,7 @@ cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]: ...@@ -270,6 +270,7 @@ cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]:
c_flags = _flags_to_int(flags) c_flags = _flags_to_int(flags)
_check_flags(c_flags) _check_flags(c_flags)
c_flags |= libev.EVFLAG_NOENV c_flags |= libev.EVFLAG_NOENV
c_flags |= libev.EVFLAG_FORKCHECK
if default is None: if default is None:
default = True default = True
if _default_loop_destroyed: if _default_loop_destroyed:
......
...@@ -549,6 +549,7 @@ class loop(object): ...@@ -549,6 +549,7 @@ class loop(object):
c_flags = _flags_to_int(flags) c_flags = _flags_to_int(flags)
_check_flags(c_flags) _check_flags(c_flags)
c_flags |= libev.EVFLAG_NOENV c_flags |= libev.EVFLAG_NOENV
c_flags |= libev.EVFLAG_FORKCHECK
if default is None: if default is None:
default = True default = True
if _default_loop_destroyed: if _default_loop_destroyed:
......
...@@ -153,16 +153,23 @@ def reinit(): ...@@ -153,16 +153,23 @@ def reinit():
hub = _get_hub() hub = _get_hub()
if hub is not None: if hub is not None:
hub.loop.reinit() hub.loop.reinit()
# XXX: libev's fork watchers seem not to be firing for some reason # libev's fork watchers are slow to fire because the only fire
# in both the cython (core.ppyx) and CFFI (corecffi.py) implementations # at the beginning of a loop; due to our use of callbacks that
# (at least on OS X; confirm on other platforms) # run at the end of the loop, that may be too late. The
# This breaks the threadpool and anything that uses it, including # threadpool and resolvers depend on the fork handlers being
# resolver_thread in the forked process (if there was already one thread # run ( specifically, the threadpool will fail in the forked
# in the pool before fork, adding an additional task will hang forever post-fork) # child if there were any threads in it, which there will be
# The below is a kludge. The correct fix is to figure out why the fork watchers # if the resolver_thread was in use (the default) before the
# don't work. Fortunately, both of these methods are idempotent and can be called # fork.)
# multiple times following a fork if the suddenly started working, or were already #
# working on some platforms. # If the forked process wants to use the threadpool or
# resolver immediately, it would hang.
#
# The below is a workaround. Fortunately, both of these
# methods are idempotent and can be called multiple times
# following a fork if the suddenly started working, or were
# already working on some platforms. Other threadpools and fork handlers
# will be called at an arbitrary time later ('soon')
if hasattr(hub.threadpool, '_on_fork'): if hasattr(hub.threadpool, '_on_fork'):
hub.threadpool._on_fork() hub.threadpool._on_fork()
# resolver_ares also has a fork watcher that's not firing # resolver_ares also has a fork watcher that's not firing
......
from __future__ import print_function
import gevent.monkey; gevent.monkey.patch_all()
import gevent
import os
hub = gevent.get_hub()
pid = os.getpid()
newpid = None
def on_fork():
global newpid
newpid = os.getpid()
fork_watcher = hub.loop.fork(ref=False)
fork_watcher.start(on_fork)
# fork watchers weren't firing in multi-threading processes.
def run():
# libev only calls fork callbacks at the beginning of
# the loop; we use callbacks extensively so it takes *two*
# calls to sleep (with a timer) to actually get wrapped
# around to the beginning of the loop.
gevent.sleep(0.01)
gevent.sleep(0.01)
q.put(newpid)
import multiprocessing
# Use a thread to make us multi-threaded
hub.threadpool.apply(lambda: None)
q = multiprocessing.Queue()
p = multiprocessing.Process(target=run)
p.start()
p_val = q.get()
p.join()
assert p_val is not None
assert p_val != pid
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment